I have this DataFrame with 3 columns: userId, date, generation

+-------+--------+----------------------------------------------------------------------------+
|userId |   date |generation                                                                  |
+-------+--------+----------------------------------------------------------------------------+
|1      |20160926|Map("screen_WiFi" -> 15.127, "upload_WiFi" -> 0.603, "total_WiFi" -> 19.551)|
|1      |20160926|Map("screen_2g" -> 0.573, "upload_2g" -> 0.466, "total_2g" -> 1.419)        |
|1      |20160926|Map("screen_3g" -> 10.084, "upload_3g" -> 80.515, "total_3g" -> 175.435)    |
+-------+--------+----------------------------------------------------------------------------+


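I want to group these values by userId and date.
The problem is the third column, which holds MapType values: all the maps in a group need to be merged into a single map column. The final output should look like this: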


+-------+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId |date    |generation                                                                                                                                                                                                       |
+-------+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1      |20160926|Map("screen_WiFi" -> 15.127, "upload_WiFi" -> 0.603, "total_WiFi" -> 19.551,"screen_2g" -> 0.573, "upload_2g" -> 0.466, "total_2g" -> 1.419, "screen_3g" -> 10.084, "upload_3g" -> 80.515, "total_3g" -> 175.435)|
+-------+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


Is there a way to solve this, or any possible workaround?

Best answer

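You can create a naive user-defined aggregate function (UDAF) that combines maps, and then use it as the aggregate function. Since you didn't define how two values for the same key should be merged, I'll assume keys are unique, i.e. for each userId and date, no key appears in two different records: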

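import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/***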
  * UDAF combining maps, overriding any duplicate key with "latest" value
  * @param keyType DataType of Map key
  * @param valueType DataType of Map value
  * @tparam K key type
  * @tparam V value type
  */
class CombineMaps[K, V](keyType: DataType, valueType: DataType) extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = new StructType().add("map", dataType)
  override def bufferSchema: StructType = inputSchema
  override def dataType: DataType = MapType(keyType, valueType)
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0 , Map[K, V]())

  // naive implementation - assuming keys won't repeat, otherwise later value for key overrides earlier one
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val before = buffer.getAs[Map[K, V]](0)
    val toAdd = input.getAs[Map[K, V]](0)
    val result = before ++ toAdd
    buffer.update(0, result)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = update(buffer1, buffer2)

  override def evaluate(buffer: Row): Any = buffer.getAs[Map[K, V]](0)
}
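
To make the roles of initialize, update and merge more concrete, here is a minimal sketch in plain Scala collections, with no Spark involved. It assumes the three rows from the question end up in two partitions; the object name `CombineMapsSketch` and the helpers `aggregatePartition` and `mergeBuffers` are hypothetical and only mimic, roughly, what the UDAF above does for one (userId, date) group.

```scala
object CombineMapsSketch {
  // Hypothetical split of one (userId, date) group across two partitions.
  val partition1: Seq[Map[String, Double]] = Seq(
    Map("screen_WiFi" -> 15.127, "upload_WiFi" -> 0.603, "total_WiFi" -> 19.551),
    Map("screen_2g" -> 0.573, "upload_2g" -> 0.466, "total_2g" -> 1.419)
  )
  val partition2: Seq[Map[String, Double]] = Seq(
    Map("screen_3g" -> 10.084, "upload_3g" -> 80.515, "total_3g" -> 175.435)
  )

  // initialize + update: start from an empty map and fold every row in with ++.
  def aggregatePartition(rows: Seq[Map[String, Double]]): Map[String, Double] =
    rows.foldLeft(Map.empty[String, Double])(_ ++ _)

  // merge: two partial buffers are combined the same way as a buffer and a row.
  def mergeBuffers(a: Map[String, Double], b: Map[String, Double]): Map[String, Double] =
    a ++ b

  // evaluate: the final buffer is the result for the group.
  val combined: Map[String, Double] =
    mergeBuffers(aggregatePartition(partition1), aggregatePartition(partition2))
}
```

Because the partial buffers are themselves maps, merge can simply reuse the update logic, which is why the class above delegates merge to update.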

// instantiate a CombineMaps with the relevant types:
val combineMaps = new CombineMaps[String, Double](StringType, DoubleType)

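// groupBy and aggregate; `input` is the DataFrame shown in the question
import org.apache.spark.sql.functions.col
val result = input.groupBy("userId", "date").agg(combineMaps(col("generation")).as("generation"))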
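
If the unique-key assumption does not hold, it helps to see what `++` does with a repeated key. The sketch below uses plain Scala maps with made-up values; `DuplicateKeySketch` and `sumMerge` are hypothetical names, and summing is only one possible policy, not something the question asks for.

```scala
object DuplicateKeySketch {
  // Made-up data: "total" appears in both maps.
  val first: Map[String, Double]  = Map("total" -> 1.0, "screen" -> 2.0)
  val second: Map[String, Double] = Map("total" -> 5.0)

  // ++ keeps the right-hand value when a key exists in both maps.
  val overridden: Map[String, Double] = first ++ second

  // Hypothetical alternative policy: add the values of shared keys.
  def sumMerge(a: Map[String, Double], b: Map[String, Double]): Map[String, Double] =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0.0) + v) }

  val summed: Map[String, Double] = sumMerge(first, second)
}
```

With `++`, which value "wins" depends on the order in which rows and partial buffers are combined, and Spark does not guarantee that order. If duplicates are possible, an order-independent policy such as the sum above (or max, min) gives a more predictable result.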
