使用PySpark,我试图将嵌套字典的RDD转换为数据帧,但是在某些设置为空的字段中丢失数据。

这是代码:

sc = SparkContext()
sqlContext = SQLContext(sc)

def convert_to_row(d):
    return Row(**d)

df2 = sc.parallelize([{"id": "14yy74hwogxoyl2l3v", "geoloc": {"country": {"geoname_id": 3017382, "iso_code": "FR", "name": "France"}}}]).map(convert_to_row).toDF()
df2.printSchema()
df2.show()
df2.toJSON().saveAsTextFile("/tmp/json.test")


当我查看/tmp/json.test时,这里是内容(手动缩进之后):

{
    "geoloc": {
        "country": {
            "name": null,
            "iso_code": null,
            "geoname_id": 3017382
        }
    },
    "id": "14yy74hwogxoyl2l3v"
}


iso_codename已转换为null

有人可以帮我吗?我听不懂

我正在使用Python 2.7和Spark 2.0.0

谢谢 !

最佳答案

按照@ user6910411已经提供的说明(并节省我自己的时间),补救措施(即中间JSON表示形式)是使用read.json而不是toDF和您的函数:

spark.version
# u'2.0.2'

jsonRDD = sc.parallelize([{"id": "14yy74hwogxoyl2l3v", "geoloc": {"country": {"geoname_id": 3017382, "iso_code": "FR", "name": "France"}}}])

df = spark.read.json(jsonRDD)
df.collect()
# result:
[Row(geoloc=Row(country=Row(geoname_id=3017382, iso_code=u'FR', name=u'France')), id=u'14yy74hwogxoyl2l3v')]

# just to have a look at what will be saved:
df.toJSON().collect()
# result:
[u'{"geoloc":{"country":{"geoname_id":3017382,"iso_code":"FR","name":"France"}},"id":"14yy74hwogxoyl2l3v"}']

df.toJSON().saveAsTextFile("/tmp/json.test")


为了进行比较,以下是您自己的df2的外观:

df2.collect()
# result:
[Row(geoloc={u'country': {u'geoname_id': 3017382, u'iso_code': None, u'name': None}}, id=u'14yy74hwogxoyl2l3v')]

df2.toJSON().collect()
# result:
[u'{"geoloc":{"country":{"name":null,"iso_code":null,"geoname_id":3017382}},"id":"14yy74hwogxoyl2l3v"}']

09-25 17:13
查看更多