I have a Spark DataFrame:

A      B       val_of_B    val1   val2  val3   val4
"c1"  "MCC"     "cd1"      1      2     1.1    1.05
"c1"  "MCC"     "cd2"      2      3     1.1    1.05
"c1"  "MCC"     "cd3"      3      4     1.1    1.05


val1 and val2 are obtained by grouping on A, B and val_of_B, whereas val3, like val4, is A-level information only (e.g. a distinct over A and val3 gives just ("c1", 1.1)).

I want to write this out as nested JSON that looks like the following.

For each A, the JSON should have this shape:

{"val3": 1.1, "val4": 1.05, "MCC":[["cd1",1,2], ["cd2",2,3], ["cd3",3,4]]}
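For reference, the transformation being asked for can be sketched on plain Scala collections, without Spark: group the rows by A, take the A-level fields from any row of the group, and collect [val_of_B, val1, val2] under a key named after B. This is a minimal sketch, not Spark code: Record and NestedJsonSketch are hypothetical names, B is assumed to be constant within each A (as in the sample), and JSON escaping covers only backslashes and double quotes.

```scala
// Hypothetical local model of one row of the DataFrame in the question.
final case class Record(a: String, b: String, valOfB: String,
                        val1: Int, val2: Int, val3: Double, val4: Double)

object NestedJsonSketch {
  // Quote a string as a JSON literal; only backslash and double quote are escaped here.
  private def q(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // One JSON document per value of A, keyed by A.
  def toNestedJson(rows: Seq[Record]): Map[String, String] =
    rows.groupBy(_.a).map { case (a, group) =>
      // val3, val4 and B are A-level, so the first row of the group supplies them.
      val head = group.head
      // Each row contributes [val_of_B, val1, val2], in input order.
      val entries = group.map(r => s"[${q(r.valOfB)},${r.val1},${r.val2}]").mkString(",")
      a -> s"""{"val3":${head.val3},"val4":${head.val4},${q(head.b)}:[$entries]}"""
    }
}
```

For the three sample rows this yields {"val3":1.1,"val4":1.05,"MCC":[["cd1",1,2],["cd2",2,3],["cd3",3,4]]} for "c1", i.e. the target shape without the optional spaces.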


Can this be done with the existing tools in the Spark API? If not, could you give me some guidance?

Best answer

You should groupBy column A and aggregate the necessary columns using the built-in first, collect_list and array functions:

import org.apache.spark.sql.functions._

// Prepend each val_of_B value to its [val1, val2] pair, e.g. "cd1" + ["1","2"] -> ["cd1","1","2"]
val zipping = udf((arr1: Seq[String], arr2: Seq[Seq[String]]) =>
  arr1.zip(arr2).map { case (b, vals) => b +: vals })

val jsonDF = df.groupBy("A")
  // val3, val4 and B are constant within A, so first() is enough; the rest is collected per A
  .agg(first(col("val3")).as("val3"), first(col("val4")).as("val4"), first(col("B")).as("B"),
    collect_list("val_of_B").as("val_of_B"), collect_list(array("val1", "val2")).as("list"))
  .select(col("val3"), col("val4"), col("B"), zipping(col("val_of_B"), col("list")).as("list"))
  .toJSON
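The zipping UDF only pairs the two collected lists element by element. Its body can be checked on plain Scala collections; ZippingSketch is a hypothetical wrapper for illustration, and the length check is an addition that the UDF above does not have. The pairing relies on both collect_list results coming from the same agg call, so they are built from the same rows in the same order. The order of rows within a group is not guaranteed after a shuffle, though, so cd1, cd2, cd3 may come out in a different order.

```scala
object ZippingSketch {
  // Same logic as the zipping UDF: put each val_of_B value in front of its
  // [val1, val2] array. Assumes both lists are aligned (same length, same order).
  def zipping(valOfB: Seq[String], pairs: Seq[Seq[String]]): Seq[Seq[String]] = {
    require(valOfB.length == pairs.length, "lists must be aligned")
    valOfB.zip(pairs).map { case (b, vals) => b +: vals }
  }
}
```

If your Spark version promotes the integer columns to string inside array(...), collect_list(array("val_of_B", "val1", "val2")) should produce the same nested list directly and make the UDF unnecessary; this is worth verifying on your data.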


This should give you

+-----------------------------------------------------------------------------------------------+
|value                                                                                          |
+-----------------------------------------------------------------------------------------------+
|{"val3":"1.1","val4":"1.05","B":"MCC","list":[["cd1","1","2"],["cd2","2","3"],["cd3","3","4"]]}|
+-----------------------------------------------------------------------------------------------+


The next step is to use a udf that replaces the "B" and "list" keys with a single key named after the value of B:

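// Replace the "B" and "list" keys with one key named after B's value.
// Relies on the exact field order of the toJSON output: val3, val4, B, list.
val exchangeName = udf((json: String) => {
  val splitted = json.split(",")
  val name = splitted(2).split(":")(1).trim  // "MCC" from "B":"MCC"
  val value = splitted(3).split(":")(1).trim // [["cd1" from "list":[["cd1"
  // Re-join: val3, val4, the renamed list head, then the remaining list elements
  (Seq(splitted(0).trim, splitted(1).trim, name + ":" + value) ++ splitted.drop(4)).mkString(",")
})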

jsonDF.select(exchangeName(col("value")).as("json"))
  .show(false)
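Splitting the toJSON output on "," and ":" is fragile: it breaks as soon as a value such as val_of_B contains a comma or colon, or when a null field is omitted from the JSON and the positions shift. A sturdier alternative, sketched here under the assumption that the aggregated fields are strings (as the output above suggests), builds the final string from the columns directly. ExchangeNameSketch and buildJson are hypothetical names, and the escaping covers only backslashes and double quotes.

```scala
object ExchangeNameSketch {
  // Quote a string as a JSON literal; only backslash and double quote are escaped here.
  private def q(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Build {"val3":..,"val4":..,"<B>":[[..],..]} from the aggregated fields, so that
  // B's value becomes the key of the nested list without re-parsing any JSON.
  def buildJson(val3: String, val4: String, b: String, list: Seq[Seq[String]]): String = {
    val nested = list.map(_.map(q).mkString("[", ",", "]")).mkString("[", ",", "]")
    s"""{"val3":${q(val3)},"val4":${q(val4)},${q(b)}:$nested}"""
  }
}
```

Wrapped as udf(ExchangeNameSketch.buildJson _) and applied to col("val3"), col("val4"), col("B") and the zipped list column in the select, it would replace both the .toJSON call and the exchangeName step.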


This should give you the desired output:

+------------------------------------------------------------------------------------+
|json                                                                                |
+------------------------------------------------------------------------------------+
|{"val3":"1.1","val4":"1.05","MCC":[["cd1","1","2"],["cd2","2","3"],["cd3","3","4"]]}|
+------------------------------------------------------------------------------------+

10-06 00:01