Question
I want to write a custom grouping and aggregation function that takes user-specified column names and a user-specified aggregation map. I do not know the column names or the aggregation map up front. I want to write a function similar to the one below, but I am new to Scala and I cannot solve it.
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame = {
  val grouped = df.groupBy(cols)
  val aggregated = grouped.agg(aggregateFun)
  aggregated.show()
}
and call it like:
val listOfStrings = List("A", "B", "C")
val result = groupAndAggregate(df, Map("D"-> "SUM", "E"-> "COUNT"), listOfStrings)
How can I do this? Can anyone help me, please?
Answer
Your code is almost correct, with two issues:
1. The return type of your function is DataFrame, but the last line is aggregated.show(), which returns Unit. Remove the call to show and return aggregated itself, or just return the result of agg directly.
2. DataFrame.groupBy expects its arguments as col1: String, cols: String* - so you need to pass matching arguments: the first column, and then the rest of the columns as varargs. You can do that as follows: df.groupBy(cols.head, cols.tail: _*)
Altogether, your function would be:
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame = {
  val grouped = df.groupBy(cols.head, cols.tail: _*)
  val aggregated = grouped.agg(aggregateFun)
  aggregated
}
Or, a similar shorter version:
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame = {
  df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun)
}
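To see the whole thing end to end, here is a hedged usage sketch; it assumes a local SparkSession and invented sample data with columns A through E, so the names and values are illustrative only. Note that the Map-based agg takes function names such as "sum" and "count", and names the result columns like sum(D) and count(E).

```scala
// Usage sketch - assumes a local SparkSession; sample data is invented.
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder()
  .appName("groupAndAggregateDemo")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame =
  df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun)

val df = Seq(
  ("a1", "b1", "c1", 10, "x"),
  ("a1", "b1", "c1", 20, "y"),
  ("a2", "b2", "c2", 5,  "z")
).toDF("A", "B", "C", "D", "E")

val result = groupAndAggregate(df, Map("D" -> "sum", "E" -> "count"), List("A", "B", "C"))
result.show()  // result columns: A, B, C, sum(D), count(E)
```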
If you do want to call show within your function:
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame = {
  val grouped = df.groupBy(cols.head, cols.tail: _*)
  val aggregated = grouped.agg(aggregateFun)
  aggregated.show()
  aggregated
}
This concludes this article on Scala-Spark: dynamically calling groupBy and agg with parameter values. We hope the recommended answer helps.