问题描述
我有一个Spark查询,该查询从S3读取很多镶木地板数据,对其进行过滤,并添加一个计算为regexp_extract(input_file_name, ...)
的列,我认为这是一个相对繁重的操作(如果在过滤之前而不是之后应用)./p>
整个查询如下:
val df = spark
.read
.option("mergeSchema", "true")
.parquet("s3://bucket/path/date=2020-01-1{5,6}/clientType=EXTENSION_CHROME/type={ACCEPT,IGNORE*}/")
.where(...)
.withColumn("type", regexp_extract(input_file_name, "type=([^/]+)", 1))
.repartition(300)
.cache()
df.count()
withColumn是在何处或何处执行的??是否取决于我编写它们的顺序?如果我的where语句使用withColumn添加的列怎么办?
withColumn
和filter
按照它们被调用的顺序执行.该计划对此进行了说明.请自下而上阅读计划.
val employees = spark.createDataFrame(Seq(("E1",100.0), ("E2",200.0),("E3",300.0))).toDF("employee","salary")
employees.withColumn("column1", when(col("salary") > 200, lit("rich")).otherwise("poor")).filter(col("column1")==="poor").explain(true)
计划-项目首先发生,然后进行过滤.
== Parsed Logical Plan ==
'Filter ('column1 = poor)
+- Project [employee#4, salary#5, CASE WHEN (salary#5 > cast(200 as double)) THEN rich ELSE poor END AS column1#8]
+- Project [_1#0 AS employee#4, _2#1 AS salary#5]
+- LocalRelation [_1#0, _2#1]
== Analyzed Logical Plan ==
employee: string, salary: double, column1: string
Filter (column1#8 = poor)
+- Project [employee#4, salary#5, CASE WHEN (salary#5 > cast(200 as double)) THEN rich ELSE poor END AS column1#8]
+- Project [_1#0 AS employee#4, _2#1 AS salary#5]
+- LocalRelation [_1#0, _2#1]
代码1st过滤器然后添加新列
employees.filter(col("employee")==="E1").withColumn("column1", when(col("salary") > 200, lit("rich")).otherwise("poor")).explain(true)
计划-然后第一个过滤器进行投影
== Parsed Logical Plan ==
'Project [employee#4, salary#5, CASE WHEN ('salary > 200) THEN rich ELSE poor END AS column1#13]
+- Filter (employee#4 = E1)
+- Project [_1#0 AS employee#4, _2#1 AS salary#5]
+- LocalRelation [_1#0, _2#1]
== Analyzed Logical Plan ==
employee: string, salary: double, column1: string
Project [employee#4, salary#5, CASE WHEN (salary#5 > cast(200 as double)) THEN rich ELSE poor END AS column1#13]
+- Filter (employee#4 = E1)
+- Project [_1#0 AS employee#4, _2#1 AS salary#5]
+- LocalRelation [_1#0, _2#1]
另一个证据-在添加过滤器之前(显然)在列上调用过滤器时,它会给出错误消息
employees.filter(col("column1")==="poor").withColumn("column1", when(col("salary") > 200, lit("rich")).otherwise("poor")).show()
org.apache.spark.sql.AnalysisException: cannot resolve '`column1`' given input columns: [employee, salary];;
'Filter ('column1 = poor)
+- Project [_1#0 AS employee#4, _2#1 AS salary#5]
+- LocalRelation [_1#0, _2#1]
I have a Spark query which reads a lot of parquet data from S3, filters it, and adds a column computed as regexp_extract(input_file_name, ...)
which I assume is a relatively heavy operation (if applied before filtering rather than after it).
The whole query looks like this:
val df = spark
.read
.option("mergeSchema", "true")
.parquet("s3://bucket/path/date=2020-01-1{5,6}/clientType=EXTENSION_CHROME/type={ACCEPT,IGNORE*}/")
.where(...)
.withColumn("type", regexp_extract(input_file_name, "type=([^/]+)", 1))
.repartition(300)
.cache()
df.count()
Is withColumn executed after where or before where? Does it depend on the order in which I write them? What if my where statement used a column added by withColumn?
The withColumn
and filter
execute in the order they are called. The plan explains it. Please read the plan bottom up.
val employees = spark.createDataFrame(Seq(("E1",100.0), ("E2",200.0),("E3",300.0))).toDF("employee","salary")
employees.withColumn("column1", when(col("salary") > 200, lit("rich")).otherwise("poor")).filter(col("column1")==="poor").explain(true)
Plan - project happened 1st then filter.
== Parsed Logical Plan ==
'Filter ('column1 = poor)
+- Project [employee#4, salary#5, CASE WHEN (salary#5 > cast(200 as double)) THEN rich ELSE poor END AS column1#8]
+- Project [_1#0 AS employee#4, _2#1 AS salary#5]
+- LocalRelation [_1#0, _2#1]
== Analyzed Logical Plan ==
employee: string, salary: double, column1: string
Filter (column1#8 = poor)
+- Project [employee#4, salary#5, CASE WHEN (salary#5 > cast(200 as double)) THEN rich ELSE poor END AS column1#8]
+- Project [_1#0 AS employee#4, _2#1 AS salary#5]
+- LocalRelation [_1#0, _2#1]
Code 1st filters then adds new column
employees.filter(col("employee")==="E1").withColumn("column1", when(col("salary") > 200, lit("rich")).otherwise("poor")).explain(true)
Plan - 1st filters then projects
== Parsed Logical Plan ==
'Project [employee#4, salary#5, CASE WHEN ('salary > 200) THEN rich ELSE poor END AS column1#13]
+- Filter (employee#4 = E1)
+- Project [_1#0 AS employee#4, _2#1 AS salary#5]
+- LocalRelation [_1#0, _2#1]
== Analyzed Logical Plan ==
employee: string, salary: double, column1: string
Project [employee#4, salary#5, CASE WHEN (salary#5 > cast(200 as double)) THEN rich ELSE poor END AS column1#13]
+- Filter (employee#4 = E1)
+- Project [_1#0 AS employee#4, _2#1 AS salary#5]
+- LocalRelation [_1#0, _2#1]
Another evidence - it gives error when filter is called on a column before adding it (obviously)
employees.filter(col("column1")==="poor").withColumn("column1", when(col("salary") > 200, lit("rich")).otherwise("poor")).show()
org.apache.spark.sql.AnalysisException: cannot resolve '`column1`' given input columns: [employee, salary];;
'Filter ('column1 = poor)
+- Project [_1#0 AS employee#4, _2#1 AS salary#5]
+- LocalRelation [_1#0, _2#1]
这篇关于Spark withColumn及其执行顺序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!