本文介绍了Spark 2.0.0使用可变模式读取JSON数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试处理一个月的网站流量,该流量作为json(每行/网站流量命中一个json对象)存储在S3存储桶中.数据量足够大,我无法要求Spark推断模式(OOM错误).如果我指定架构,则显然可以正常加载.但是,问题在于每个json对象中包含的字段都不同,因此,即使我使用一天的流量来构建架构,每月的架构也会有所不同(更多字段),因此我的Spark作业会失败.

I am trying to process a month's worth of website traffic, which is stored in an S3 bucket as json (one json object per line/website traffic hit). The amount of data is big enough that I can't ask Spark to infer the schema (OOM errors). If I specify the schema it loads fine obviously. But, the issue is that the fields contained in each json object differ, so even if I build a schema using one day's worth of traffic, the monthly schema will be different (more fields) and so my Spark job fails.

所以我很好奇了解其他人如何处理此问题.例如,我可以使用传统的RDD mapreduce作业来提取我感兴趣的字段,将其导出,然后将所有内容加载到数据框中.但这很慢,似乎有点自欺欺人.

So I'm curious to understand how others deal with this issue. I can for example use a traditional RDD mapreduce job to extract the fields I'm interested in, export and then load everything into a dataframe. But this is slow and seems a bit like self-defeating.

我在这里找到了类似的问题,但没有任何相关之处给我的信息.

I've found a similar question here but no relevant info for me.

谢谢.

推荐答案

如果您知道您感兴趣的字段,请提供一个模式子集. JSON阅读器可以优雅地忽略意外字段.假设您的数据如下所示:

If you know the fields you're interested in just provide a subset of schema. JSON reader can gracefully ignore unexpected fields. Let's say your data looks like this:

import json
import tempfile

object = {"foo": {"bar": {"x": 1, "y": 1}, "baz": [1, 2, 3]}}

_, f = tempfile.mkstemp()
with open(f, "w") as fw:
    json.dump(object, fw)

,而您仅对foo.bar.xfoo.bar.z(不存在)感兴趣:

and you're interested only in foo.bar.x and foo.bar.z (non-existent):

from pyspark.sql.types import StructType

schema = StructType.fromJson({'fields': [{'metadata': {},
   'name': 'foo',
   'nullable': True,
   'type': {'fields': [
       {'metadata': {}, 'name': 'bar', 'nullable': True, 'type': {'fields': [
           {'metadata': {}, 'name': 'x', 'nullable': True, 'type': 'long'},
           {'metadata': {}, 'name': 'z', 'nullable': True, 'type': 'double'}],
       'type': 'struct'}}],
    'type': 'struct'}}],
 'type': 'struct'})

df = spark.read.schema(schema).json(f)
df.show()

## +----------+
## |       foo|
## +----------+
## |[[1,null]]|
## +----------+

df.printSchema()
## root
##  |-- foo: struct (nullable = true)
##  |    |-- bar: struct (nullable = true)
##  |    |    |-- x: long (nullable = true)
##  |    |    |-- z: double (nullable = true)

您还可以降低架构推断的采样率,以提高整体性能.

You can also reduce sampling ratio for schema inference to improve overall performance.

这篇关于Spark 2.0.0使用可变模式读取JSON数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-05 05:00