目录
- 一、概述
- 二、shuffle的定义
- 三、ShuffleMananger发展概述
- 四、HashShuffleManager的运行原理
- 4.1 未经优化的HashShuffleManager
- 4.2 优化后的HashShuffleManager
- 五、SortShuffleManager运行原理
- 5.1 普通运行机制
- 5.2 bypass运行机制
- 六、shuffle相关参数调优
- spark.shuffle.file.buffer
- spark.reducer.maxSizeInFlight
- spark.shuffle.io.retryWait
- spark.shuffle.memoryFraction(已经弃用)
- spark.shuffle.manager(已经弃用)
- spark.shuffle.sort.bypassMergeThreshold
- spark.shuffle.consolidateFiles(已经弃用)
一、概述
大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘、序列化、网络数据传输等操作.因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优.但是也必须提醒大家的是,影响一个Spark作业性能的因素,主要还是代码开资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已.因此大家务必把握住调优的基本原则,千万不要舍本逐末
二、shuffle的定义
Spark的运行主要分为2部分:
一部分是驱动程序,其核心是SparkContext;
另一部分是Worker节点上Task,它是运行实际任务的.程序运行的时候,Driver和Executor进程相互交互: 运行什么任务,即Driver会分配Task到Executor,Driver跟Executor进行网络传输;任务数据从哪儿获取,即Task要从Driver抓取其上游的Task的数据结果,所有有这个过程汇总就不断的产生网络结果.其中,下一个Stage向上一个Stage要数据这个过程,我们就称之为shuffle
.
三、ShuffleManager发展概述
在Spark的源码中,负责shuffle过程的执行、计算和处理的组建主要就是ShuffleManager,也即shuffle管理器.而随着Spark的版本的发展,ShuffleManager也在不断迭代,变得越来越先进.
在Spark1.2版本以前
,默认的shuffle计算引擎是HashShuffleManager.该ShuffleManager的HashShuffleManager有着一个非常严重的弊端
,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能
.
因此在Spark1.2以后的版本中
,默认的ShuffleManager改成了SortShuffleManager
.SortShuffleManager相较于HashShuffleManager来说,有了一定的改进
.主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件
,因此每个Task就只有一个磁盘文件
.在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取
每个磁盘文件中的部分数据即可.
下面我们详细分析一下HashShuffleManager和SortShuffleManager的原理.
四、HashShuffleManager的运行原理
4.1 未经优化的HashShuffleManager
图解说明

文字说明
上面说明了未经优化的HashShuffleManager的原理.这里我们先明确一个假设前提: 每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程.
我们先从shuffle write开始说起.shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据key进行"分类".所谓"分类",就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task.在将数据写入磁盘之前,会先将数据写入内存缓冲区中,当内存缓冲区填满之后,才会溢写到磁盘磁盘文件中去.
那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,下一个stage的task由多少个,当前stage的没饿过task就要创建多少份磁盘文件.比如下一个stage共有100个task,那么当前stage的每个task都要创建100份磁盘文件.如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件.由此看见,未经优化的shuffle write操作所产生的磁盘文件的数量是及其惊人的.
接着我们来说手shuffle read. shuffle read,通常就是一个stage刚开始时要做的事情.此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作.由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程汇总,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可.
shuffle read的拉取过程时一边拉取一遍进行聚合的.每个shuffle task都会有一个自己的buffer缓冲区,每次都只能拉取于buffer缓冲区相同大小的数据,然后通过内存中的一个Map进行聚合等操作.聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲区中进行聚合操作.依次类推,知道最后将所有数据都拉取完,并得到最终的结果.
4.2 优化后的HashShuffleManager
图解说明

文字说明
上图说明了优化后的HashShuffleManager的原理.这里说的优化,是指我们可以设置一个参数,spark.shuffle.consolidateFiles.该参数默认值为false,将其设置为true即可开启优化机制.通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项.
开启consolidate机制后,在shuffle write过程汇总,task就不是为下游stage的每个task创建一个磁盘文件了.此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的.一个Executor上有多少个CPU core,就可以并行执行多少个task.而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内.
当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件.也就是说,此时task会讲数据吸入已有的磁盘文件中,而不会写入新的磁盘文件中.因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能.
假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task.那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的.但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为: CPU core的数量 * 下一个stage的task数量.也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件.
就是说,将第一个阶段的所有磁盘文件进行归并,归并成一个文件.然后进入第二阶段.
spark.shuffle.consolidateFiles | false | If set to "true", consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance for shuffles with large numbers of reduce tasks. It is recommended to set this to "true" when using ext4 or xfs filesystems. On ext3, this option might degrade performance on machines with many (>8) cores due to filesystem limitations. |
spark.shuffle.consolidateFiles在1.6.0版本已移
五、SortShuffleManager运行原理
SortShuffleManager的运行机制主要分为两种,一种是普通运行机制,另一种是bypass运行机制.当shuffle read task的数量小于等于sprak.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制.
5.1 普通运行机制
图解说明

5.2 bypass运行机制
图解说明

文字说明
上图说明了bypass SortShuffleManager的原理.bypass运行机制的触发条件如下:
shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值.
不是聚合类的shuffle算子(比如reduceByKey)
此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash,然后根据key的hash值,将key写入对应的磁盘文件之中.当然,写入磁盘文件时也是先写入内存缓冲区,缓冲区写满之后再溢写到磁盘文件.最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件.
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一摸一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已.因此少量的最终磁盘文件,也让该机制相对为经优化的HashShuffleManager来说,shuffle read的性能会更好.
而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同
;第二,不会进行排序
.也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省了这部分的性能开销
.