使用PySpark,我试图将嵌套字典的RDD转换为数据帧,但是在某些设置为空的字段中丢失数据。
这是代码:
sc = SparkContext()
sqlContext = SQLContext(sc)
def convert_to_row(d):
return Row(**d)
df2 = sc.parallelize([{"id": "14yy74hwogxoyl2l3v", "geoloc": {"country": {"geoname_id": 3017382, "iso_code": "FR", "name": "France"}}}]).map(convert_to_row).toDF()
df2.printSchema()
df2.show()
df2.toJSON().saveAsTextFile("/tmp/json.test")
当我查看/tmp/json.test时,这里是内容(手动缩进之后):
{
"geoloc": {
"country": {
"name": null,
"iso_code": null,
"geoname_id": 3017382
}
},
"id": "14yy74hwogxoyl2l3v"
}
iso_code
和name
已转换为null
。有人可以帮我吗?我听不懂
我正在使用Python 2.7和Spark 2.0.0
谢谢 !
最佳答案
按照@ user6910411已经提供的说明(并节省我自己的时间),补救措施(即中间JSON表示形式)是使用read.json
而不是toDF
和您的函数:
spark.version
# u'2.0.2'
jsonRDD = sc.parallelize([{"id": "14yy74hwogxoyl2l3v", "geoloc": {"country": {"geoname_id": 3017382, "iso_code": "FR", "name": "France"}}}])
df = spark.read.json(jsonRDD)
df.collect()
# result:
[Row(geoloc=Row(country=Row(geoname_id=3017382, iso_code=u'FR', name=u'France')), id=u'14yy74hwogxoyl2l3v')]
# just to have a look at what will be saved:
df.toJSON().collect()
# result:
[u'{"geoloc":{"country":{"geoname_id":3017382,"iso_code":"FR","name":"France"}},"id":"14yy74hwogxoyl2l3v"}']
df.toJSON().saveAsTextFile("/tmp/json.test")
为了进行比较,以下是您自己的
df2
的外观:df2.collect()
# result:
[Row(geoloc={u'country': {u'geoname_id': 3017382, u'iso_code': None, u'name': None}}, id=u'14yy74hwogxoyl2l3v')]
df2.toJSON().collect()
# result:
[u'{"geoloc":{"country":{"name":null,"iso_code":null,"geoname_id":3017382}},"id":"14yy74hwogxoyl2l3v"}']