本文介绍了Spark/Scala在多个列上使用相同的函数重复调用withColumn()的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我目前有一些代码,其中我通过多个.withColumn链重复地将同一过程应用于多个DataFrame列,并且想要创建一个简化过程的函数.就我而言,我发现了键汇总的列上的累积总和:

I currently have code in which I repeatedly apply the same procedure to multiple DataFrame Columns via multiple chains of .withColumn, and am wanting to create a function to streamline the procedure. In my case, I am finding cumulative sums over columns aggregated by keys:

val newDF = oldDF
  .withColumn("cumA", sum("A").over(Window.partitionBy("ID").orderBy("time")))
  .withColumn("cumB", sum("B").over(Window.partitionBy("ID").orderBy("time")))
  .withColumn("cumC", sum("C").over(Window.partitionBy("ID").orderBy("time")))
  //.withColumn(...)

我想要的是类似的东西:

What I would like is either something like:

def createCumulativeColums(cols: Array[String], df: DataFrame): DataFrame = {
  // Implement the above cumulative sums, partitioning, and ordering
}

或更妙的是:

def withColumns(cols: Array[String], df: DataFrame, f: function): DataFrame = {
  // Implement a udf/arbitrary function on all the specified columns
}

推荐答案

您可以将select与包含*的变量一起使用:

You can use select with varargs including *:

import spark.implicits._

df.select($"*" +: Seq("A", "B", "C").map(c => 
  sum(c).over(Window.partitionBy("ID").orderBy("time")).alias(s"cum$c")
): _*)

此:

  • 使用Seq("A", ...).map(...)
  • 将列名称映射到窗口表达式
  • 在所有现有列之前添加$"*" +: ....
  • ... : _*解压缩组合序列.
  • Maps columns names to window expressions with Seq("A", ...).map(...)
  • Prepends all pre-existing columns with $"*" +: ....
  • Unpacks combined sequence with ... : _*.

,可以概括为:

import org.apache.spark.sql.{Column, DataFrame}

/**
 * @param cols a sequence of columns to transform
 * @param df an input DataFrame
 * @param f a function to be applied on each col in cols
 */
def withColumns(cols: Seq[String], df: DataFrame, f: String => Column) =
  df.select($"*" +: cols.map(c => f(c)): _*)

如果您发现withColumn语法更易读,则可以使用foldLeft:

If you find withColumn syntax more readable you can use foldLeft:

Seq("A", "B", "C").foldLeft(df)((df, c) =>
  df.withColumn(s"cum$c",  sum(c).over(Window.partitionBy("ID").orderBy("time")))
)

例如可以概括为:

/**
 * @param cols a sequence of columns to transform
 * @param df an input DataFrame
 * @param f a function to be applied on each col in cols
 * @param name a function mapping from input to output name.
 */
def withColumns(cols: Seq[String], df: DataFrame, 
    f: String =>  Column, name: String => String = identity) =
  cols.foldLeft(df)((df, c) => df.withColumn(name(c), f(c)))

这篇关于Spark/Scala在多个列上使用相同的函数重复调用withColumn()的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-23 10:23