Splitting a JSON string column into multiple columns

Problem description

I'm looking for a generic solution that extracts all JSON fields as columns from a JSON string column.

df =  spark.read.load(path)
df.show()

The files under 'path' are in Parquet format.

Sample data

|id | json_data
| 1 | {"name":"abc", "depts":["dep01", "dep02"]}
| 2 | {"name":"xyz", "depts":["dep03"],"sal":100}
| 3 | {"name":"pqr", "depts":["dep02"], "address":{"city":"SF","state":"CA"}}

Expected output

|id | name    | depts              | sal | address_city | address_state
| 1 | "abc"   | ["dep01", "dep02"] | null| null         | null
| 2 | "xyz"   | ["dep03"]          | 100 | null         | null
| 3 | "pqr"   | ["dep02"]          | null| "SF"         | "CA"

I know I can extract the columns by defining a StructType schema and calling 'from_json'. However, that approach requires the schema to be written by hand:

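import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Hand-written schema: every JSON field has to be listed up front
val myStruct = StructType(
  Seq(
    StructField("name", StringType),
    StructField("depts", ArrayType(StringType)),
    StructField("sal", IntegerType)
  ))

// Parse the JSON string column (json_data) into a struct column
val newDf = df.withColumn("json_data", from_json(col("json_data"), myStruct))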

Is there a better way to flatten the JSON column without defining the schema by hand? In the sample above I can see which JSON fields exist, but with the real data I cannot go through every row to discover all of them.

So I'm looking for a solution that splits all fields into columns without my having to specify the column names or types.

Recommended answer

Building on @Gaurang Shah's answer, I implemented a solution that handles nested JSON structures and fixes the problems caused by using monotonically_increasing_id, whose IDs are not sequential.

In this approach, the 'populateColumnName' function recursively descends into StructType columns and builds the dot-separated column names.
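
To make the recursion concrete, here is a minimal standalone sketch of the same helper applied to a hand-built schema. The schema is an illustrative assumption modelled on the sample data, not something taken from the real files, and only the Spark SQL type classes are needed.

```scala
import org.apache.spark.sql.types._

// Same logic as the answer's helper: recurse into structs, keep other types as leaves
def populateColumnName(col: StructField): Array[String] = col.dataType match {
  case struct: StructType =>
    struct.fields.flatMap(populateColumnName).map(col.name + "." + _)
  case _ => Array(col.name)
}

// Hypothetical schema resembling the sample rows
val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("depts", ArrayType(StringType)),
  StructField("address", StructType(Seq(
    StructField("city", StringType),
    StructField("state", StringType)
  )))
))

val paths = schema.fields.flatMap(populateColumnName)
// paths: Array("name", "depts", "address.city", "address.state")
```

Note that array columns such as depts are treated as leaves, so they stay as a single column rather than being expanded.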

The 'renameColumns' function renames the columns by replacing '.' with '_', so that nested JSON fields remain identifiable in the flat result.
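
As a small illustration, the helper can be tried on its own with a few column paths. The input names below are just examples based on the sample data; the output strings are SQL expressions of the form accepted by selectExpr.

```scala
// Same logic as the answer's helper, shown standalone
def renameColumns(name: String): String =
  if (name.contains(".")) name + " as " + name.replaceAll("\\.", "_")
  else name

val exprs = Seq("name", "address.city", "address.state").map(renameColumns)
// exprs: Seq("name", "address.city as address_city", "address.state as address_state")
```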

The 'addIndex' function adds an index column to a DataFrame so that the original DataFrame and the DataFrame parsed from the JSON column can be joined back together.
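
The reason for going through the RDD can be seen in a small local experiment. The sketch below assumes a local SparkSession and a made-up four-row DataFrame: monotonically_increasing_id encodes the partition number in the upper bits of the ID, so values jump between partitions, whereas zipWithIndex numbers the rows consecutively from 0.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

// Local session for the demo only (assumption; in spark-shell `spark` already exists)
val spark = SparkSession.builder().master("local[2]").appName("index-demo").getOrCreate()
import spark.implicits._

// Made-up data spread over two partitions
val df = Seq("a", "b", "c", "d").toDF("value").repartition(2)

// IDs are unique and increasing, but not consecutive across partitions
df.withColumn("id", monotonically_increasing_id()).show()

// zipWithIndex assigns consecutive indices 0, 1, 2, ... in partition order
val indexed = df.rdd.zipWithIndex
  .map { case (row, idx) => (row.getString(0), idx) }
  .toDF("value", "idx")
indexed.show()
```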

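import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

def flattenJSON(df : DataFrame, columnName: String) : DataFrame = {

    // Use the DataFrame's own session; its implicits provide .toDS below
    val spark = df.sparkSession
    import spark.implicits._

    // Temporary join key; assumed not to clash with an existing column name
    val indexCol = "internal_temp_id"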

    def populateColumnName(col : StructField) : Array[String] = {
        col.dataType match {
          case struct: StructType => struct.fields.flatMap(populateColumnName).map(col.name + "." + _)
          case rest         => Array(col.name)
        }
    }

    def renameColumns(name : String) : String = {
        if(name contains ".") {
            name + " as " + name.replaceAll("\\.", "_")
        }
        else name
    }

    def addIndex(df : DataFrame) : DataFrame = {

        // Append the index column (indexCol) of type Long
        val newSchema = StructType(df.schema.fields ++ Array(StructField(indexCol, LongType, false)))

        // Zip on RDD level
        val rddWithId = df.rdd.zipWithIndex
        // Convert back to DataFrame
        spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
    }

    val dfWithID = addIndex(df)

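    // Keep only the JSON string column
    val jsonDF = df.select(columnName)

    // Let Spark infer the schema by reading the JSON strings as a Dataset[String]
    val ds = jsonDF.rdd.map(_.getString(0)).toDS
    val parseDF = spark.read.json(ds)

    // Build select expressions such as "address.city as address_city"
    val columnNames = parseDF.schema.fields.flatMap(populateColumnName).map(renameColumns)

    val resultDF = parseDF.selectExpr(columnNames:_*)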

    val jsonDFWithID = addIndex(resultDF)

    val joinDF = dfWithID.join(jsonDFWithID, indexCol).drop(indexCol)

    joinDF
}

// With the sample data from the question, the JSON column is json_data
val res = flattenJSON(df, "json_data")
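
As a possible variation, not part of the answer above, the inferred schema can be fed to from_json so that no index column or join is needed. This is a minimal sketch assuming a local SparkSession and an in-memory copy of the sample rows; the final select lists the sample's columns explicitly for brevity, whereas a generic version could reuse populateColumnName and renameColumns.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}

// Local session for the demo only (assumption; in spark-shell `spark` already exists)
val spark = SparkSession.builder().master("local[2]").appName("flatten-demo").getOrCreate()
import spark.implicits._

// In-memory copy of the sample data from the question
val df = Seq(
  (1, """{"name":"abc", "depts":["dep01", "dep02"]}"""),
  (2, """{"name":"xyz", "depts":["dep03"],"sal":100}"""),
  (3, """{"name":"pqr", "depts":["dep02"], "address":{"city":"SF","state":"CA"}}""")
).toDF("id", "json_data")

// One pass over the JSON strings to infer a schema covering all rows
val schema = spark.read.json(df.select("json_data").as[String]).schema

// Parse in place with the inferred schema, then flatten the nested struct
val flat = df
  .withColumn("parsed", from_json(col("json_data"), schema))
  .select(
    col("id"),
    col("parsed.name").as("name"),
    col("parsed.depts").as("depts"),
    col("parsed.sal").as("sal"),
    col("parsed.address.city").as("address_city"),
    col("parsed.address.state").as("address_state")
  )
flat.show(false)
```

Because each row is parsed where it already sits, this variant does not depend on the two DataFrames keeping the same row order, at the cost of reading the JSON column twice.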

08-15 00:59