本文介绍了Spark DAG的"withColumn"与"select"不同的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在最近的 SO-post ,我发现使用 withColumn 可以在结合不同的Windows规范处理堆叠/链列表达式时改善DAG.但是,在此示例中, withColumn 实际上使DAG变得更糟,并且不同于使用 select 的结果.

In a recent SO-post, I discovered that using withColumn may improve the DAG when dealing with stacked/chain column expressions in conjunction with distinct windows specifications. However, in this example, withColumn actually makes the DAG worse and differs to the outcome of using select instead.

首先,介绍一些测试数据(PySpark 2.4.4独立版):

First, some test data (PySpark 2.4.4 standalone):

import pandas as pd
import numpy as np

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dfp = pd.DataFrame(
    {
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),      
        "col5": np.random.randint(0, 5, size=100),        

    }
)

df = spark.createDataFrame(dfp)
df.show(5)

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|   0|   3|   2|   2|   2|
|   1|   3|   3|   2|   4|
|   0|   0|   3|   3|   2|
|   3|   0|   1|   4|   4|
|   4|   0|   3|   3|   3|
+----+----+----+----+----+
only showing top 5 rows

这个例子很简单.在其中包含2个窗口规范和基于它们的4个独立列表达式:

The example is simple. In contains 2 window specifications and 4 independent column expressions based on them:

w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

col_w1_1 = F.max("col5").over(w1).alias("col_w1_1")
col_w1_2 = F.sum("col5").over(w1).alias("col_w1_2")
col_w2_1 = F.max("col5").over(w2).alias("col_w2_1")
col_w2_2 = F.sum("col5").over(w2).alias("col_w2_2")

expr = [col_w1_1, col_w1_2, col_w2_1, col_w2_2]

withColumn-随机播放4次

如果 withColumn 与交替的窗口规范一起使用,则DAG会创建不必要的混洗:

withColumn - 4 shuffles

If withColumn is used with alternating window specs, the DAG creates unnecessary shuffles:

df.withColumn("col_w1_1", col_w1_1)\
  .withColumn("col_w2_1", col_w2_1)\
  .withColumn("col_w1_2", col_w1_2)\
  .withColumn("col_w2_2", col_w2_2)\
  .explain()

== Physical Plan ==
Window [sum(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_2#147L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(4) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col3#90L, 200)
      +- Window [sum(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_2#143L], [col1#88L], [col2#89L ASC NULLS FIRST]
         +- *(3) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(col1#88L, 200)
               +- Window [max(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_1#145L], [col3#90L], [col4#91L ASC NULLS FIRST]
                  +- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(col3#90L, 200)
                        +- Window [max(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_1#141L], [col1#88L], [col2#89L ASC NULLS FIRST]
                           +- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(col1#88L, 200)
                                 +- Scan ExistingRDD[col1#88L,col2#89L,col3#90L,col4#91L,col5#92L]

选择-2个混洗

如果所有列都通过 select 传递,则DAG是正确的.

select - 2 shuffles

If all columns are passed with select, the DAG is correct.

df.select("*", *expr).explain()

== Physical Plan ==
Window [max(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_1#119L, sum(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_2#121L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col3#90L, 200)
      +- Window [max(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_1#115L, sum(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_2#117L], [col1#88L], [col2#89L ASC NULLS FIRST]
         +- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(col1#88L, 200)
               +- Scan ExistingRDD[col1#88L,col2#89L,col3#90L,col4#91L,col5#92L]

问题

关于为什么应该避免使用 withColumn 的一些现有信息,但是它们主要涉及多次调用 withColumn 的问题,并且没有解决偏离问题.DAG(请参见此处此处).有谁知道为什么DAG在 withColumn select 之间不同?Spark的优化算法在任何情况下都应适用,并且不应依赖于表达完全相同的事物的不同方法.

Question

There is some existing information about why one should avoid withColumn, however they are mainly concerned with calling withColumn a lot of times and they do not address the issue of deviating DAGs (see here and here). Does anyone have an idea why the DAG differs between withColumn and select? Spark's optimization algorithms should apply in any case and should not be dependent on different ways to express the exact same thing.

谢谢.

推荐答案

何时使用嵌套的withColumns和窗口函数?

when using nested withColumns and window functions?

让我说:

w1 = ...rangeBetween(-300, 0)
w2 = ...rowsBetween(-1,0)

(df.withColumn("some1", col(f.max("original1").over(w1))
   .withColumn("some2", lag("some1")).over(w2)).show()

即使使用非常小的数据集,我仍然遇到很多内存问题和大量溢出.如果我使用select而不是withColumn进行相同操作,则执行速度会更快.

I got a lot of memory problems and high spill even with very small datasets. If I do the same using select instead of withColumn it performs way faster.

df.select(
    f.max(col("original1")).over(w1).alias("some1"),
    f.lag("some1")).over(w2)
).show()

这篇关于Spark DAG的"withColumn"与"select"不同的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-23 14:42