Problem Description
Spark will process the data in parallel, but not the operations. In my DAG I want to call a function per column (as in Spark processing columns in parallel), since the value for each column can be calculated independently of the other columns. Is there any way to achieve such parallelism via the Spark SQL API? Utilizing window functions (Spark dynamic DAG is a lot slower and different from hard coded DAG) helped to optimize the DAG a lot, but it only executes in a serial fashion.
An example with a little more information can be found at https://github.com/geoHeil/sparkContrastCoding
A minimal example:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // assuming a SparkSession named spark

val df = Seq(
  (0, "A", "B", "C", "D"),
  (1, "A", "B", "C", "D"),
  (0, "d", "a", "jkl", "d"),
  (0, "d", "g", "C", "D"),
  (1, "A", "d", "t", "k"),
  (1, "d", "c", "C", "D"),
  (1, "c", "B", "C", "D")
).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")

val inputToDrop = Seq("col3TooMany")
val inputToBias = Seq("col1", "col2")

// total number of rows with TARGET == 1, broadcast and joined onto every row
val targetCounts = df.filter(df("TARGET") === 1).groupBy("TARGET").agg(count("TARGET").as("cnt_foo_eq_1"))
val newDF = df.toDF.join(broadcast(targetCounts), Seq("TARGET"), "left")
newDF.cache
// per-column contrast coding: conditional count and mean statistics per level of colName
def handleBias(df: DataFrame, colName: String, target: String = "TARGET") = {
  val w1 = Window.partitionBy(colName)
  val w2 = Window.partitionBy(colName, target)

  df.withColumn("cnt_group", count("*").over(w2))
    .withColumn("pre2_" + colName, mean(target).over(w1))
    .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
    .drop("cnt_group")
}
// looks up the precomputed statistics for (column, value) and picks one of the two encodings
val joinUDF = udf((newColumn: String, newValue: String, codingVariant: Int, results: Map[String, Map[String, Seq[Double]]]) => {
  results.get(newColumn) match {
    case Some(tt) =>
      val nestedArray = tt.getOrElse(newValue, Seq(0.0))
      if (codingVariant == 0) nestedArray.head
      else nestedArray.last
    case None => throw new Exception("Column not contained in initial data frame")
  }
})
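For a single column this works as expected. A small usage sketch based on the definitions above (biasedCol1 is just an illustrative name, col1 an example column):

// adds pre_col1 and pre2_col1 with per-level statistics of TARGET for col1
val biasedCol1 = handleBias(newDF, "col1")
biasedCol1.select("col1", "TARGET", "pre_col1", "pre2_col1").show()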
Now I want to apply my handleBias function to all the columns; unfortunately, this is not executed in parallel.
// the fold adds the pre_/pre2_ statistics column by column, but only serially
val res = (inputToDrop ++ inputToBias).toSet
  .foldLeft(newDF) { (currentDF, colName) =>
    logger.info("using col " + colName)
    handleBias(currentDF, colName)
  }
  .drop("cnt_foo_eq_1")
// pack the two statistics per column into a map column: value -> [pre, pre2]
val combined = (inputToDrop ++ inputToBias).toSet
  .foldLeft(res) { (currentDF, colName) =>
    currentDF.withColumn(
      "combined_" + colName,
      map(col(colName), array(col("pre_" + colName), col("pre2_" + colName)))
    )
  }
val columnsToUse = combined
  .select(combined.columns
    .filter(_.startsWith("combined_"))
    .map(combined(_)): _*)

val newNames = columnsToUse.columns.map(_.split("combined_").last)
val renamed = columnsToUse.toDF(newNames: _*)

val cols = renamed.columns
val localData = renamed.collect

val columnsMap = cols.map { colName =>
  colName -> localData.flatMap(_.getAs[Map[String, Seq[Double]]](colName)).toMap
}.toMap
Recommended Answer
While it is true, it doesn't really help your case. You can generate a number of independent DataFrames, each one with its own additions, but that doesn't mean you can automatically combine this into a single execution plan.
Each application of handleBias shuffles your data twice, and the output DataFrames don't have the same data distribution as the parent DataFrame. This is why, when you fold over the list of columns, each addition has to be performed separately.
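One quick way to see this (a small check based on the minimal example above, not part of the original answer) is to look at the physical plan of a single application; each Window partitioning requires its own shuffle:

// the two Window specs (partitionBy(colName) and partitionBy(colName, target))
// each show up as their own Exchange in the physical plan
handleBias(newDF, "col1").explain()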
Theoretically you could design a pipeline which can be expressed (with pseudocode) like this:
add a unique id:
df_with_id = df.withColumn("id", unique_id())
compute each df independently and convert to long format:
dfs = for (c in columns)
  yield handle_bias(df, c).withColumn(
    "pres", explode([(pre_name, pre_value), (pre2_name, pre2_value)])
  )
union all partial results:
combined = dfs.reduce(union)
pivot to convert from long to wide format:
combined.groupBy("id").pivot("pres._1").agg(first("pres._2"))
But I doubt it is worth all the fuss. The process you use is extremely heavy as it is and requires significant network and disk IO.
If the number of total levels (sum(count(distinct x) for x in columns)) is relatively low, you can try to compute all statistics in a single pass, using for example aggregateByKey with Map[Tuple2[_, _], StatCounter]; otherwise, consider downsampling to a level where you can compute the statistics locally.
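A rough sketch of that single-pass idea, assuming the minimal example above (statColumns and localStats are illustrative names; the statistics handleBias needs, such as the conditional mean of TARGET per level, can then be read off the collected StatCounters):

import org.apache.spark.util.StatCounter

val statColumns = (inputToDrop ++ inputToBias).toSeq

// one pass over the data: key every row once per column as (columnName, level)
// and fold the TARGET values of that group into a StatCounter
val stats = newDF.rdd
  .flatMap { row =>
    val target = row.getAs[Int]("TARGET").toDouble
    statColumns.map(c => ((c, row.getAs[String](c)), target))
  }
  .aggregateByKey(new StatCounter())(
    (acc, v) => acc.merge(v),          // add one TARGET value
    (acc1, acc2) => acc1.merge(acc2)   // merge partial results
  )

// (columnName, level) -> StatCounter with count, mean, ... of TARGET for that level
val localStats: Map[(String, String), StatCounter] = stats.collect().toMap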