Given a DF, let's say I have 3 classes each with a method addCol that will use the columns in the DF to create and append a new column to the DF (based on different calculations).

What is the best way to get a resulting df that will contain the original df A and the 3 added columns?

val df = Seq((1, 2), (2, 5), (3, 7)).toDF("num1", "num2")

// addCol as defined in the first class
def addCol(df: DataFrame): DataFrame = {
    df.withColumn("method1", col("num1")/col("num2"))
}
// addCol as defined in the second class
def addCol(df: DataFrame): DataFrame = {
    df.withColumn("method2", col("num1")*col("num2"))
}
// addCol as defined in the third class
def addCol(df: DataFrame): DataFrame = {
    df.withColumn("method3", col("num1")+col("num2"))
}

One option is actions.foldLeft(df) { (df, action) => action.addCol(df) }. The end result is the DF I want, with columns num1, num2, method1, method2, and method3. But from my understanding, this will not make use of distributed evaluation, and each addCol will happen sequentially. What is a more efficient way to do this?


The efficient way is to use select.

select is faster than foldLeft if you have very large data; check this post.

You can build the required expressions and use them inside select; see the code below.

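scala> df.show(false)
+----+----+
|num1|num2|
+----+----+
|1   |2   |
|2   |5   |
|3   |7   |
+----+----+

scala> val colExpr = Seq(
                          $"*", // keep the original columns
                          ($"num1" / $"num2").as("method1"),
                          ($"num1" * $"num2").as("method2"),
                          ($"num1" + $"num2").as("method3")
                        )

scala> df.select(colExpr:_*).show(false)
+----+----+-------------------+-------+-------+
|num1|num2|method1            |method2|method3|
+----+----+-------------------+-------+-------+
|1   |2   |0.5                |2      |3      |
|2   |5   |0.4                |10     |7      |
|3   |7   |0.42857142857142855|21     |10     |
+----+----+-------------------+-------+-------+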


Return a Column instead of a DataFrame. Try using higher-order functions: all three of your functions can be replaced with the single function below.

scala> def add(
               num1: Column, // You could use varargs here instead, if you prefer.
               num2: Column,
               f: (Column, Column) => Column
             ): Column = f(num1, num2)

For example, with varargs; when invoking this version, you need to pass the required columns at the end.

def add(f: (Column,Column) => Column,cols:Column*): Column = cols.reduce(f)
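
As a minimal sketch of how the varargs version might be called (the object name `VarargsAddSketch` and the local SparkSession setup are assumptions for illustration, not part of the original answer), the function comes first and the columns follow:

```scala
import org.apache.spark.sql.{Column, SparkSession}

object VarargsAddSketch {
  // Same definition as above: combine any number of columns with f, left to right.
  def add(f: (Column, Column) => Column, cols: Column*): Column = cols.reduce(f)

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, just to make the sketch runnable on its own.
    val spark = SparkSession.builder().master("local[*]").appName("varargs-add-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 2), (2, 5), (3, 7)).toDF("num1", "num2")

    df.select(
      $"*",                                         // keep the original columns
      add(_ / _, $"num1", $"num2").as("method1"),
      add(_ * _, $"num1", $"num2").as("method2"),
      add(_ + _, $"num1", $"num2").as("method3")
    ).show(false)

    spark.stop()
  }
}
```

Because `reduce` applies `f` from left to right, the argument order matters for non-commutative operations such as division.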


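scala> val colExpr = Seq(
    $"*", // keep the original columns
    add($"num1", $"num2", (_ / _)).as("method1"),
    add($"num1", $"num2", (_ * _)).as("method2"),
    add($"num1", $"num2", (_ + _)).as("method3")
)

scala> df.select(colExpr:_*).show(false)
+----+----+-------------------+-------+-------+
|num1|num2|method1            |method2|method3|
+----+----+-------------------+-------+-------+
|1   |2   |0.5                |2      |3      |
|2   |5   |0.4                |10     |7      |
|3   |7   |0.42857142857142855|21     |10     |
+----+----+-------------------+-------+-------+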
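
To tie this back to the three classes in the question, here is one possible sketch in which each class exposes a Column expression rather than an addCol that returns a DataFrame. The `Action` trait, the `Method1`/`Method2`/`Method3` objects, and the `addAll` helper are hypothetical names introduced only for this example; the question does not show the real class definitions.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical interface: each action describes its new column instead of building a DataFrame.
trait Action {
  def name: String
  def expr: Column
}

object Method1 extends Action { def name = "method1"; def expr: Column = col("num1") / col("num2") }
object Method2 extends Action { def name = "method2"; def expr: Column = col("num1") * col("num2") }
object Method3 extends Action { def name = "method3"; def expr: Column = col("num1") + col("num2") }

object AddColumnsSketch {
  // Keep every existing column and append all new ones in a single select.
  def addAll(df: DataFrame, actions: Seq[Action]): DataFrame =
    df.select(col("*") +: actions.map(a => a.expr.as(a.name)): _*)

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, just to make the sketch runnable on its own.
    val spark = SparkSession.builder().master("local[*]").appName("add-columns-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 2), (2, 5), (3, 7)).toDF("num1", "num2")
    addAll(df, Seq(Method1, Method2, Method3)).show(false)

    spark.stop()
  }
}
```

With this shape, adding a fourth calculation only means adding another `Action` to the sequence; the DataFrame itself is still built by one select call.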

