让我帮助澄清有关洗牌的深度以及Spark如何使用洗牌管理器。我报告了一些非常有用的资源:

https://trongkhoanguyenblog.wordpress.com/

https://0x0fff.com/spark-architecture-shuffle/

https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md

阅读它们,我了解到有不同的洗牌经理。我想关注其中两个:hash managersort manager(这是默认管理器)。

为了公开我的问题,我想从一个非常普通的转变开始:

val rdd = reduceByKey(_ + _)


此转换会导致地图端聚合,然后重新组合以将所有相同的键引入相同的分区。

我的问题是:


是在内部使用mapPartition转换实现Map-Side聚合,从而使用合并器函数聚合所有相同的键,还是使用AppendOnlyMapExternalAppendOnlyMap实现?
如果将AppendOnlyMapExternalAppendOnlyMap映射用于聚合,是否还将它们用于减少发生在ResultTask中的侧面聚合?
这两种地图(AppendOnlyMapExternalAppendOnlyMap)的目的是什么?
是在所有洗牌管理器中还是仅在sortManager中使用AppendOnlyMapExternalAppendOnlyMap
我读到AppendOnlyMapExternalAppendOnlyMap填满后,溢出到文件中,此步骤究竟发生了什么?
使用Sort shuffle管理器,我们使用appendOnlyMap进行汇总和合并分区记录,对吗?然后,当执行内存已满时,我们开始对映射进行排序,将其溢出到磁盘上,然后清理该映射,我的问题是:溢出到磁盘和随机写入之间有什么区别?它们基本上是在本地文件系统上创建文件,但是对它们的处理不同,Shuffle写入记录不会放入appendOnlyMap中。
您能否深入说明执行reduceByKey时会发生什么,并向我说明完成该过程所涉及的所有步骤?例如,类似于地图端聚合,混排等等的所有步骤。

最佳答案

遵循reduceByKey的逐步说明:


reduceByKey调用combineByKeyWithTag,具有身份合并器和相同的合并值并创建值
combineByKeyWithClassTag创建一个Aggregator并返回ShuffledRDD。 “ map”和“ reduce”侧聚合都使用内部机制,而不使用mapPartitions
AgregatorExternalAppendOnlyMap用于combineValuesByKey(“地图边减少”)和combineCombinersByKey(“减少边减少”)
两种方法都使用ExternalAppendOnlyMap.insertAllMethod
ExternalAppendOnlyMap keeps track of spilled parts和当前的内存映射(SizeTrackingAppendOnlyMap
insertAll方法更新内存中的映射,如果当前映射的估计大小超出阈值,则checks on insert。它使用继承的Spillable.maybeSpill方法。如果超过阈值,则此方法调用spill as a side effect,而insertAll初始化clean SizeTrackingAppendOnlyMap
spill从块管理器调用spillMemoryIteratorToDisk which gets DiskBlockObjectWriter object


insertAll步骤适用于地图,并通过相应的Aggregator函数(其中包括随机播放阶段)来减少侧面聚合。

从Spark 2.0开始,只有基于排序的管理器:SPARK-14667

10-05 23:56