A function should be executed for multiple columns in a data frame
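import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
def handleBias(df: DataFrame, colName: String, target: String = target) = {
  val w1 = Window.partitionBy(colName)         // all rows with the same level
  val w2 = Window.partitionBy(colName, target) // same level and same target class
  df.withColumn("cnt_group", count("*").over(w2))
    .withColumn("pre2_" + colName, mean(target).over(w1))
    // cnt_foo_eq_1 is null for TARGET = 0 rows, so min picks the TARGET = 1 share (0 if absent)
    .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
    .drop("cnt_group")
}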
This can be written nicely in Spark SQL, as shown above, combined with a loop (foldLeft) over the columns. However, this causes a lot of shuffles (see "spark apply function to columns in parallel").
A minimal example:
val df = Seq(
(0, "A", "B", "C", "D"),
(1, "A", "B", "C", "D"),
(0, "d", "a", "jkl", "d"),
(0, "d", "g", "C", "D"),
(1, "A", "d", "t", "k"),
(1, "d", "c", "C", "D"),
(1, "c", "B", "C", "D")
).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")
val columnsToDrop = Seq("col3TooMany")
val columnsToCode = Seq("col1", "col2")
val target = "TARGET"
val targetCounts = df.filter(df(target) === 1).groupBy(target)
  .agg(count("*").alias("cnt_foo_eq_1")) // number of TARGET = 1 rows
val newDF = df.join(broadcast(targetCounts), Seq(target), "left")
val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) {
  (currentDF, colName) => handleBias(currentDF, colName)
}
result.drop(columnsToDrop: _*).show
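For reference, the values the fold above should produce can be computed without Spark. The following plain-Scala sketch assumes a binary TARGET and uses only col1 and col2 of the toy data. BiasReference, pre and pre2 are hypothetical names: pre2 corresponds to pre2_<col> (mean of TARGET per level) and pre to pre_<col> (the level's share of all TARGET = 1 rows, 0 if it has none).

```scala
// Plain-Scala reference (no Spark) for the per-level values handleBias produces.
// Assumes TARGET is binary (0/1), as in the toy data.
object BiasReference {
  // (TARGET, col1, col2) rows of the minimal example; col3TooMany and col4 omitted
  val rows: Seq[(Int, Map[String, String])] = Seq(
    (0, "A", "B"), (1, "A", "B"), (0, "d", "a"), (0, "d", "g"),
    (1, "A", "d"), (1, "d", "c"), (1, "c", "B")
  ).map { case (t, c1, c2) => (t, Map("col1" -> c1, "col2" -> c2)) }

  // What cnt_foo_eq_1 holds: the number of rows with TARGET == 1
  val totalPositives: Int = rows.count(_._1 == 1)

  // pre2_<col>: mean of TARGET within each level of `col`
  def pre2(col: String): Map[String, Double] =
    rows.groupBy(_._2(col)).map { case (level, rs) =>
      level -> rs.map(_._1).sum.toDouble / rs.size
    }

  // pre_<col>: TARGET == 1 rows in the level / all TARGET == 1 rows (0.0 if none)
  def pre(col: String): Map[String, Double] =
    rows.groupBy(_._2(col)).map { case (level, rs) =>
      level -> rs.count(_._1 == 1).toDouble / totalPositives
    }
}
```

For example, pre("col1")("A") is 0.5 (2 of the 4 rows with TARGET = 1) and pre2("col1")("A") is 2/3.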
How can I formulate this more efficiently using the RDD API? aggregateByKey should be a good idea, but it is still not very clear to me how to apply it here to substitute the window functions.
(A bit more context and a bigger example: https://github.com/geoHeil/sparkContrastCoding)
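One way to see how aggregateByKey can stand in for the window functions: it folds every value into a per-key accumulator inside each partition (seqOp) and then merges the partial accumulators of the same key across partitions (combOp). The sketch below is a local, Spark-free model of that contract, assuming partitions are plain Seqs; AggregateByKeyModel is a hypothetical name, and this is not how Spark implements the operation.

```scala
// Local model of the RDD.aggregateByKey contract (no Spark).
object AggregateByKeyModel {
  def aggregateByKey[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): Map[K, U] = {
    // seqOp: fold each partition's values into one accumulator per key, starting from zero
    val perPartition: Seq[Map[K, U]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
        acc.updated(k, seqOp(acc.getOrElse(k, zero), v))
      }
    }
    // combOp: merge the partial accumulators of the same key across partitions
    perPartition.flatten.groupBy(_._1).map { case (k, partials) =>
      k -> partials.map(_._2).reduce(combOp)
    }
  }
}
```

With a (count, sum) accumulator per key, a single pass yields everything needed for both the mean (pre2_) and the TARGET = 1 count (pre_) of each level.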
Initially, I started with the approach from "Spark dynamic DAG is a lot slower and different from hard coded DAG", which is shown below. The good thing is that each column seems to run independently / in parallel. The downside is that the joins (even for a small dataset of 300 MB) get "too big" and make Spark unresponsive.
def handleBiasOriginal(col: String, df: DataFrame, target: String = target): DataFrame = {
  // share of all TARGET = 1 rows that fall into each level
  val pre1_1 = df
    .filter(df(target) === 1)
    .groupBy(col, target)
    .agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
    .drop(target) // avoid a duplicate TARGET column after the join
  // mean of TARGET per level
  val pre2_1 = df
    .groupBy(col)
    .agg(mean(target).alias("pre2_" + col))
  df
    .join(pre1_1, Seq(col), "left")
    .join(pre2_1, Seq(col), "left")
}
handleBiasOriginal("col1", df)
  .join(handleBiasOriginal("col2", df), df.columns)
  .join(handleBiasOriginal("col3TooMany", df), df.columns)
  .drop(columnsToDrop: _*).show
This image is from Spark 2.1.0; the images in "Spark dynamic DAG is a lot slower and different from hard coded DAG" are from Spark 2.0.2.
The DAG will be a bit simpler when caching is applied:
df.cache
handleBiasOriginal("col1", df). ...
What possibilities other than window functions do you see to optimize the SQL? Ideally, the SQL would be generated dynamically.
The main point here is to avoid unnecessary shuffles. Right now your code shuffles twice for each column you want to include and the resulting data layout cannot be reused between columns.
For simplicity I assume that target is always binary ({0, 1}) and that all remaining columns you use are of StringType. Furthermore, I assume that the cardinality of the columns is low enough for the results to be grouped and handled locally. You can adjust these methods to handle other cases, but it requires more work.
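Why the binary assumption matters: for a binary target, the per-level count and mean already determine both class counts, and from those both quantities that handleBias computes. The notation below (n_v, μ_v, n_{v,1}, N_1) is introduced here for illustration only:

```latex
\begin{aligned}
\mu_v &= \frac{1}{n_v}\sum_{i \in v} y_i && \text{(pre2 value of level } v\text{)}\\
n_{v,1} &= \sum_{i \in v} y_i = n_v\,\mu_v, \qquad n_{v,0} = n_v - n_{v,1} = n_v\,(1-\mu_v)\\
N_1 &= \sum_{v} n_{v,1}, \qquad \text{pre}_v = \frac{n_{v,1}}{N_1} = \frac{n_v\,\mu_v}{N_1} && \text{(pre value of level } v\text{)}
\end{aligned}
```

For col1 = A in the toy data: n_v = 3 and μ_v = 2/3, so n_{v,1} = 2 and n_{v,0} = 1; with N_1 = 4 rows where TARGET = 1, pre_v = 1/2.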
RDD API
Reshape data from wide to long:
import org.apache.spark.sql.functions._
import spark.implicits._ // for $"..." outside spark-shell (assumes a SparkSession named spark)
val exploded = explode(array(
  (columnsToDrop ++ columnsToCode).map(c =>
    struct(lit(c).alias("k"), col(c).alias("v"))): _*
)).alias("level")
val long = df.select(exploded, $"TARGET")
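The explode(array(struct(...))) step can be read as a flatMap: every wide row turns into one ((column, level), target) record per encoded column, so a single key-based aggregation covers all columns. A plain-Scala sketch of the same reshaping, modeling each row as a target plus a Map of column values (WideToLong is a hypothetical name):

```scala
// Plain-Scala analogue (no Spark) of the wide-to-long reshape.
object WideToLong {
  // One wide row: the TARGET value plus the string value of each column
  type WideRow = (Int, Map[String, String])

  // Emit ((columnName, level), target) for every requested column of every row
  def toLong(rows: Seq[WideRow], columns: Seq[String]): Seq[((String, String), Int)] =
    for {
      (target, values) <- rows
      c <- columns
    } yield ((c, values(c)), target)
}
```

The number of records grows by the number of encoded columns, but the data then needs only one shuffle for all columns instead of two per column.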
Aggregate by column and level, reshape and collect:
import org.apache.spark.util.StatCounter
val lookup = long.as[((String, String), Int)].rdd
  // You can use a prefix partitioner (one that depends only on _._1)
  // to avoid reshuffling for groupByKey
  .aggregateByKey(StatCounter())(_ merge _, _ merge _)
  .map { case ((c, v), s) => (c, (v, s)) }
  .groupByKey
  .mapValues(_.toMap)
  .collectAsMap
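Locally, the aggregate/reshape/collect step amounts to: fold the values for each (column, level) key into a summary, then re-key by column and nest the levels in a map. In this sketch Stats is a hypothetical, minimal stand-in for org.apache.spark.util.StatCounter that keeps only count and sum; the foldLeft plays the role of seqOp, whereas Spark would additionally merge partial summaries across partitions with combOp.

```scala
// Local analogue (no Spark) of aggregateByKey(StatCounter())(...) plus the reshape
// into Map[column, Map[level, stats]].
object LookupSketch {
  // Hypothetical stand-in for StatCounter: count and sum only
  final case class Stats(count: Long, sum: Double) {
    def merge(v: Double): Stats = Stats(count + 1, sum + v)
    def merge(o: Stats): Stats = Stats(count + o.count, sum + o.sum)
    def mean: Double = sum / count
  }

  def buildLookup(long: Seq[((String, String), Int)]): Map[String, Map[String, Stats]] =
    long
      .groupBy(_._1) // key = (column, level)
      .map { case (key, recs) =>
        key -> recs.foldLeft(Stats(0, 0.0))((s, r) => s.merge(r._2.toDouble))
      }
      .toSeq
      .map { case ((c, v), s) => (c, (v, s)) } // re-key by column only
      .groupBy(_._1)
      .map { case (c, vs) => c -> vs.map(_._2).toMap }
}
```

The result has the same shape as lookup, so it is addressed as lookup(column)(level).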
You can use lookup to get statistics for individual columns and levels. For example:
lookup("col1")("A")
org.apache.spark.util.StatCounter = (count: 3, mean: 0.666667, stdev: 0.471405, max: 1.000000, min: 0.000000)
This gives you data for col1, level A. Based on the binary TARGET assumption, this information is complete (you get counts / fractions for both classes). You can use lookup like this to generate SQL expressions, or pass it to a udf and apply it to individual columns.
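As an illustration of applying the lookup to individual columns: under the binary assumption, the (count, mean) per level is enough to rebuild both encodings that handleBias produces. The functions below are plain Scala and hypothetical; a function of this shape could then be wrapped with org.apache.spark.sql.functions.udf or turned into a SQL CASE expression. Falling back to 0.0 for unseen levels is an assumption, not something the question specifies.

```scala
// Rebuilds pre2_<col> and pre_<col> from per-level (count, mean) statistics,
// assuming a binary TARGET and a column without nulls.
object EncodeFromStats {
  // Per-level statistics of one column: level -> (count, mean of TARGET)
  type ColumnStats = Map[String, (Long, Double)]

  // pre2_<col>: the level's TARGET mean; unseen levels -> 0.0 (assumption)
  def pre2(stats: ColumnStats)(level: String): Double =
    stats.get(level).map(_._2).getOrElse(0.0)

  // pre_<col>: TARGET = 1 rows in the level / TARGET = 1 rows over all levels.
  // With a binary TARGET, count * mean is the number of TARGET = 1 rows.
  def pre(stats: ColumnStats)(level: String): Double = {
    val positives = stats.map { case (lvl, (n, mean)) => lvl -> n * mean }
    val total = positives.values.sum
    if (total == 0.0) 0.0 else positives.get(level).map(_ / total).getOrElse(0.0)
  }
}
```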
DataFrame API
- Convert data to long format as for the RDD API.
- Compute aggregates based on levels:
val stats = long
  .groupBy($"level.k", $"level.v")
  .agg(mean($"TARGET"), sum($"TARGET"))
Depending on your preferences, you can reshape this to enable efficient joins, or convert it to a local collection, similarly to the RDD solution.
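For the local-collection option, the collected stats rows can be nested by column in the same way as the RDD lookup. In this sketch the (k, v, mean, sum) tuples are a hypothetical stand-in for the Rows obtained from collecting stats (the sum of an integer column comes back as a Long); StatsToLookup is a hypothetical name.

```scala
// Nests (column, level, mean, sum) records into Map[column, Map[level, (mean, sum)]].
object StatsToLookup {
  def nest(rows: Seq[(String, String, Double, Long)]): Map[String, Map[String, (Double, Long)]] =
    rows.groupBy(_._1).map { case (k, rs) =>
      k -> rs.map { case (_, v, mean, sum) => v -> ((mean, sum)) }.toMap
    }
}
```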