平化嵌套json来创建具有相同名称的2列并在Phoenix中给出

平化嵌套json来创建具有相同名称的2列并在Phoenix中给出

本文介绍了使用spark scala扁平化嵌套json来创建具有相同名称的2列并在Phoenix中给出重复的错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图展平嵌套的JSON,并创建spark数据框,最终目标是将给定的数据框推到phoenix.我可以使用代码成功地将JSON扁平化.

I was trying to flatten the very nested JSON, and create spark dataframe and the ultimate goal is to push the given dataframe to phoenix. I am successfully able to flatten the JSON using code.

def recurs(df: DataFrame): DataFrame = {
  if(df.schema.fields.find(_.dataType match {
    case ArrayType(StructType(_),_) | StructType(_) => true
    case _ => false
  }).isEmpty) df
  else {
    val columns = df.schema.fields.map(f => f.dataType match {
      case _: ArrayType => explode(col(f.name)).as(f.name)
      case s: StructType => col(s"${f.name}.*")
      case _ => col(f.name)
    })
    recurs(df.select(columns:_*))
  }
}
val df = spark.read.json(json_location)
flatten_df = recurs(df)
flatten_df.show()

我的嵌套json类似于:

My nested json is something like:

          {
           "Total Value": 3,
           "Topic": "Example",
           "values": [
                      {
                        "value": "#example1",
                        "points": [
                                   [
                                   "123",
                                   "156"
                                  ]
                            ],
                        "properties": {
                         "date": "12-04-19",
                         "value": "Model example 1"
                            }
                         },
                       {"value": "#example2",
                        "points": [
                                   [
                                   "124",
                                   "157"
                                  ]
                            ],
                        "properties": {
                         "date": "12-05-19",
                         "value": "Model example 2"
                            }
                         }
                      ]
               }

我得到的输出:

+-----------+-----------+----------+-------------+------------------------+------------------------+
|Total Value| Topic     |value     | points      | date                   |    value               |
+-----------+-----------+----------+-------------+------------------------+------------------------+
| 3         | Example   | example1 | [123,156]   | 12-04-19               |    Model example 1     |
| 3         | Example   | example2 | [124,157]   | 12-05-19               |    Model example 2     |
+-----------+-----------+----------+-------------+------------------------+------------------------+

因此,值键在json中被发现了2次,因此它正在创建2个列名,但这是一个错误,在Phoenix中是不允许提取此数据的.

So, value key is found 2 times in json so it is creating 2 column name but this is an error and not allowed in Phoenix to ingest this data.

错误消息是:

我希望得到这样的输出,以便凤凰可以区分这些列.

I am expecting this output so that phoenix could differentiate the columns.

+-----------+-----------+--------------+---------------+------------------------+------------------------+
|Total Value| Topic     |values.value  | values.points | values.properties.date | values.properties.value|              |
+-----------+-----------+--------------+---------------+------------------------+------------------------+
| 3         | Example   | example1     | [123,156]     | 12-04-19               |    Model example 1     |
| 3         | Example   | example2     | [124,157]     | 12-05-19               |    Model example 2     |
+-----------+-----------+--------------+---------------+------------------------+------------------------+

通过这种方式,凤凰城可以完美地吸收数据,请建议在拼合代码中进行任何更改,或为实现此目的提供任何帮助.谢谢

In this way phoenix can ingest the data perfectly, please suggest any changes in flattening code or any help to achieve the same. Thanks

推荐答案

您需要对recurs方法进行一些更改:

You need slight changes to the recurs method:

  1. 使用ArrayType(st: StructType, _)而不是ArrayType进行交易.
  2. 避免使用*,并在第二个匹配项(StructType)中命名每个字段.
  3. 在正确的位置使用backticks重命名字段,并保持优先级命名.
  1. Dealing with ArrayType(st: StructType, _) instead of ArrayType.
  2. Avoid using *, and name every field in the second match (StructType).
  3. Use backticks at the right places to rename the fields, keeping precedence naming.

以下是一些代码:

def recurs(df: DataFrame): DataFrame = {
  if(!df.schema.fields.exists(_.dataType match {
    case ArrayType(StructType(_),_) | StructType(_) => true
    case _ => false
  })) df
  else {
    val columns = df.schema.fields.flatMap(f => f.dataType match {
      case ArrayType(st: StructType, _) => Seq(explode(col(f.name)).as(f.name))
      case s: StructType =>
        s.fieldNames.map{sf => col(s"`${f.name}`.$sf").as(s"${f.name}.$sf")}
      case _ => Seq(col(s"`${f.name}`"))
    })
    recurs(df.select(columns:_*))
  }
}

val newDF = recurs(df).cache
newDF.show(false)
newDF.printSchema

新的输出:

+-------+-----------+-------------+----------------------+-----------------------+------------+
|Topic  |Total Value|values.points|values.properties.date|values.properties.value|values.value|
+-------+-----------+-------------+----------------------+-----------------------+------------+
|Example|3          |[[123, 156]] |12-04-19              |Model example 1        |#example1   |
|Example|3          |[[124, 157]] |12-05-19              |Model example 2        |#example2   |
+-------+-----------+-------------+----------------------+-----------------------+------------+

root
 |-- Topic: string (nullable = true)
 |-- Total Value: long (nullable = true)
 |-- values.points: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- values.properties.date: string (nullable = true)
 |-- values.properties.value: string (nullable = true)
 |-- values.value: string (nullable = true)

这篇关于使用spark scala扁平化嵌套json来创建具有相同名称的2列并在Phoenix中给出重复的错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-29 02:27