Question
I have a DataFrame with one struct type column. The sample DataFrame schema is:
root
|-- Data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- value: string (nullable = true)
The field name holds the column name and the field value holds the column value. The number of elements in the Data column is not defined, so it can vary. I need to parse that data and get rid of the nested structure. (Array explode will not work in this case because the data in one row belongs to one element.) The real schema is much bigger and has multiple array fields like 'Data', so my aim is to create a general solution that I can apply to arrays with a similar structure.
Sample data:
val data = Seq(
  """{"Data": [{ "name": "FName", "value": "Alex" }, { "name": "LName", "value": "Strong" }]}""",
  """{"Data": [{ "name": "FName", "value": "Robert " }, { "name": "MName", "value": "Nesta " }, { "name": "LName", "value": "Marley" }]}"""
)
val df = spark.read.json(spark.sparkContext.parallelize(data))
Expected result:
+-------+------+
| FName| LName|
+-------+------+
| Alex|Strong|
|Robert |Marley|
+-------+------+
As a solution I have created a UDF which I execute on the whole Data column. As input parameters I am passing the Data column and the name of the field whose value I want to extract.
val find_scheme_name_in_array = udf { (arr: Seq[Row], columnName: String) =>
  // mutable accumulator for the matching value -- the part I would like to improve
  var value = ""
  arr.foreach(el =>
    if (el.getAs[String]("name") == columnName) {
      value = el.getAs[String]("value")
    }
  )
  value
}
The problem is that I am using the variable value to store an intermediate result, and I don't want to create a new variable for each row on which my UDF will be executed.
The way I am executing my UDF (this query generates the expected result):
df.select(
  find_scheme_name_in_array(col("Data"), lit("FName")).as("FName"),
  find_scheme_name_in_array(col("Data"), lit("LName")).as("LName")
).show()
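Since the real schema has several array fields like Data and the aim is a general solution, the same select can presumably be generated for an arbitrary list of field names. A minimal sketch, where fieldNames is a hypothetical list of the names to extract (not part of the original question):

// Build one extracted column per requested field name and apply them all
// in a single select; fieldNames is an assumed input.
val fieldNames = Seq("FName", "LName")
val extractedCols = fieldNames.map(n => find_scheme_name_in_array(col("Data"), lit(n)).as(n))
df.select(extractedCols: _*).show()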
I would be happy to hear any comments on how I can improve the UDF's logic, and about different ways to solve the parsing issue.
Recommended answer
I have solved the issue by substituting the foreach loop with the find method:
val find_scheme_name_in_array = udf { (arr: Seq[Row], columnName: String) =>
  // find stops at the first matching element, so no mutable variable is needed
  arr.find(_.getAs[String]("name") == columnName) match {
    case Some(i) => i.getAs[String]("value")
    case None    => null
  }
}
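find returns an Option and short-circuits on the first match, so the mutable value variable disappears; returning null for a missing name shows up as a SQL NULL in the result. If the Spark version is 2.4 or newer, the UDF could presumably be avoided altogether by turning the name/value struct array into a map with the built-in map_from_entries and reading fields by key. A sketch, assuming Spark 2.4+ and that name precedes value in each struct:

import org.apache.spark.sql.functions.{col, map_from_entries}

// map_from_entries uses the first struct field as the key and the second
// as the value, so Data becomes a name -> value map per row.
val withMap = df.withColumn("DataMap", map_from_entries(col("Data")))
withMap.select(
  col("DataMap").getItem("FName").as("FName"),
  col("DataMap").getItem("LName").as("LName")
).show()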