问题描述
我想编写一个自定义分组和聚合函数来获取用户指定的列名和用户指定的聚合图。我不知道列名和聚合映射图。我想写一个类似于下面的东西的函数。但我是新来的Scala,我无法解决它。
def groupAndAggregate(df:DataFrame,aggregateFun:Map [String,String ],cols:List [String]):DataFrame = {
val grouped = df.groupBy(cols)
val aggregated = grouped.agg(aggregateFun)
aggregated.show()
$ / code>
并且想把它称为
val listOfStrings =列表(A,B,C)
val结果= groupAndAggregate(df,Map(D - >SUM ,E - >COUNT),listOfStrings)
我该怎么做?
任何人都可以帮助我。
您的代码几乎正确 - 有两个问题:
函数的返回类型是 DataFrame
,但最后一行是 aggregated.show() code>,它返回
Unit
。删除对 show
的调用,以返回聚合
本身,或者仅返回 agg
立即
DataFrame.groupBy
需要参数如下: col1:String,cols:String *
- 所以你需要传递匹配的参数:第一列,然后其余的列作为参数列表,你可以这样做: df.groupBy(cols.head,cols.tail:_ *)
总而言之,你的函数是:
pre $ def $ groupAndAggregate(df:DataFrame,aggregateFun:Map [String,String ],cols:List [String]):DataFrame = {
val groupped = df.groupBy(cols.head,cols.tail:_ *)
val aggregated = grouped.agg(aggregateFun)
$ b
或者,类似的较短版本:
def groupAndAggregate(df:DataFrame,aggregateFun:Map [String,String],cols:List [String]):DataFrame = {
df.groupBy(cols.head,cols.tail:_ *)。agg(aggregateFun)
}
如果您想在您的功能中调用 show
:
def groupAndAggregate(df:DataFrame,aggregateFun:Map [String,String],cols:List [String]):DataFrame = {
val groupped = df.groupBy(cols.head ,cols.tail:_ *)
val aggregated = grouped.agg(aggregateFun)
aggregated.show()
汇总
}
I want to write a custom grouping and aggregate function to get user specified column names and user specified aggregation map.I do not know the column names and aggregation map up front. I want to write a function similar to something like below. But i am new to Scala and i cannot solve it.
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
val grouped = df.groupBy(cols)
val aggregated = grouped.agg(aggregateFun)
aggregated.show()
}
and want to call it like
val listOfStrings = List("A", "B", "C")
val result = groupAndAggregate(df, Map("D"-> "SUM", "E"-> "COUNT"), listOfStrings)
How can i do this?Can anyone help me please.
Your code is almost correct - with two issues:
The return type of your function is
DataFrame
, but the last line isaggregated.show()
, which returnsUnit
. Remove the call toshow
to returnaggregated
itself, or just return the result ofagg
immediatelyDataFrame.groupBy
expects arguments as follows:col1: String, cols: String*
- so you need to pass matching arguments: the first columns, and then the rest of the columns as a list of arguments, you can do that as follows:df.groupBy(cols.head, cols.tail: _*)
Altogether, your function would be:
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
val grouped = df.groupBy(cols.head, cols.tail: _*)
val aggregated = grouped.agg(aggregateFun)
aggregated
}
Or, a similar shorter version:
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String] ): DataFrame = {
df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun)
}
If you do want to call show
within your function:
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
val grouped = df.groupBy(cols.head, cols.tail: _*)
val aggregated = grouped.agg(aggregateFun)
aggregated.show()
aggregated
}
这篇关于Scala-Spark用参数值动态调用groupby和agg的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!