本文介绍了Scala-Spark用参数值动态调用groupby和agg的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想编写一个自定义分组和聚合函数来获取用户指定的列名和用户指定的聚合图。我不知道列名和聚合映射图。我想写一个类似于下面的东西的函数。但我是新来的Scala,我无法解决它。

  def groupAndAggregate(df:DataFrame,aggregateFun:Map [String,String ],cols:List [String]):DataFrame = {
val grouped = df.groupBy(cols)
val aggregated = grouped.agg(aggregateFun)
aggregated.show()

$ / code>

并且想把它称为

  val listOfStrings =列表(A,B,C)
val结果= groupAndAggregate(df,Map(D - >SUM ,E - >COUNT),listOfStrings)

我该怎么做?
任何人都可以帮助我。

解决方案

您的代码几乎正确 - 有两个问题:

  • 函数的返回类型是 DataFrame ,但最后一行是 aggregated.show() code>,它返回 Unit 。删除对 show 的调用,以返回聚合本身,或者仅返回 agg 立即


  • DataFrame.groupBy 需要参数如下: col1:String,cols:String * - 所以你需要传递匹配的参数:第一列,然后其余的列作为参数列表,你可以这样做: df.groupBy(cols.head,cols.tail:_ *)


  • 总而言之,你的函数是:

    pre $ def $ groupAndAggregate(df:DataFrame,aggregateFun:Map [String,String ],cols:List [String]):DataFrame = {
    val groupped = df.groupBy(cols.head,cols.tail:_ *)
    val aggregated = grouped.agg(aggregateFun)






    $ b

    或者,类似的较短版本:

      def groupAndAggregate(df:DataFrame,aggregateFun:Map [String,String],cols:List [String]):DataFrame = {
    df.groupBy(cols.head,cols.tail:_ *)。agg(aggregateFun)
    }

    如果您想在您的功能中调用 show

      def groupAndAggregate(df:DataFrame,aggregateFun:Map [String,String],cols:List [String]):DataFrame = {
    val groupped = df.groupBy(cols.head ,cols.tail:_ *)
    val aggregated = grouped.agg(aggregateFun)
    aggregated.show()
    汇总
    }


    I want to write a custom grouping and aggregate function to get user specified column names and user specified aggregation map.I do not know the column names and aggregation map up front. I want to write a function similar to something like below. But i am new to Scala and i cannot solve it.

    def groupAndAggregate(df: DataFrame,  aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
      val grouped = df.groupBy(cols)
      val aggregated = grouped.agg(aggregateFun)
      aggregated.show()
    }
    

    and want to call it like

    val listOfStrings =  List("A", "B", "C")
    val result = groupAndAggregate(df, Map("D"-> "SUM", "E"-> "COUNT"), listOfStrings)
    

    How can i do this?Can anyone help me please.

    解决方案

    Your code is almost correct - with two issues:

    1. The return type of your function is DataFrame, but the last line is aggregated.show(), which returns Unit. Remove the call to show to return aggregated itself, or just return the result of agg immediately

    2. DataFrame.groupBy expects arguments as follows: col1: String, cols: String* - so you need to pass matching arguments: the first columns, and then the rest of the columns as a list of arguments, you can do that as follows: df.groupBy(cols.head, cols.tail: _*)

    Altogether, your function would be:

    def groupAndAggregate(df: DataFrame,  aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
      val grouped = df.groupBy(cols.head, cols.tail: _*)
      val aggregated = grouped.agg(aggregateFun)
      aggregated
    }
    

    Or, a similar shorter version:

    def groupAndAggregate(df: DataFrame,  aggregateFun: Map[String, String], cols: List[String] ): DataFrame = {
      df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun)
    }
    

    If you do want to call show within your function:

    def groupAndAggregate(df: DataFrame,  aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
      val grouped = df.groupBy(cols.head, cols.tail: _*)
      val aggregated = grouped.agg(aggregateFun)
      aggregated.show()
      aggregated
    }
    

    这篇关于Scala-Spark用参数值动态调用groupby和agg的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

    09-01 23:16