我使用以下代码导出 HBase 表并将输出保存到 HDFS:

hbase org.apache.hadoop.hbase.mapreduce.Export \
MyHbaseTable1 hdfs://nameservice1/user/ken/data/exportTable1

输出文件是二进制文件。如果我使用 pyspark 读取文件夹:
test1 = sc.textFile('hdfs://nameservice1/user/ken/data/exportTable1')
test1.show(5)

表明:
u'SEQ\x061org.apache.hadoop.hbase.io.ImmutableBytesWritable%org.apache.hadoop.hbase.client.Result\x00\x00\x00\x00\x00\x00\ufffd-\x10A\ufffd~lUE\u025bt\ufffd\ufffd\ufffd&\x00\x00\x04\ufffd\x00\x00\x00'
u'\x00\x00\x00\x067-2010\ufffd\t'
u'|'
u'\x067-2010\x12\x01r\x1a\x08clo-0101 \ufffd\ufffd\ufffd*(\x042\\6.67|10|10|10|7.33|6.67|6.67|6.67|6.67|6.67|6.67|5.83|3.17|0|0|0.67|0.67|0.67|0.67|0|0|0|0|0'
u'u'

我可以说
  • '7-2010' 第二行中的 是 Rowkey,
  • 'r' 第4行的是列族,
  • 'clo-0101' 第4行的是列名,
  • '6.67|10|10|10|7.33|6.67|6.67|6.67|6.67|6.67|6.67|5.83|3.17|0|0|0.67|0.67|0.67|0.67|0.67|0.67| ' 是值。

  • 我不知道第 3 行和第 5 行是从哪里来的。似乎 Hbase-export 遵循自己的规则生成文件,如果我使用自己的方式对其进行解码,则数据可能会损坏。

    问题:

    如何将此文件转换回可读格式?例如:
    7-2010, r, clo-0101, 6.67|10|10|10|7.33|6.67|6.67|6.67|6.67|6.67|6.67|5.83|3.17|0|0|0.67|0.67|0.67|0.67|0|0|0|0|0
    

    我试过了:
    test1 = sc.sequenceFile('/user/youyang/data/hbaseSnapshot1/', keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)
    test1.show(5)
    


    test1 = sc.sequenceFile('hdfs://nameservice1/user/ken/data/exportTable1'
              , keyClass='org.apache.hadoop.hbase.mapreduce.TableInputFormat'
              , valueClass='org.apache.hadoop.hbase.io.ImmutableBytesWritable'
              , keyConverter='org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter'
              , valueConverter='org.apache.spark.examples.pythonconverters.HBaseResultToStringCon verter'
              , minSplits=None
              , batchSize=100)
    

    不走运,代码不起作用,错误:



    有什么建议么?谢谢!

    最佳答案

    我最近自己也遇到了这个问题。我通过远离 sc.sequenceFile 来解决它,而是使用 sc.newAPIHadoopFile (或者如果你使用的是旧 API,则只是 hadoopFile )。 Spark SequenceFile-reader 似乎只处理 Writable 类型的键/值(在 docs 中说明)。

    如果你使用 newAPIHadoopFile 它使用 Hadoop 反序列化逻辑,你可以在你给它的 config-dictionary 中指定你需要的序列化类型:

    hadoop_conf = {"io.serializations": "org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.hbase.mapreduce.ResultSerialization"}
    
    sc.newAPIHadoopFile(
    <input_path>,
    'org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat',
     keyClass='org.apache.hadoop.hbase.io.ImmutableBytesWritable',
     valueClass='org.apache.hadoop.hbase.client.Result',
     keyConverter='org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter',
     valueConverter='org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter',
     conf=hadoop_conf)
    

    请注意,hadoop_conf 中“io.serializations”的值是一个逗号分隔的列表,其中包括“org.apache.hadoop.hbase.mapreduce.ResultSerialization”。这是您能够反序列化结果所需的关键配置。为了能够反序列化 ImmutableBytesWritable,还需要 WritableSerialization。

    您也可以使用 sc.newAPIHadoopRDD ,但是您还需要在配置字典中为“mapreduce.input.fileinputformat.inputdir”设置一个值。

    关于apache-spark - 如何读取从 HBase 导出的序列文件,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38275824/

    10-15 07:38