Problem description
I am trying to flatten a deeply nested JSON document and create a Spark DataFrame; the ultimate goal is to push that DataFrame to Phoenix. I am able to flatten the JSON successfully with the following code:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{ArrayType, StructType}

// Recursively flattens the frame: arrays are exploded into one row per
// element and structs are expanded into their fields, until no nested
// array-of-struct or struct columns remain.
def recurs(df: DataFrame): DataFrame = {
  if (df.schema.fields.find(_.dataType match {
    case ArrayType(StructType(_), _) | StructType(_) => true
    case _ => false
  }).isEmpty) df
  else {
    val columns = df.schema.fields.map(f => f.dataType match {
      case _: ArrayType  => explode(col(f.name)).as(f.name) // one row per element
      case s: StructType => col(s"${f.name}.*")             // expand struct fields
      case _             => col(f.name)
    })
    recurs(df.select(columns: _*))
  }
}
val df = spark.read.json(json_location)
val flatten_df = recurs(df)
flatten_df.show()
My nested JSON is something like:
{
  "Total Value": 3,
  "Topic": "Example",
  "values": [
    {
      "value": "#example1",
      "points": [
        ["123", "156"]
      ],
      "properties": {
        "date": "12-04-19",
        "value": "Model example 1"
      }
    },
    {
      "value": "#example2",
      "points": [
        ["124", "157"]
      ],
      "properties": {
        "date": "12-05-19",
        "value": "Model example 2"
      }
    }
  ]
}
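One thing to note: the sample above is a single pretty-printed JSON document, not line-delimited JSON, so (assuming Spark 2.2+) it has to be read in multiLine mode, or Spark will put the whole document into a _corrupt_record column:

// Needed only if the file really is one multi-line JSON document.
val df = spark.read.option("multiLine", true).json(json_location)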
The output I am getting:
+-----------+-------+--------+---------+--------+---------------+
|Total Value|Topic  |value   |points   |date    |value          |
+-----------+-------+--------+---------+--------+---------------+
|3          |Example|example1|[123,156]|12-04-19|Model example 1|
|3          |Example|example2|[124,157]|12-05-19|Model example 2|
+-----------+-------+--------+---------+--------+---------------+
So the key value appears twice in the JSON, which produces two columns with the same name. That is an error: Phoenix will not ingest data with duplicate column names.
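The same collision can be reproduced in Spark alone; here is a minimal sketch with hypothetical toy columns, just to show the failure mode:

import org.apache.spark.sql.functions.lit

// Two columns that share the name "value": any later reference is ambiguous.
val dup = spark.range(1).select(lit("a").as("value"), lit("b").as("value"))
dup.select("value") // AnalysisException: Reference 'value' is ambiguous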
I am expecting the output below, so that Phoenix can differentiate the columns:
+-----------+-------+------------+-------------+----------------------+-----------------------+
|Total Value|Topic  |values.value|values.points|values.properties.date|values.properties.value|
+-----------+-------+------------+-------------+----------------------+-----------------------+
|3          |Example|example1    |[123,156]    |12-04-19              |Model example 1        |
|3          |Example|example2    |[124,157]    |12-05-19              |Model example 2        |
+-----------+-------+------------+-------------+----------------------+-----------------------+
That way Phoenix can ingest the data perfectly. Please suggest any changes to the flattening code, or any other way to achieve this. Thanks!
Answer
You need a few changes to the recurs method:
- Match ArrayType(st: StructType, _) instead of any ArrayType, so only arrays of structs get exploded; plain arrays (like points) are kept as they are.
- Avoid the * expansion: in the StructType case, name every field explicitly.
- Use backticks in the right places, and rename every field with its parent's name as a prefix (values.value, values.properties.date, ...), so the hierarchy stays visible in the column names. (A short sketch of why the backticks matter follows this list.)
Here is the code:
def recurs(df: DataFrame): DataFrame = {
  // Done when no array-of-struct or struct columns remain.
  if (!df.schema.fields.exists(_.dataType match {
    case ArrayType(StructType(_), _) | StructType(_) => true
    case _ => false
  })) df
  else {
    val columns = df.schema.fields.flatMap(f => f.dataType match {
      // Only explode arrays of structs; backticks guard dotted column names.
      case ArrayType(st: StructType, _) => Seq(explode(col(s"`${f.name}`")).as(f.name))
      // Expand each struct field by name, prefixing it with the parent name.
      case s: StructType =>
        s.fieldNames.map { sf => col(s"`${f.name}`.$sf").as(s"${f.name}.$sf") }
      case _ => Seq(col(s"`${f.name}`"))
    })
    recurs(df.select(columns: _*))
  }
}
val newDF = recurs(df).cache
newDF.show(false)
newDF.printSchema
The new output:
+-------+-----------+-------------+----------------------+-----------------------+------------+
|Topic |Total Value|values.points|values.properties.date|values.properties.value|values.value|
+-------+-----------+-------------+----------------------+-----------------------+------------+
|Example|3 |[[123, 156]] |12-04-19 |Model example 1 |#example1 |
|Example|3 |[[124, 157]] |12-05-19 |Model example 2 |#example2 |
+-------+-----------+-------------+----------------------+-----------------------+------------+
root
|-- Topic: string (nullable = true)
|-- Total Value: long (nullable = true)
|-- values.points: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
|-- values.properties.date: string (nullable = true)
|-- values.properties.value: string (nullable = true)
|-- values.value: string (nullable = true)
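From here, pushing the flattened frame into Phoenix could look like the sketch below. This assumes the phoenix-spark connector is on the classpath and that a matching Phoenix table already exists; "MY_TABLE" and the ZooKeeper URL are placeholders:

// Sketch only: the table name and zkUrl are placeholder values. The Phoenix
// table's columns must match the flattened names (quoted in the DDL, since
// they contain dots). phoenix-spark requires SaveMode.Overwrite.
newDF.write
  .format("org.apache.phoenix.spark")
  .mode("overwrite")
  .option("table", "MY_TABLE")
  .option("zkUrl", "zookeeper-host:2181")
  .save()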