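I'm new to Spark with Scala and need some help from the community.

This is an application log in which each request is spread across 5 to 6 lines; the only key shared by all of those lines is reqID. Each line carries some of the columns I want to collect,
and I need to write exactly one record per reqID to a table.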

val jsondftemp = spark.read.json("path")  // read the JSON file

My input JSON file:
{"srchTrnsPhrs":"Psychiatric Care","Nm":"bh","Num":"746","reqPlsize":"11707","reqID":"a520a039-310b-485e-9be2-3bfe51d376a2"}
{"CoreFuncStrtTm":"2019-04-16 00:00:16.356614","reqID":"a520a039-310b-485e-9be2-3bfe51d376a2"}
{"CoreFuncEndTm":"2019-04-16 00:00:16.536903","execTm":"180","reqID":"a520a039-310b-485e-9be2-3bfe51d376a2"}

My schema:
|-- CoreFuncEndTm: string (nullable = true)
|-- CoreFuncStrtTm: string (nullable = true)
|-- Nm: string (nullable = true)
|-- Num: string (nullable = true)
|-- execTm: string (nullable = true)
|-- reqID: string (nullable = true)
|-- srchTrnsPhrs: string (nullable = true)
|-- reqPlsize: string (nullable = true)

The DataFrame contains:
+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+---------+
|       CoreFuncEndTm|      CoreFuncStrtTm|Nm     |execTm     |               reqID|       srchEntrdPhrs|Num    |reqPlsize|
+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+---------+
|                null|                null|     bh|       null|a520a039-310b-485...|    Psychiatric Care|    746|    11707|
|                null|2019-04-16 00:00:...|   null|       null|a520a039-310b-485...|                null|   null|     null|
|2019-04-16 00:00:...|                null|   null|        180|a520a039-310b-485...|                null|   null|     null|
+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+---------+

Expected output:
+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+---------+
|       CoreFuncEndTm|      CoreFuncStrtTm|Nm     |execTm     |               reqID|       srchEntrdPhrs|Num    |reqPlsize|
+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+---------+
|2019-04-16 00:00:...|2019-04-16 00:00:...|     bh|        180|a520a039-310b-485...|    Psychiatric Care|    746|    11707|
+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+---------+

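Any help with this is much appreciated.
reqID is the key that ties all the lines together; I'm getting confused between the reduceByKey and group-by-key operations.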

Best answer

A simple approach that starts from just the loaded DF.


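  • Columns are named explicitly, but this could be made dynamic with (_) and the like.
  • The types are the same.
  • You need to evaluate how you want to handle nulls.
  • Handles data in any format in general; anything that is not needed can be dropped.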
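
As a rough illustration of the first point, the per-column aggregations could be built from the column list instead of being written out one by one. This is only a sketch under assumptions: the `sample` DataFrame and its columns are made up for the example, `concat_ws` is used here in place of a UDF, and a local `SparkSession` is created so the snippet stands alone (in spark-shell, `spark` already exists).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, concat_ws}

// Assumption: a local session just for this sketch.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("dynamic-agg-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample: two partial rows that belong to the same ID.
val sample = Seq[(String, Option[Int], Option[Int])](
  ("r1", Some(1), None),
  ("r1", None, Some(3))
).toDF("ID", "a", "b")

// Every column except the key gets the same treatment.
val valueCols = sample.columns.filter(_ != "ID")

// collect_list skips nulls; concat_ws joins the collected values with commas.
val aggs = valueCols.map { c =>
  concat_ws(",", collect_list(col(c).cast("string"))).as(c)
}

// agg takes one Column plus varargs, hence head / tail.
val merged = sample.groupBy("ID").agg(aggs.head, aggs.tail: _*)
merged.show(false)
```

With this shape, adding a column to the input needs no change to the aggregation code, at the cost of being less explicit about which columns are involved.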


  • There are a few tricks in here, but not so many as to overload a newcomer:
    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._

    import spark.implicits._

    // Columns to aggregate per ID; the same columns are dropped once the string versions exist.
    val colAggList = List("a", "b", "c", "d")
    val dropCols = Seq("a", "b", "c", "d")

    // Joins the collected values into one comma-separated string.
    // Could just get the 1st element via data.withColumn("newcolname", $"colname"(0))
    val convToString = udf((arr: Seq[String]) => arr.mkString(","))

    // Sample data: each ID is spread over one or more partial rows (sc comes from spark-shell).
    val df = sc.parallelize(Seq(
       ("r1", Some(1), Some(1), None, Some("x")),
       ("r1", None, None, Some(3), None),
       ("r2", Some(6), Some(4), None, Some("y")),
       ("r3", None, Some(1), Some(5), Some("abc")),
       ("r3", Some(4), None, None, None),
       ("r4", Some(1), None, None, None),
       ("r4", None, Some(2), None, None),
       ("r4", None, None, Some(3), None),
       ("r4", None, None, None, Some("xyz")),
       ("r5", Some(1), Some(2), Some(7), Some("A"))
       )).toDF("ID", "a", "b", "c", "d")
    df.show(false)
    df.printSchema()

    // Note: Nones (nulls) are not collected by collect_list.
    val df2 = df.groupBy("ID").agg(
      collect_list(colAggList(0)).as("a"),
      collect_list(colAggList(1)).as("b"),
      collect_list(colAggList(2)).as("c"),
      collect_list(colAggList(3)).as("d")
    )
    df2.show(false)
    df2.printSchema()

    // Turn each collected array into a string column, then drop the array columns.
    val df3 = (df2
      .withColumn("aStr", convToString($"a"))
      .withColumn("bStr", convToString($"b"))
      .withColumn("cStr", convToString($"c"))
      .withColumn("dStr", convToString($"d"))
      .drop(dropCols: _*))
    df3.show(false)
    df3.printSchema()
    

    This returns the following, so you can see how it works; only the original and the final output are shown:
    +---+----+----+----+----+
    |ID |a   |b   |c   |d   |
    +---+----+----+----+----+
    |r1 |1   |1   |null|x   |
    |r1 |null|null|3   |null|
    |r2 |6   |4   |null|y   |
    |r3 |null|1   |5   |abc |
    |r3 |4   |null|null|null|
    |r4 |1   |null|null|null|
    |r4 |null|2   |null|null|
    |r4 |null|null|3   |null|
    |r4 |null|null|null|xyz |
    |r5 |1   |2   |7   |A   |
    +---+----+----+----+----+
    
    
    +---+----+----+----+----+
    |ID |aStr|bStr|cStr|dStr|
    +---+----+----+----+----+
    |r1 |1   |1   |3   |x   |
    |r5 |1   |2   |7   |A   |
    |r2 |6   |4   |    |y   |
    |r4 |1   |2   |3   |xyz |
    |r3 |4   |1   |5   |abc |
    +---+----+----+----+----+
    

    Note that the deliberately missing value (column c for r2) shows up as blank.
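
For the data in the question, a variation that skips the arrays altogether might be to take the first non-null value of each column per reqID. The sketch below is not the method shown above but a swapped-in alternative using `first(..., ignoreNulls = true)`. It assumes each field appears at most once per reqID (with several non-null values, which one `first` returns is not guaranteed), it feeds the three sample lines in as a `Dataset[String]` rather than reading a file (this `json` overload needs Spark 2.2 or later), and it creates its own local `SparkSession`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, first}

// Assumption: a local session just for this sketch; spark-shell already provides `spark`.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("merge-by-reqid-sketch")
  .getOrCreate()
import spark.implicits._

// The three log lines from the question, one JSON record per element.
val lines = Seq(
  """{"srchTrnsPhrs":"Psychiatric Care","Nm":"bh","Num":"746","reqPlsize":"11707","reqID":"a520a039-310b-485e-9be2-3bfe51d376a2"}""",
  """{"CoreFuncStrtTm":"2019-04-16 00:00:16.356614","reqID":"a520a039-310b-485e-9be2-3bfe51d376a2"}""",
  """{"CoreFuncEndTm":"2019-04-16 00:00:16.536903","execTm":"180","reqID":"a520a039-310b-485e-9be2-3bfe51d376a2"}"""
).toDS()

// Same shape as spark.read.json("path") in the question: one sparse row per log line.
val jsondftemp = spark.read.json(lines)

// Every column except the key is collapsed to its first non-null value.
val valueCols = jsondftemp.columns.filter(_ != "reqID")
val aggs = valueCols.map(c => first(col(c), ignoreNulls = true).as(c))

// One output row per reqID, with the original column names and types.
val merged = jsondftemp.groupBy("reqID").agg(aggs.head, aggs.tail: _*)
merged.show(false)
```

Compared with collect_list plus a string conversion, this keeps each column's original type and name, but it silently hides duplicates, so it is worth checking the one-value-per-reqID assumption on real logs first.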
