1、DagScheduler分析
DagScheduler功能主要是负责RDD的各个stage的分解和任务提交。Stage分解是从触发任务调度过程的finalStage开始倒推寻找父stage,如果父stage没有提交任务则循环提交缺失的父stage。每个stage有一个父RDD的概念,根据分区数的多少创建多个任务(Task)。
Task的调度实际是通过TaskSchedulerImp来完成的,TaskSchedulerImp里根据环境部署的不同又会使用不同的Backend,比如Yarn集群、独立集群等其Backend是不一样的,这里先有个概念,先不深究Backend。
这里先看看DagScheduler的核心逻辑把。里面首先要研究的一个方法:
def submitMissingTasks(stage: Stage, jobId: Int)
该方法就是提交stage执行,为什么叫这个名称呢?说明这里的stage是需先需要提交执行的,没有其他依赖的stage还未执行了。
submitMissingTasks方法会根据RDD的依赖关系创建两种task,ResultTask和ShuffleMapTask。
一步步来,只看关键代码,因为整体代码太多了不利于理解关键逻辑。
1.1 生成序列化的taskBinary
taskBinaryBytes = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
taskBinaryBytes待会是要封装成对像分发到远端Executor上执行的,所以必须是可序列化的。
两者最主要区别就是:ShuffleMapStage的入参是依赖的shuffleDep;而ResultStage的入参是函数的定义func。
1.2 生成task
现在有了taskBinaryBytes,下一步就是生成Task了。
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
stage.rdd.isBarrier())
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
两种Task类型:ShuffleMapTask和ResultTask。这里要主要的是对Task而言,有多少分区(partition)就会生成多少个Task,Task是到分区维度的,而不是到RDD维度的,这个概念一定要明确。
1.3 提交Task
最后一步就是提交任务执行。这里就要用到taskScheduler了,当然了,这里的taskScheduler目前就是指TaskSchedulerImp。
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
DagScheduler里还有一个方法这里可以提一下,就是:
submitWaitingChildStages(stage)
这个方法是提交等待当前stage执行的等待stage,这样DAG的整个调度过程就完整了。
2、Task执行
两种Task类型:ShuffleMapTask和ResultTask。
2.1 ResultTask
我们先看ResultTask的执行,它相对比较简单,核心方式是runTask,核心代码:
override def runTask(context: TaskContext): U = {
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
func(context, rdd.iterator(partition, context))
}
反序列化出来RDD和func,然后执行rdd的iterator方法获取数据集,并在此数据集上执行func函数,要注意实际上这是一次迭代过程而不是多次迭代过程。
2.2 ShuffleMapTask
ShuffleMapTask任务的执行相对复杂些。
核心方法还是runTask,核心代码:
override def runTask(context: TaskContext): MapStatus = {
val ser = SparkEnv.get.closureSerializer.newInstance()
val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
val rdd = rddAndDep._1
val dep = rddAndDep._2
dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
}
首先反序列化出RDD和依赖项ShuffleDependency。然后用ShuffleWriterProcessor写数据到RDD。
这里的dep其实没太大意义,主要就是来判断是否要进行合并使用的,不影响理解整个shuffle流程,所以我们可以先不要管dep:
dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
这里的rdd实际就是ShuffleMapTask所要生成的数据集。这句代码到底是什么意思呢? ShuffleWriterProcessor实际上是将数据集写到了BlockManager上去的,先看看ShuffleWriterProcessor的含义。
2.3 ShuffleWriterProcessor
ShuffleWriterProcessor的关键方法的定义先看一下。
def write(rdd: RDD[_],dep: ShuffleDependency[_, _, _],
partitionId: Int, context: TaskContext,partition: Partition): MapStatus = {
var writer: ShuffleWriter[Any, Any] = null
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](
dep.shuffleHandle,
partitionId,
context,
createMetricsReporter(context))
writer.write(
rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
}
ShuffleManager实际上就是BlockManager,管理块空间的。
Write是Shuffle写入器,写到BlockManager去;rdd.iterator(partition, context)就是当前Shuffle类型的RDD定义的数据集,dep是rdd计算数据集时依赖的RDD(这里的dep没多大意思先不管)。
这段代码的作用就是将shuffle rdd数据集输出到BlockManager上,在读取RDD的数据时,如果该RDD是shuffle类型,则需要到BlockManager上去读取,这里就是这个作用。
2.4 Shuffle RDD的相关概念
Shuffle类的RDD是指这类RDD的compute方法是依赖于其他RDD的,这里的其他RDD可以是多个。执行shuffle的RDD的计算过程的时候,是将一到多个依赖RDD的迭代器的输出作为数据源迭代器,在此之上执行自己的操作。所以shuffle RDD的compute方法里一定会用到依赖RDD的iterator方法。
可以看看CoGroupedRDD的源码,就能很快的理解shuffle的含义。
附录:github源码地址:
https://github.com/apache/spark/tree/master/core/src/main/scala/org/apache/spark