I have this DataFrame, which contains 3 columns: userId, date, generation
+-------+--------+----------------------------------------------------------------------------+
|userId | date |generation |
+-------+--------+----------------------------------------------------------------------------+
|1 |20160926|Map("screen_WiFi" -> 15.127, "upload_WiFi" -> 0.603, "total_WiFi" -> 19.551)|
|1 |20160926|Map("screen_2g" -> 0.573, "upload_2g" -> 0.466, "total_2g" -> 1.419) |
|1 |20160926|Map("screen_3g" -> 10.084, "upload_3g" -> 80.515, "total_3g" -> 175.435) |
+-------+--------+----------------------------------------------------------------------------+
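I want to group these values by userId and date.
The problem is the third column, which holds MapType values: all the maps in a group need to be merged into a single column. The final output should look like this: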
+-------+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId |date |generation |
+-------+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1 |20160926|Map("screen_WiFi" -> 15.127, "upload_WiFi" -> 0.603, "total_WiFi" -> 19.551,"screen_2g" -> 0.573, "upload_2g" -> 0.466, "total_2g" -> 1.419, "screen_3g" -> 10.084, "upload_3g" -> 80.515, "total_3g" -> 175.435)|
+-------+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
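Is there a way to solve this, or any possible workaround?
Best answer
You can create a naive user-defined aggregate function (UDAF) that combines maps, and then use it as the aggregate function. Since you didn't specify how two values for the same key should be merged, I'll assume keys are unique, i.e. for each userId and date, no key appears in two different records: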
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

/**
 * UDAF combining maps, overriding any duplicate key with the "latest" value
 * @param keyType DataType of the map keys
 * @param valueType DataType of the map values
 * @tparam K key type
 * @tparam V value type
 */
class CombineMaps[K, V](keyType: DataType, valueType: DataType) extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = new StructType().add("map", dataType)
  override def bufferSchema: StructType = inputSchema
  override def dataType: DataType = MapType(keyType, valueType)
  override def deterministic: Boolean = true
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, Map[K, V]())

  // naive implementation: assumes keys won't repeat; otherwise the later value for a key overrides the earlier one
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val before = buffer.getAs[Map[K, V]](0)
    val toAdd = input.getAs[Map[K, V]](0)
    val result = before ++ toAdd
    buffer.update(0, result)
  }

  // merging two partial buffers is the same operation as adding one more input map
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = update(buffer1, buffer2)

  // return the accumulated map using the generic key/value types (not a hard-coded Map[String, Int])
  override def evaluate(buffer: Row): Any = buffer.getAs[Map[K, V]](0)
}
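The update step above relies on Scala's `++`, where the right-hand map wins whenever a key appears on both sides. The following is a minimal plain-Scala sketch (no Spark required) of that behavior. The object name `MapMergeDemo` and the `combineSumming` variant are hypothetical illustrations, not part of the original answer; summing is just one possible policy if keys can in fact repeat.

```scala
object MapMergeDemo {
  // Right-biased merge: the same operation the UDAF's update step performs.
  // For a key present in both maps, the value from `toAdd` is kept.
  def combine[K, V](before: Map[K, V], toAdd: Map[K, V]): Map[K, V] =
    before ++ toAdd

  // Hypothetical alternative policy: add the values of duplicate keys together.
  def combineSumming[K](before: Map[K, Double], toAdd: Map[K, Double]): Map[K, Double] =
    toAdd.foldLeft(before) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0.0) + v)
    }
}
```

If duplicate keys are possible in your data, a function like `combineSumming` could stand in for `before ++ toAdd` inside `update`, provided the value type is fixed to `Double`.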
// instantiate a CombineMaps with the relevant types:
val combineMaps = new CombineMaps[String, Double](StringType, DoubleType)
// group by userId and date, then merge the maps; the alias keeps the original column name
val result = input.groupBy("userId", "date").agg(combineMaps(col("generation")).as("generation"))
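On newer Spark versions the same grouping can probably be done without a custom UDAF (note that `UserDefinedAggregateFunction` is deprecated as of Spark 3.0). The sketch below is an alternative technique, not the answer's method: it assumes Spark 3.0 or later, where `map_entries` is available in `org.apache.spark.sql.functions`, and it assumes `input` has the question's columns. The names `BuiltinMapMerge`, `mergedMap` and `combine` are made up for illustration.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, collect_list, flatten, map_entries, map_from_entries}

object BuiltinMapMerge {
  // Turn each map into an array of (key, value) structs, collect those arrays
  // per group, flatten them into one array, and rebuild a single map.
  def mergedMap(mapCol: Column): Column =
    map_from_entries(flatten(collect_list(map_entries(mapCol))))

  // Assumes `input` has the columns userId, date and generation (a MapType column).
  def combine(input: DataFrame): DataFrame =
    input
      .groupBy("userId", "date")
      .agg(mergedMap(col("generation")).as("generation"))
}
```

Unlike the UDAF, this version does not silently keep the later value: as far as I know, Spark 3.x raises an error on duplicate map keys by default, unless `spark.sql.mapKeyDedupPolicy` is set to `LAST_WIN`. That fits the assumption that keys are unique per userId and date.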