Parsing an Array of Structs in a Spark DataFrame

Question

I have a DataFrame with one struct-type column. The sample DataFrame schema is:

root
 |-- Data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: string (nullable = true)

The name field holds the column name and the value field holds the column value. The number of elements in the Data column is not defined, so it can vary. I need to parse that data and get rid of the nested structure. (Exploding the array will not work in this case, because the data in one row belongs to one element.) The real schema is much bigger and has multiple array fields like Data, so my aim is to create a general solution that I can apply to arrays with a similar structure.

Sample data:

val data = Seq(
    """{"Data": [{ "name": "FName", "value": "Alex" }, { "name": "LName", "value": "Strong" }]}""",
    """{"Data": [{ "name": "FName", "value": "Robert " }, { "name": "MName", "value": "Nesta " }, { "name": "LName", "value": "Marley" }]}"""
)
val df = spark.read.json(spark.sparkContext.parallelize(data))

Expected result:

+-------+------+
|  FName| LName|
+-------+------+
|   Alex|Strong|
|Robert |Marley|
+-------+------+

As a solution I have created a UDF which I execute on the whole Data column. As input parameters I pass the column and the name of the field I want to extract.

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{udf, col, lit}

val find_scheme_name_in_array = udf { (arr: Seq[Row], columnName: String) => {
    // Scan the array and remember the value of the element whose
    // name matches the requested column name.
    var value = ""
    arr.foreach(el =>
        if (el.getAs[String]("name") == columnName) {
            value = el.getAs[String]("value")
        }
    )
    value
}}

The problem is that I am using the variable value to store an intermediate result, and I don't want to create a new variable for each row on which my UDF is executed.

This is how I execute my UDF (the query generates the expected result):

df.select(
    find_scheme_name_in_array(col("Data"), lit("FName")).as("FName"),
    find_scheme_name_in_array(col("Data"), lit("LName")).as("LName")
).show()
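
Since the real schema has several array fields like Data, the per-column calls can also be generated from a list of field names rather than written out one by one. A minimal sketch (the fieldNames list here is a hypothetical example, not from the original post):

// Hypothetical field-name list; in practice it could be configured
// per array column or collected from the data itself.
val fieldNames = Seq("FName", "LName")

val extracted = fieldNames.map(n =>
    find_scheme_name_in_array(col("Data"), lit(n)).as(n)
)
df.select(extracted: _*).show()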

I would be happy to hear any comments on how I can improve the UDF's logic, as well as different ways of solving the parsing issue.

Answer

I have solved the issue by substituting the foreach loop with the find method:

val find_scheme_name_in_array = udf { (arr: Seq[Row], columnName: String) =>
    // Return the value of the first matching element, or null if none matches.
    arr.find(_.getAs[String]("name") == columnName) match {
        case Some(i) => i.getAs[String]("value")
        case None => null
    }
}
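
For what it's worth, on Spark 2.4 or later the same lookup can also be expressed without a UDF, using the built-in filter higher-order function so the work stays in Spark's native expressions. A minimal sketch under that version assumption:

import org.apache.spark.sql.functions.expr

// filter() keeps the structs whose name matches; [0].value then reads the
// value of the first match, and yields null when nothing matches.
df.select(
    expr("filter(Data, x -> x.name = 'FName')[0].value").as("FName"),
    expr("filter(Data, x -> x.name = 'LName')[0].value").as("LName")
).show()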
