让我帮助澄清有关洗牌的深度以及Spark如何使用洗牌管理器。我报告了一些非常有用的资源:
https://trongkhoanguyenblog.wordpress.com/
https://0x0fff.com/spark-architecture-shuffle/
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md
阅读它们,我了解到有不同的洗牌经理。我想关注其中两个:hash manager
和sort manager
(这是默认管理器)。
为了公开我的问题,我想从一个非常普通的转变开始:
val rdd = reduceByKey(_ + _)
此转换会导致地图端聚合,然后重新组合以将所有相同的键引入相同的分区。
我的问题是:
是在内部使用mapPartition转换实现Map-Side聚合,从而使用合并器函数聚合所有相同的键,还是使用
AppendOnlyMap
或ExternalAppendOnlyMap
实现?如果将
AppendOnlyMap
或ExternalAppendOnlyMap
映射用于聚合,是否还将它们用于减少发生在ResultTask
中的侧面聚合?这两种地图(
AppendOnlyMap
或ExternalAppendOnlyMap
)的目的是什么?是在所有洗牌管理器中还是仅在sortManager中使用
AppendOnlyMap
或ExternalAppendOnlyMap
?我读到
AppendOnlyMap
或ExternalAppendOnlyMap
填满后,溢出到文件中,此步骤究竟发生了什么?使用Sort shuffle管理器,我们使用appendOnlyMap进行汇总和合并分区记录,对吗?然后,当执行内存已满时,我们开始对映射进行排序,将其溢出到磁盘上,然后清理该映射,我的问题是:溢出到磁盘和随机写入之间有什么区别?它们基本上是在本地文件系统上创建文件,但是对它们的处理不同,Shuffle写入记录不会放入appendOnlyMap中。
您能否深入说明执行reduceByKey时会发生什么,并向我说明完成该过程所涉及的所有步骤?例如,类似于地图端聚合,混排等等的所有步骤。
最佳答案
遵循reduceByKey
的逐步说明:reduceByKey
调用combineByKeyWithTag
,具有身份合并器和相同的合并值并创建值combineByKeyWithClassTag
创建一个Aggregator
并返回ShuffledRDD
。 “ map”和“ reduce”侧聚合都使用内部机制,而不使用mapPartitions
。Agregator
将ExternalAppendOnlyMap
用于combineValuesByKey
(“地图边减少”)和combineCombinersByKey
(“减少边减少”)
两种方法都使用ExternalAppendOnlyMap.insertAllMethod
ExternalAppendOnlyMap
keeps track of spilled parts和当前的内存映射(SizeTrackingAppendOnlyMap
)insertAll
方法更新内存中的映射,如果当前映射的估计大小超出阈值,则checks on insert。它使用继承的Spillable.maybeSpill
方法。如果超过阈值,则此方法调用spill
as a side effect,而insertAll
初始化clean SizeTrackingAppendOnlyMap
spill
从块管理器调用spillMemoryIteratorToDisk
which gets DiskBlockObjectWriter
object。insertAll
步骤适用于地图,并通过相应的Aggregator
函数(其中包括随机播放阶段)来减少侧面聚合。
从Spark 2.0开始,只有基于排序的管理器:SPARK-14667