Problem Description
In a recent SO post, I discovered that using withColumn may improve the DAG when dealing with stacked/chained column expressions in conjunction with distinct window specifications. However, in this example, withColumn actually makes the DAG worse and differs from the outcome of using select instead.
First, some test data (PySpark 2.4.4 standalone):
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# 100 rows of random integers in [0, 5) across five columns
dfp = pd.DataFrame(
    {
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),
        "col5": np.random.randint(0, 5, size=100),
    }
)
df = spark.createDataFrame(dfp)
df.show(5)
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
| 0| 3| 2| 2| 2|
| 1| 3| 3| 2| 4|
| 0| 0| 3| 3| 2|
| 3| 0| 1| 4| 4|
| 4| 0| 3| 3| 3|
+----+----+----+----+----+
only showing top 5 rows
The example is simple. It contains 2 window specifications and 4 independent column expressions based on them:
w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")
col_w1_1 = F.max("col5").over(w1).alias("col_w1_1")
col_w1_2 = F.sum("col5").over(w1).alias("col_w1_2")
col_w2_1 = F.max("col5").over(w2).alias("col_w2_1")
col_w2_2 = F.sum("col5").over(w2).alias("col_w2_2")
expr = [col_w1_1, col_w1_2, col_w2_1, col_w2_2]
withColumn - 4 shuffles
If withColumn is used with alternating window specs, the DAG creates unnecessary shuffles:
df.withColumn("col_w1_1", col_w1_1)\
.withColumn("col_w2_1", col_w2_1)\
.withColumn("col_w1_2", col_w1_2)\
.withColumn("col_w2_2", col_w2_2)\
.explain()
== Physical Plan ==
Window [sum(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_2#147L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(4) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col3#90L, 200)
+- Window [sum(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_2#143L], [col1#88L], [col2#89L ASC NULLS FIRST]
+- *(3) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col1#88L, 200)
+- Window [max(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_1#145L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col3#90L, 200)
+- Window [max(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_1#141L], [col1#88L], [col2#89L ASC NULLS FIRST]
+- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col1#88L, 200)
+- Scan ExistingRDD[col1#88L,col2#89L,col3#90L,col4#91L,col5#92L]
select - 2 shuffles
If all columns are passed with select, the DAG is correct.
df.select("*", *expr).explain()
== Physical Plan ==
Window [max(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_1#119L, sum(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_2#121L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col3#90L, 200)
+- Window [max(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_1#115L, sum(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_2#117L], [col1#88L], [col2#89L ASC NULLS FIRST]
+- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col1#88L, 200)
+- Scan ExistingRDD[col1#88L,col2#89L,col3#90L,col4#91L,col5#92L]
Question
There is some existing information about why one should avoid withColumn; however, it is mainly concerned with calling withColumn many times and does not address the issue of deviating DAGs (see here and here). Does anyone have an idea why the DAG differs between withColumn and select? Spark's optimization algorithms should apply in either case and should not depend on different ways of expressing the exact same thing.
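Not part of the original question, but a minimal way to dig into where the two variants diverge is to compare not only the physical plans but also the analyzed and optimized logical plans. A sketch, reusing df and the expressions defined above (in PySpark 2.4, explain(True) prints the parsed, analyzed, optimized, and physical plans):

# Build both variants lazily, then compare their full plan chains side by side.
df_wc = (
    df.withColumn("col_w1_1", col_w1_1)
      .withColumn("col_w2_1", col_w2_1)
      .withColumn("col_w1_2", col_w1_2)
      .withColumn("col_w2_2", col_w2_2)
)
df_sel = df.select("*", *expr)

df_wc.explain(True)   # withColumn variant
df_sel.explain(True)  # select variant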
Thanks.
Recommended Answer
The same thing happens when using nested withColumns and window functions. Let's say:
w1 = ...rangeBetween(-300, 0)
w2 = ...rowsBetween(-1, 0)

(df.withColumn("some1", f.max("original1").over(w1))
   .withColumn("some2", f.lag("some1").over(w2))
).show()
I got a lot of memory problems and heavy spill even with very small datasets. If I do the same using select instead of withColumn, it performs much faster.
df.select(
    f.max(col("original1")).over(w1).alias("some1"),
    f.lag("some1").over(w2).alias("some2")
).show()
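A related observation, not from the original answer but a hedged sketch based on the question's example: if withColumn has to be used, grouping the calls by window spec keeps Window operators with identical specs adjacent in the plan, and Catalyst's CollapseWindow rule should then merge them, so the plan should end up with the same two shuffles as the select variant:

# Hypothetical reordering of the question's example: expressions over the same
# window are stacked back to back instead of alternating between w1 and w2.
df.withColumn("col_w1_1", col_w1_1)\
  .withColumn("col_w1_2", col_w1_2)\
  .withColumn("col_w2_1", col_w2_1)\
  .withColumn("col_w2_2", col_w2_2)\
  .explain()  # expected: two Exchange nodes, matching df.select("*", *expr).explain()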