Spark数据本地化是在哪个阶段计算首选位置的?
先看一下DAGScheduler的注释,可以看到DAGScheduler除了Stage和Task的划分外,还做了缓存的跟踪和首选运行位置的计算。
DAGScheduler注释:
DAGScheduler的运行时机
DAGScheduler运行时机:Driver端初始化SparkContext时。DAGScheduler是在整个Spark Application的入口即 SparkContext中声明并实例化的。在实例化DAGScheduler之前,巳经实例化了SchedulerBackend和底层调度器 TaskScheduler。
如果是SQL任务的话,SparkSQL通过Catalyst(Spark SQL的核心是Catalyst优化器)将SQL先翻译成逻辑计划再翻译成物理计划,再转换成RDD的操作。之后运行时再通过DAGScheduler做RDD任务的划分和调度。
DAGScheduler如何划分Stage的?
用户提交的计算任务是一个由RDD依赖构成的DAG,Spark会把RDD的依赖以shuffle依赖为边界划分成多个Stage,这些Stage之间也相互依赖,形成了Stage的DAG。然后,DAGScheduler会按依赖关系顺序执行这些Stage。
要是把RDD依赖构成的DAG看成是逻辑执行计划(logic plan),那么,可以把Stage看成物理执行计划,为了更好的理解这个概念,我们来看一个例子。
下面的代码用来对README.md文件中包含整数值的单词进行计数,并打印RDD之间的依赖关系(Lineage):
scala> val counts = sc.textFile("README.md")
.flatMap(x=>x.split("\\W+"))
.filter(_.matches(".*\\d.*"))
.map(x=>(x,1))
.reduceByKey(_+_)
// 调用一个action函数,用来触发任务的提交和执行
scala> counts.collect()
// 打印RDD的依赖关系(Lineage)
scala> counts.toDebugString
res7: String =
(2) ShuffledRDD[17] at reduceByKey at <console>:24 []
+-(2) MapPartitionsRDD[16] at map at <console>:24 []
| MapPartitionsRDD[15] at filter at <console>:24 []
| MapPartitionsRDD[14] at flatMap at <console>:24 []
| README.md MapPartitionsRDD[13] at textFile at <console>:24 []
| README.md HadoopRDD[12] at textFile at <console>:24 []
DAGScheduler会根据Shuffle划分前后两个Stage:即StageShuffleMapStage和ResultStage
ShuffleMapStage
先看下ShuffleMapStage的注释,核心就是再讲ShuffleMapStage是做ShuffleWrite的Stage,Stage中是算子的pipline。
ShuffleMapStages是在DAG执行过程中产生的Stage,用来为Shuffle产生数据。ShuffleMapStages发生在每个Shuffle操作之前,在Shuffle之前可能有多个窄转换操作,比如:map,filter,这些操作可以形成流水线(pipeline)。当执行ShuffleMapStages时,会产生Map的输出文件,这些文件会被随后的Reduce任务使用。
ShuffleMapStages也可以作为Jobs,通过DAGScheduler.submitMapStage函数单独进行提交。对于这样的Stages,会在变量mapStageJobs
中跟踪提交它们的ActiveJobs。 要注意的是,可能有多个ActiveJob尝试计算相同的ShuffleMapStages。
它为一个shuffle过程产生map操作的输出文件。它也可能是自适应查询规划/自适应调度工作的最后阶段。
ResultStage
再看ResultStage的注释
ResultStage是Job的最后一个Stage,该Stage是基于执行action函数的rdd来创建的。该Stage用来计算一个action操作的结果。该类的声明如下:
private[spark] class ResultStage(
id: Int,
rdd: RDD[_],
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
parents: List[Stage], //依赖的父Stage
firstJobId: Int,
callSite: CallSite)
extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {
为了计算action操作的结果,ResultStage会在目标RDD的一个或多个分区上使用函数:func
。需要计算的分区id集合保存在成员变量:partitions
中。但对于有些action操作,比如:first(),take()等,函数:func
可能不会在所有分区上使用。
另外,在提交Job时,会先创建ResultStage。但在提交Stage时,会先递归找到该Stage依赖的父级Stage,并先提交父级Stage。如下图所示:
举个例子:
思考题
如下rdd运算,为什么最终只划分了3个Stage
scala> val rdd1 = sc.textFile("/root/tmp/a.txt",3).flatMap(x=>x.split(",")).map(x=>(x,1)).reduceByKey((a,b)=>a+b)
val rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:1
scala> val rdd2 = sc.textFile("/root/tmp/a.txt",3).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
val rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:1
scala> val rdd3 = rdd1.join(rdd2)
val rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[12] at join at <console>:1
scala> val rdd4 = rdd3.groupByKey()
val rdd4: org.apache.spark.rdd.RDD[(String, Iterable[(Int, Int)])] = MapPartitionsRDD[13] at groupByKey at <console>:1
scala> rdd4.collect().foreach(println)
(c,Seq((2,2)))
(d,Seq((1,1)))
(a,Seq((2,2)))
(b,Seq((1,1)))
scala> rdd4.toDebugString
val res8: String =
(3) MapPartitionsRDD[13] at groupByKey at <console>:1 []
| MapPartitionsRDD[12] at join at <console>:1 []
| MapPartitionsRDD[11] at join at <console>:1 []
| CoGroupedRDD[10] at join at <console>:1 []
| ShuffledRDD[4] at reduceByKey at <console>:1 []
+-(3) MapPartitionsRDD[3] at map at <console>:1 []
| MapPartitionsRDD[2] at flatMap at <console>:1 []
| /root/tmp/a.txt MapPartitionsRDD[1] at textFile at <console>:1 []
| /root/tmp/a.txt HadoopRDD[0] at textFile at <console>:1 []
| ShuffledRDD[9] at reduceByKey at <console>:1 []
+-(3) MapPartitionsRDD[8] at map at <console>:1 []
| MapPartitionsRDD[7] at flatMap at <console>:1 []
| /root/tmp/a.txt MapPartitionsRDD[6] at textFile at <console>:1 []
| /root/t...