我使用以下代码导出 HBase 表并将输出保存到 HDFS:
hbase org.apache.hadoop.hbase.mapreduce.Export \
MyHbaseTable1 hdfs://nameservice1/user/ken/data/exportTable1
输出文件是二进制文件。如果我使用 pyspark 读取文件夹:
test1 = sc.textFile('hdfs://nameservice1/user/ken/data/exportTable1')
test1.show(5)
表明:
u'SEQ\x061org.apache.hadoop.hbase.io.ImmutableBytesWritable%org.apache.hadoop.hbase.client.Result\x00\x00\x00\x00\x00\x00\ufffd-\x10A\ufffd~lUE\u025bt\ufffd\ufffd\ufffd&\x00\x00\x04\ufffd\x00\x00\x00'
u'\x00\x00\x00\x067-2010\ufffd\t'
u'|'
u'\x067-2010\x12\x01r\x1a\x08clo-0101 \ufffd\ufffd\ufffd*(\x042\\6.67|10|10|10|7.33|6.67|6.67|6.67|6.67|6.67|6.67|5.83|3.17|0|0|0.67|0.67|0.67|0.67|0|0|0|0|0'
u'u'
我可以说
我不知道第 3 行和第 5 行是从哪里来的。似乎 Hbase-export 遵循自己的规则生成文件,如果我使用自己的方式对其进行解码,则数据可能会损坏。
问题:
如何将此文件转换回可读格式?例如:
7-2010, r, clo-0101, 6.67|10|10|10|7.33|6.67|6.67|6.67|6.67|6.67|6.67|5.83|3.17|0|0|0.67|0.67|0.67|0.67|0|0|0|0|0
我试过了:
test1 = sc.sequenceFile('/user/youyang/data/hbaseSnapshot1/', keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)
test1.show(5)
和
test1 = sc.sequenceFile('hdfs://nameservice1/user/ken/data/exportTable1'
, keyClass='org.apache.hadoop.hbase.mapreduce.TableInputFormat'
, valueClass='org.apache.hadoop.hbase.io.ImmutableBytesWritable'
, keyConverter='org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter'
, valueConverter='org.apache.spark.examples.pythonconverters.HBaseResultToStringCon verter'
, minSplits=None
, batchSize=100)
不走运,代码不起作用,错误:
有什么建议么?谢谢!
最佳答案
我最近自己也遇到了这个问题。我通过远离 sc.sequenceFile
来解决它,而是使用 sc.newAPIHadoopFile
(或者如果你使用的是旧 API,则只是 hadoopFile )。 Spark SequenceFile-reader 似乎只处理 Writable 类型的键/值(在 docs 中说明)。
如果你使用 newAPIHadoopFile
它使用 Hadoop 反序列化逻辑,你可以在你给它的 config-dictionary 中指定你需要的序列化类型:
hadoop_conf = {"io.serializations": "org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.hbase.mapreduce.ResultSerialization"}
sc.newAPIHadoopFile(
<input_path>,
'org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat',
keyClass='org.apache.hadoop.hbase.io.ImmutableBytesWritable',
valueClass='org.apache.hadoop.hbase.client.Result',
keyConverter='org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter',
valueConverter='org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter',
conf=hadoop_conf)
请注意,hadoop_conf 中“io.serializations”的值是一个逗号分隔的列表,其中包括“org.apache.hadoop.hbase.mapreduce.ResultSerialization”。这是您能够反序列化结果所需的关键配置。为了能够反序列化 ImmutableBytesWritable,还需要 WritableSerialization。
您也可以使用
sc.newAPIHadoopRDD
,但是您还需要在配置字典中为“mapreduce.input.fileinputformat.inputdir”设置一个值。关于apache-spark - 如何读取从 HBase 导出的序列文件,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38275824/