YarnAllocator从字面意思来看,也应该知道是在Yarn集群中分配Container的。
private[yarn] class YarnAllocator(
driverUrl: String,
driverRef: RpcEndpointRef,
conf: YarnConfiguration,
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource],
resolver: SparkRackResolver,
clock: Clock = new SystemClock)
其中driverUrl就是Driver的地址。当用YarnAllocator分配Container来运行Executors时,这些Executors要联系的Driver地址就是构造函数里的driverRef参数。
requestTotalExecutorsWithPreferredLocalities方法是分配多个Executor的,先将分配请求保存在队列里,然后在守护线程中异步的创建Executor。
def requestTotalExecutorsWithPreferredLocalities(
requestedTotal: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
nodeBlacklist: Set[String]): Boolean = synchronized {
this.numLocalityAwareTasks = localityAwareTasks
this.hostToLocalTaskCounts = hostToLocalTaskCount
if (requestedTotal != targetNumExecutors) {
logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
targetNumExecutors = requestedTotal
allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist)
true
} else {
false
}
}
targetNumExecutors就是说明要申请创建多少个Executor的意思。具体的实际创建动作是在runAllocatedContainers中执行的。
launcherPool.execute(() => {
try {
new ExecutorRunnable(
Some(container),
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources
).run()
updateInternalState()
} catch {
}
这一段就是在分配的Container上创建Executor的过程,用ExecutorRunner来包装的。其中的driverUrl就是构造函数中带过来的driver的地址。
可见每个driver都会创建一个属于自己的单独的YarnAllocator。
顺便说一句,很多人以前Spark是集群,实际上Spark本身只是一种计算方式,可以看成它只是jar包。Spark的driver运行时才会去申请Executor,向Yarn申请或者向Standalone集群申请。Standalone集群是指Master和Worker,不是指Spark core,更不是指SparkContext。
这里只是谈了YarnAllocator的大致的主要的两个动作,涉及到Yarn的Container的知识请读者自行补脑吧,这里不展开了。