Spark TaskSchedulerImpl: TaskSet Processing
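- In DAGScheduler.scala, the submitMissingTasks() method calls taskScheduler.submitTasks
- The task set is submitted through the task scheduler
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
- The task scheduler implementation (TaskSchedulerImpl)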
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
}
// Add the manager to the scheduling pool
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
backend.reviveOffers()
}
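To illustrate the conflict check performed in submitTasks(), here is a minimal standalone sketch. It is not Spark's code: `ConflictCheckSketch`, `Manager`, and `register` are hypothetical names, and only the stage/attempt map lookup and the non-zombie test are modelled.

```scala
import scala.collection.mutable.HashMap

object ConflictCheckSketch {
  // Hypothetical stand-in for TaskSetManager: only the fields the check needs.
  final case class Manager(taskSetId: String, isZombie: Boolean)

  // stageId -> (stageAttemptId -> manager), shaped like taskSetsByStageIdAndAttempt.
  private val byStageAndAttempt = HashMap.empty[Int, HashMap[Int, Manager]]

  /** Registers an attempt; throws if another non-zombie attempt exists for the stage. */
  def register(stageId: Int, attemptId: Int, manager: Manager): Unit = {
    val attempts = byStageAndAttempt.getOrElseUpdate(stageId, HashMap.empty[Int, Manager])
    attempts(attemptId) = manager
    val conflicting = attempts.values.exists { m =>
      m.taskSetId != manager.taskSetId && !m.isZombie
    }
    if (conflicting) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stageId")
    }
  }

  def main(args: Array[String]): Unit = {
    register(0, 0, Manager("0.0", isZombie = true))  // earlier attempt, already a zombie
    register(0, 1, Manager("0.1", isZombie = false)) // accepted: the only active attempt
    println("stage 0 has exactly one active attempt")
  }
}
```

A zombie attempt may coexist with a new attempt of the same stage; two active attempts may not.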
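- The task set is wrapped in a TaskSetManager
- TaskSetManager extends Schedulable (a schedulable entity, i.e. an element placed in the scheduling pool's queue for the scheduler to pick from)
val manager = createTaskSetManager(taskSet, maxTaskFailures)
- The TaskSetManager is added to the scheduling pool of the configured scheduling mode (FIFO or FAIR), that is, to the pool's schedulableQueue
- At this point the tasks to be scheduled are all in the pool; what follows is the concrete scheduling algorithm, which dispatches the pooled tasks in a defined order
- submitTasks() in the task scheduler then calls backend.reviveOffers(). Here backend is a SparkDeploySchedulerBackend, which extends CoarseGrainedSchedulerBackend, so the reviveOffers() that runs is the one defined in CoarseGrainedSchedulerBackend
- In effect, this sends a ReviveOffers message to the Driver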
override def reviveOffers() {
driverEndpoint.send(ReviveOffers)
}
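- The receive() method of driverEndpoint handles the message and calls makeOffers()
case ReviveOffers =>
makeOffers()
- scheduler.resourceOffers(workOffers) computes the sequence of tasks that need to be launched
- Inside resourceOffers(), rootPool.getSortedTaskSetQueue() returns the queue of task sets in scheduling order
- launchTasks() sends the launch-task messages to the executors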
// Make fake resource offers on all executors
private def makeOffers() {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toIndexedSeq
launchTasks(scheduler.resourceOffers(workOffers))
}
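The step of turning live executors into resource offers can be sketched in isolation as follows. This is a simplified illustration, not the Spark classes: `ExecutorData` and `WorkerOffer` here are reduced stand-ins, the `alive` flag replaces the executorIsAlive check, and the final sort exists only to make the example deterministic.

```scala
object MakeOffersSketch {
  // Simplified stand-ins; the real classes carry more fields.
  final case class ExecutorData(host: String, freeCores: Int, alive: Boolean)
  final case class WorkerOffer(executorId: String, host: String, cores: Int)

  /** Builds one offer per live executor, advertising its currently free cores. */
  def makeOffers(executors: Map[String, ExecutorData]): Seq[WorkerOffer] =
    executors.toSeq
      .filter { case (_, data) => data.alive }
      .map { case (id, data) => WorkerOffer(id, data.host, data.freeCores) }
      .sortBy(_.executorId) // deterministic order, for the example only

  def main(args: Array[String]): Unit = {
    val executors = Map(
      "exec-1" -> ExecutorData("host-a", 2, alive = true),
      "exec-2" -> ExecutorData("host-b", 4, alive = false))
    makeOffers(executors).foreach(println)
  }
}
```

Only `exec-1` yields an offer here, because `exec-2` is marked as not alive.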
- From the TaskSet derived from the current stage, select the tasks that can run now and send launch messages to the executors
/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname
// Also track if new executor is added
var newExecAvail = false
for (o <- offers) {
if (!hostToExecutors.contains(o.host)) {
hostToExecutors(o.host) = new HashSet[String]()
}
if (!executorIdToTaskCount.contains(o.executorId)) {
hostToExecutors(o.host) += o.executorId
executorIdToHost(o.executorId) = o.host
executorIdToTaskCount(o.executorId) = 0
newExecAvail = true
}
for (rack <- getRackForHost(o.host)) {
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}
// Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
// this here to avoid a separate thread and added synchronization overhead, and also because
// updating the blacklist is only relevant when task offers are being made.
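blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
offers.filter { offer =>
!blacklistTracker.isNodeBlacklisted(offer.host) &&
!blacklistTracker.isExecutorBlacklisted(offer.executorId)
}
}.getOrElse(offers)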
// Randomly shuffle offers to avoid always placing tasks on the same set of workers.
val shuffledOffers = Random.shuffle(filteredOffers)
// Build a list of tasks to assign to each worker.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
taskSet.executorAdded()
}
}
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
for (taskSet <- sortedTaskSets) {
var launchedAnyTask = false
var launchedTaskAtCurrentMaxLocality = false
for (currentMaxLocality <- taskSet.myLocalityLevels) {
do {
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}
if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}
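The order returned by rootPool.getSortedTaskSetQueue depends on the scheduling mode. For FIFO, a rough sketch of the ordering is shown below, under the assumption that task sets are compared by priority (the job id) first and by stage id second. `TaskSetEntry` and `sortFifo` are hypothetical names used only for this illustration.

```scala
object FifoOrderSketch {
  // Hypothetical stand-in for a schedulable task set; priority is assumed to be the job id.
  final case class TaskSetEntry(name: String, priority: Int, stageId: Int)

  /** Orders by priority first, then by stage id; lower values are scheduled first. */
  def sortFifo(queue: Seq[TaskSetEntry]): Seq[TaskSetEntry] =
    queue.sortBy(e => (e.priority, e.stageId))

  def main(args: Array[String]): Unit = {
    val queue = Seq(
      TaskSetEntry("job1-stage3", priority = 1, stageId = 3),
      TaskSetEntry("job0-stage2", priority = 0, stageId = 2),
      TaskSetEntry("job0-stage1", priority = 0, stageId = 1))
    sortFifo(queue).foreach(e => println(e.name))
  }
}
```

With this ordering, earlier jobs are served first, and within one job the stage with the smaller id goes first.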
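- resourceOffers() in TaskSchedulerImpl calls resourceOfferSingleTaskSet()
- For the tasks in the current task set, the number of tasks sent to an executor in each round is bounded by the number of CPU cores on the worker
- Example: if the stage's TaskSet holds 3 tasks and the worker has 2 CPU cores, the 3 tasks are launched in two rounds: 2 tasks in the first round and 1 task in the second. The degree of parallelism is capped by the number of CPU cores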
for (taskSet <- sortedTaskSets) {
var launchedAnyTask = false
var launchedTaskAtCurrentMaxLocality = false
for (currentMaxLocality <- taskSet.myLocalityLevels) {
do {
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}
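The arithmetic of the 3-tasks-on-2-cores example can be checked with a small sketch. It deliberately simplifies: it treats the rounds as strict batches, whereas the real scheduler hands out a task whenever a core frees up. `waves` and its parameters are hypothetical names.

```scala
object TaskWavesSketch {
  /** Splits task indices into rounds of at most `cores / cpusPerTask` tasks each. */
  def waves(numTasks: Int, cores: Int, cpusPerTask: Int = 1): List[List[Int]] = {
    require(cpusPerTask > 0 && cores >= cpusPerTask, "need at least one task slot")
    val slots = cores / cpusPerTask
    (0 until numTasks).grouped(slots).map(_.toList).toList
  }

  def main(args: Array[String]): Unit = {
    // 3 tasks on a 2-core worker: prints List(List(0, 1), List(2))
    println(waves(numTasks = 3, cores = 2))
  }
}
```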
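- The key call is taskSet.resourceOffer(): from the tasks of the current stage's TaskSet that are still pending, it takes new tasks according to the CPUs offered by the worker and sends them to the executor for execution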
private def resourceOfferSingleTaskSet(
taskSet: TaskSetManager,
maxLocality: TaskLocality,
shuffledOffers: Seq[WorkerOffer],
availableCpus: Array[Int],
tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
// nodes and executors that are blacklisted for the entire application have already been
// filtered out by this point
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) {
try {
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetManager(tid) = taskSet
taskIdToExecutorId(tid) = execId
executorIdToTaskCount(execId) += 1
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
launchedTask = true
} catch {
case e: TaskNotSerializableException =>
logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
// Do not offer resources for this task, but don't throw an error to allow other
// task sets to be submitted.
return launchedTask
return launchedTask
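The interplay between the per-offer pass in resourceOfferSingleTaskSet() and the surrounding do/while loop can be sketched as below. This is a simplified model that ignores locality and blacklisting; `offerOnce`, `offerAll`, and `CpusPerTask` are hypothetical names, and tasks are represented by their indices only.

```scala
import scala.collection.mutable.ArrayBuffer

object SingleTaskSetOfferSketch {
  val CpusPerTask = 1 // assumed value of spark.task.cpus

  /** One pass: every offer with enough free CPUs takes at most one pending task. */
  def offerOnce(
      pending: ArrayBuffer[Int],
      availableCpus: Array[Int],
      assigned: IndexedSeq[ArrayBuffer[Int]]): Boolean = {
    var launched = false
    for (i <- availableCpus.indices) {
      if (availableCpus(i) >= CpusPerTask && pending.nonEmpty) {
        assigned(i) += pending.remove(0)
        availableCpus(i) -= CpusPerTask
        launched = true
      }
    }
    launched
  }

  /** Repeats passes until one launches nothing, like the do/while loop in resourceOffers. */
  def offerAll(numTasks: Int, cores: Array[Int]): List[List[Int]] = {
    val pending = ArrayBuffer((0 until numTasks): _*)
    val cpus = cores.clone()
    val assigned = cores.indices.map(_ => ArrayBuffer.empty[Int])
    while (offerOnce(pending, cpus, assigned)) {}
    assigned.map(_.toList).toList
  }

  def main(args: Array[String]): Unit = {
    // Two executors with 2 cores each, 3 tasks: prints List(List(0, 2), List(1))
    println(offerAll(3, Array(2, 2)))
  }
}
```

Because each pass gives every offer at most one task, tasks are spread across executors in a round-robin fashion instead of filling one executor first.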
- The key part of this method is the call to dequeueTask()
* Respond to an offer of a single executor from the scheduler by finding a task
* NOTE: this function is either called with a maxLocality which
* would be adjusted by delay scheduling algorithm or it will be with a special
* NO_PREF locality which will be not modified
* @param execId the executor Id of the offered resource
* @param host the host Id of the offered resource
* @param maxLocality the maximum locality we want to schedule the tasks at
def resourceOffer(
execId: String,
host: String,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
blacklist.isNodeBlacklistedForTaskSet(host) ||
blacklist.isExecutorBlacklistedForTaskSet(execId)
}
if (!isZombie && !offerBlacklisted) {
val curTime = clock.getTimeMillis()
var allowedLocality = maxLocality
if (maxLocality != TaskLocality.NO_PREF) {
allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
// We're not allowed to search for farther-away tasks
allowedLocality = maxLocality
dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
// Found a task; do some bookkeeping and return a task description
val task = tasks(index)
val taskId = sched.newTaskId()
// Do various bookkeeping
copiesRunning(index) += 1
val attemptNum = taskAttempts(index).size
val info = new TaskInfo(taskId, index, attemptNum, curTime,
execId, host, taskLocality, speculative)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
// NO_PREF will not affect the variables related to delay scheduling
if (maxLocality != TaskLocality.NO_PREF) {
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
// Serialize and return the task
val startTime = clock.getTimeMillis()
val serializedTask: ByteBuffer = try {
Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
} catch {
// If the task cannot be serialized, then there's no point to re-attempt the task,
// as it will always fail. So just abort the whole task-set.
case NonFatal(e) =>
val msg = s"Failed to serialize task $taskId, not attempting to retry it."
logError(msg, e)
abort(s"$msg Exception during serialization: $e")
throw new TaskNotSerializableException(e)
if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
!emittedTaskSizeWarning) {
emittedTaskSizeWarning = true
logWarning(s"Stage ${task.stageId} contains a task of very large size " +
s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
// We used to log the time it takes to serialize the task, but task size is already
// a good proxy to task serialization time.
// val timeTaken = clock.getTime() - startTime
val taskName = s"task ${info.id} in stage ${taskSet.id}"
logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")
sched.dagScheduler.taskStarted(task, info)
new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
taskName, index, serializedTask)
} else {
None
}
}
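- The key check in this method is TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)
- allPendingTasks is populated when the TaskSetManager instance is constructed, and the task indices are added in reverse order (..., 2, 1, 0)
- dequeueTaskFromList() removes the last entry of the list, which is the task with the lowest index in the task set
for (i <- (0 until numTasks).reverse) {
addPendingTask(i)
}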
* Dequeue a pending task for a given node and return its index and locality level.
* Only search for tasks matching the given locality constraint.
* @return An option containing (task index within the task set, locality, is speculative?)
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value, Boolean)] =
for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL, false))
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
// Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
} {
return Some((index, TaskLocality.RACK_LOCAL, false))
if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
return Some((index, TaskLocality.ANY, false))
// find a speculative task if all others tasks have been scheduled
dequeueSpeculativeTask(execId, host, maxLocality).map {
case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
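The isAllowed checks in dequeueTask() rely on the locality levels being ordered from most local to least local. A minimal sketch of that idea, assuming the order PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY noted in the resourceOffers comment, could look like this. `LocalitySketch` and `allowedLevels` are hypothetical names for illustration.

```scala
object LocalitySketch {
  // Declared from most local to least local, so a smaller id means closer to the data.
  object TaskLocality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

    /** `condition` is allowed when it is at least as local as `constraint`. */
    def isAllowed(constraint: Value, condition: Value): Boolean = condition <= constraint
  }

  /** Lists the levels that pass the isAllowed check for a given maximum locality. */
  def allowedLevels(max: TaskLocality.Value): List[TaskLocality.Value] =
    TaskLocality.values.toList.filter(level => TaskLocality.isAllowed(max, level))

  def main(args: Array[String]): Unit = {
    println(allowedLevels(TaskLocality.NODE_LOCAL))
    println(allowedLevels(TaskLocality.ANY))
  }
}
```

With maxLocality set to ANY, every branch of dequeueTask() is eligible, which is why the allPendingTasks list is only consulted at that level.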
- Check whether the current stage's TaskSet still has unprocessed tasks; if so, keep picking them out and sending them to executors for execution
* Dequeue a pending task from the given list and return its index.
* Return None if the list is empty.
* This method also cleans up any tasks in the list that have already
* been launched, since we want that to happen lazily.
private def dequeueTaskFromList(
execId: String,
host: String,
list: ArrayBuffer[Int]): Option[Int] = {
var indexOffset = list.size
while (indexOffset > 0) {
indexOffset -= 1
val index = list(indexOffset)
if (!isTaskBlacklistedOnExecOrNode(index, execId, host)) {
// This should almost always be list.trimEnd(1) to remove tail
list.remove(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
return Some(index)
}
}
}
None
}
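The effect of storing pending indices in reverse order and popping from the tail can be seen in a small sketch. It is a simplification that leaves out blacklisting; `pendingList` and `dequeue` are hypothetical names, and the `running` and `done` sets stand in for copiesRunning and successful.

```scala
import scala.collection.mutable.ArrayBuffer

object DequeueSketch {
  /** Builds the pending list in reverse index order: numTasks-1, ..., 1, 0. */
  def pendingList(numTasks: Int): ArrayBuffer[Int] =
    ArrayBuffer((0 until numTasks).reverse: _*)

  /**
   * Pops entries from the tail until it finds a task that is neither running nor
   * already successful. Stale entries are dropped along the way (lazy cleanup).
   */
  def dequeue(list: ArrayBuffer[Int], running: Set[Int], done: Set[Int]): Option[Int] = {
    while (list.nonEmpty) {
      val index = list.remove(list.size - 1)
      if (!running(index) && !done(index)) return Some(index)
    }
    None
  }

  def main(args: Array[String]): Unit = {
    val pending = pendingList(3)
    println(dequeue(pending, running = Set.empty, done = Set.empty)) // Some(0)
  }
}
```

Removing from the tail of an ArrayBuffer is cheap, and because the list was filled in reverse, the lowest task index is the first one handed out.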
- The coarse-grained scheduler backend calls the method that launches tasks
- It sends a LaunchTask() message to the executor
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
val serializedTask = ser.serialize(task)
if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
"spark.akka.frameSize or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
AkkaUtils.reservedSizeBytes)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
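The size guard in launchTasks() boils down to a single comparison. The sketch below isolates it; `decide`, `Send`, and `Abort` are hypothetical names, and the byte values in `main` are made up for the example.

```scala
object FrameSizeSketch {
  sealed trait Decision
  case object Send extends Decision
  final case class Abort(reason: String) extends Decision

  /** Rejects a serialized task that would not fit into one message frame. */
  def decide(taskBytes: Int, frameSize: Int, reservedBytes: Int): Decision =
    if (taskBytes >= frameSize - reservedBytes) {
      Abort(s"task was $taskBytes bytes, max allowed is $frameSize - $reservedBytes bytes")
    } else {
      Send
    }

  def main(args: Array[String]): Unit = {
    println(decide(taskBytes = 100, frameSize = 1000, reservedBytes = 200))
    println(decide(taskBytes = 800, frameSize = 1000, reservedBytes = 200))
  }
}
```

A task that hits the limit is not sent at all; its task set is aborted, which is why the warning suggests broadcast variables for large values.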
- CoarseGrainedExecutorBackend receives the LaunchTask() message
- The message is handled in receive()
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
taskDesc.name, taskDesc.serializedTask)
- The executor runs a TaskRunner on its thread pool
- The thread pool then invokes TaskRunner.run()
def launchTask(
context: ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer): Unit = {
val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}
- Once TaskRunner.run() has executed the task, it calls statusUpdate() to report the task state as FINISHED
- statusUpdate() in CoarseGrainedExecutorBackend sends a StatusUpdate() message to the Driver
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
val msg = StatusUpdate(executorId, taskId, state, data)
driver match {
case Some(driverRef) => driverRef.send(msg)
case None => logWarning(s"Drop $msg because has not yet connected to driver")
Pool has a removeSchedulable() method
- It is called by taskSetFinished() in TaskSchedulerImpl
override def removeSchedulable(schedulable: Schedulable) {
taskSetFinished() in TaskSchedulerImpl
- It is called by TaskSetManager.maybeFinishTaskSet()
* Called to indicate that all task attempts (including speculated tasks) associated with the
* given TaskSetManager have completed, so state associated with the TaskSetManager should be
* cleaned up.
def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
taskSetsByStageIdAndAttempt.get(manager.taskSet.stageId).foreach { taskSetsForStage =>
taskSetsForStage -= manager.taskSet.stageAttemptId
if (taskSetsForStage.isEmpty) {
taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
}
}
manager.parent.removeSchedulable(manager)
logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
s" ${manager.parent.name}")
}
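- The cleanup only runs when runningTasks is 0, which means no task of the set is still running
- The nested condition (tasksSuccessful == numTasks) holds only when every task has succeeded, i.e. the whole task set completed successfully. TaskSetManager.handleSuccessfulTask() increments tasksSuccessful by 1 each time a task completes successfully
- This method is called by TaskSetManager.handleSuccessfulTask()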
private def maybeFinishTaskSet() {
if (isZombie && runningTasks == 0) {
sched.taskSetFinished(this)
if (tasksSuccessful == numTasks) {
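The counters involved in these conditions can be modelled with a small sketch. It is a trimmed-down, hypothetical view of the bookkeeping (the `Tracker` class and its method names are not Spark's), showing that a task index is counted once even if a second copy of it finishes, and that cleanup needs both the zombie flag and zero running tasks.

```scala
object SuccessTrackingSketch {
  /** Hypothetical, reduced view of the counters a TaskSetManager keeps. */
  final class Tracker(numTasks: Int) {
    private val successful = Array.fill(numTasks)(false)
    var tasksSuccessful = 0
    var runningTasks = 0
    var isZombie = false

    def taskStarted(): Unit = runningTasks += 1

    /** Counts a task index once, even if another copy of it also finishes. */
    def handleSuccess(index: Int): Unit = {
      runningTasks -= 1
      if (!successful(index)) {
        successful(index) = true
        tasksSuccessful += 1
        if (tasksSuccessful == numTasks) isZombie = true
      }
    }

    /** The outer condition: no more work will be launched and nothing is running. */
    def canCleanUp: Boolean = isZombie && runningTasks == 0

    /** The nested condition: every task of the set succeeded. */
    def allSucceeded: Boolean = tasksSuccessful == numTasks
  }

  def main(args: Array[String]): Unit = {
    val tracker = new Tracker(numTasks = 2)
    tracker.taskStarted(); tracker.taskStarted()
    tracker.handleSuccess(0); tracker.handleSuccess(1)
    println(s"canCleanUp=${tracker.canCleanUp}, allSucceeded=${tracker.allSucceeded}")
  }
}
```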
- It is reached from TaskResultGetter.enqueueSuccessfulTask()
- TaskResultGetter.enqueueSuccessfulTask() is called by TaskSchedulerImpl.statusUpdate()
- After an executor finishes running a task, it triggers the StatusUpdate message
* Marks the task as successful and notifies the DAGScheduler that a task has ended.
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
val info = taskInfos(tid)
val index = info.index
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
// "deserialize" the value when holding a lock to avoid blocking other threads. So we call
// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
// Note: "result.value()" only deserializes the value when it's called at the first time, so
// here "result.value()" just returns the value and won't block other threads.
sched.dagScheduler.taskEnded(
tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
if (!successful(index)) {
tasksSuccessful += 1
logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
s" ($tasksSuccessful/$numTasks)")
// Mark successful and stop if all the tasks have succeeded.
successful(index) = true
if (tasksSuccessful == numTasks) {
isZombie = true
} else {
logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
" because task " + index + " has already completed successfully")
- The receive() method of CoarseGrainedSchedulerBackend.DriverEndpoint handles the StatusUpdate() message
- It calls scheduler.statusUpdate(taskId, state, data.value)
override def receive: PartialFunction[Any, Unit] = {
case StatusUpdate(executorId, taskId, state, data) =>
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.freeCores += scheduler.CPUS_PER_TASK
case None =>
// Ignoring the update since we don't know about the executor.
logWarning(s"Ignored task status update ($taskId state $state) " +
s"from unknown executor with ID $executorId")
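The free-core bookkeeping is symmetric: launchTasks() subtracts CPUS_PER_TASK when a task is sent, and the StatusUpdate handler adds it back once the task reaches a finished state. A minimal sketch of that accounting follows; `Accounting` and its methods are hypothetical names, and the map stands in for executorDataMap.

```scala
import scala.collection.mutable

object CoreAccountingSketch {
  val CpusPerTask = 1 // assumed value of spark.task.cpus

  /** Tracks free cores per executor; a hypothetical stand-in for executorDataMap. */
  final class Accounting(initial: Map[String, Int]) {
    private val freeCores = mutable.Map(initial.toSeq: _*)

    def free(execId: String): Option[Int] = freeCores.get(execId)

    /** Reserves cores when a task is sent to a known executor. */
    def onLaunch(execId: String): Unit = freeCores(execId) -= CpusPerTask

    /** Releases cores on a terminal state; returns false for an unknown executor. */
    def onFinished(execId: String): Boolean = freeCores.get(execId) match {
      case Some(current) =>
        freeCores(execId) = current + CpusPerTask
        true
      case None =>
        false
    }
  }

  def main(args: Array[String]): Unit = {
    val accounting = new Accounting(Map("exec-1" -> 2))
    accounting.onLaunch("exec-1")
    println(accounting.free("exec-1")) // Some(1)
    accounting.onFinished("exec-1")
    println(accounting.free("exec-1")) // Some(2)
  }
}
```

An update from an executor that is no longer in the map is ignored, which corresponds to the warning branch in the handler.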
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
synchronized {
try {
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)
if (executorIdToTaskCount.contains(execId)) {
removeExecutor(execId,
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
failedExecutor = Some(execId)
taskIdToTaskSetManager.get(tid) match {
case Some(taskSet) =>
if (TaskState.isFinished(state)) {
taskIdToExecutorId.remove(tid).foreach { execId =>
if (executorIdToTaskCount.contains(execId)) {
executorIdToTaskCount(execId) -= 1
if (state == TaskState.FINISHED) {
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
case None =>
logError(
("Ignoring update with state %s for TID %s because its task set is gone (this is " +
"likely the result of receiving duplicate task finished status updates)")
.format(state, tid))
} catch {
case e: Exception => logError("Exception in statusUpdate", e)
// Update the DAGScheduler without holding a lock on this, since that can deadlock
if (failedExecutor.isDefined) {
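The routing of a status update by task state can be summarised in a short sketch. The `TaskState` enumeration below is a local stand-in that lists the states mentioned in this walkthrough, and `route` is a hypothetical helper that only names the handler a state would be passed to.

```scala
object StatusRoutingSketch {
  // Local stand-in for the task states used in this walkthrough.
  object TaskState extends Enumeration {
    val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
    private val finishedStates = Set(FINISHED, FAILED, KILLED, LOST)

    /** True for every terminal state, successful or not. */
    def isFinished(state: Value): Boolean = finishedStates.contains(state)
  }

  /** Names the handler a status update is routed to (None for non-terminal states). */
  def route(state: TaskState.Value): Option[String] = state match {
    case TaskState.FINISHED => Some("enqueueSuccessfulTask")
    case TaskState.FAILED | TaskState.KILLED | TaskState.LOST => Some("enqueueFailedTask")
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    TaskState.values.foreach(s => println(s"$s -> ${route(s)}"))
  }
}
```

Only FINISHED leads to the success path; the other terminal states go to the failure handler, and non-terminal states such as RUNNING need no result processing.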