This article looks at how to process JSON with a dynamic schema using Spark's from_json; the question and answer below may be a useful reference for anyone facing the same problem.

Problem description

I am trying to use Spark to process JSON data with a variable structure (nested JSON). The input JSON data can be very large, with more than 1,000 keys per row, and one batch can exceed 20 GB. The entire batch is generated from 30 data sources; the 'key2' field of each JSON record can be used to identify its source, and the structure for each source is predefined.

What would be the best approach for processing such data? I have tried using from_json as shown below, but it works only with a fixed schema, and to use it I would first need to group the data by source and then apply each schema. Given the data volume, my preferred approach is to scan the data only once and extract the required values from each source based on its predefined schema.

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json  // required for from_json
import spark.implicits._

val data = sc.parallelize(
    """{"key1":"val1","key2":"source1","key3":{"key3_k1":"key3_v1"}}"""
    :: Nil)
val df = data.toDF

val schema = (new StructType)
    .add("key1", StringType)
    .add("key2", StringType)
    .add("key3", (new StructType)
        .add("key3_k1", StringType))

df.select(from_json($"value", schema).as("json_str"))
  .select($"json_str.key3.key3_k1").collect
res17: Array[org.apache.spark.sql.Row] = Array([key3_v1])

Recommended answer

This is just a restatement of @Ramesh Maharjan's answer, but with more modern Spark syntax.

I found this method lurking in DataFrameReader, which allows you to parse JSON strings from a Dataset[String] into an arbitrary DataFrame and take advantage of the same schema inference that Spark gives you with spark.read.json("filepath") when reading directly from a JSON file. The schema of each row can be completely different.

def json(jsonDataset: Dataset[String]): DataFrame

Example usage:

val jsonStringDs = spark.createDataset[String](
  Seq(
    """{"firstname": "Sherlock", "lastname": "Holmes", "address": {"streetNumber": 121, "street": "Baker", "city": "London"}}""",
    """{"name": "Amazon", "employeeCount": 500000, "marketCap": 817117000000, "revenue": 177900000000, "CEO": "Jeff Bezos"}"""))

jsonStringDs.show

jsonStringDs: org.apache.spark.sql.Dataset[String] = [value: string]
+----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                  |
+----------------------------------------------------------------------------------------------------------------------+
|{"firstname": "Sherlock", "lastname": "Holmes", "address": {"streetNumber": 121, "street": "Baker", "city": "London"}}|
|{"name": "Amazon", "employeeCount": 500000, "marketCap": 817117000000, "revenue": 177900000000, "CEO": "Jeff Bezos"}  |
+----------------------------------------------------------------------------------------------------------------------+


val df = spark.read.json(jsonStringDs)
df.show(false)

df: org.apache.spark.sql.DataFrame = [CEO: string, address: struct ... 6 more fields]
+----------+------------------+-------------+---------+--------+------------+------+------------+
|CEO       |address           |employeeCount|firstname|lastname|marketCap   |name  |revenue     |
+----------+------------------+-------------+---------+--------+------------+------+------------+
|null      |[London,Baker,121]|null         |Sherlock |Holmes  |null        |null  |null        |
|Jeff Bezos|null              |500000       |null     |null    |817117000000|Amazon|177900000000|
+----------+------------------+-------------+---------+--------+------------+------+------------+
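
Tying this back to the question: the inferred schema is the union of all fields seen across all sources, so after a single parsing pass you can slice out the columns that each source defines. The sketch below is not part of the original answer, and the batchDs input plus the sourceColumns map and extractSource helper are hypothetical names standing in for the question's 30 predefined per-source schemas:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical: the columns defined for each source, keyed by its "key2" value.
// In practice this would be derived from the predefined schemas.
val sourceColumns: Map[String, Seq[String]] = Map(
  "source1" -> Seq("key1", "key3.key3_k1")
  // ... one entry per source
)

// Parse the entire batch once; Spark infers the union schema across all sources.
val parsed = spark.read.json(batchDs)  // batchDs: Dataset[String] holding the raw JSON lines

// Pull out only the fields relevant to one source.
def extractSource(df: DataFrame, source: String): DataFrame =
  df.filter(col("key2") === source)
    .select(sourceColumns(source).map(c => col(c)): _*)

For example, extractSource(parsed, "source1") would return just key1 and key3_k1 for the rows produced by source1.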

The method is available from Spark 2.2.0: http://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader@json(jsonDataset:org.apache.spark.sql.Dataset[String]):org.apache.spark.sql.DataFrame
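
On earlier Spark versions, the same behavior is available through the older RDD overload, json(jsonRDD: RDD[String]), which was deprecated in 2.2.0 in favor of the Dataset variant. A one-line sketch, reusing jsonStringDs from above:

val dfFromRdd = spark.read.json(jsonStringDs.rdd)  // pre-2.2 overload; deprecated since Spark 2.2.0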

That wraps up this article on Spark from_json with a dynamic schema; hopefully the recommended answer above helps.
