StandaloneAppClient是什么?这个很容易搞混淆。其实StandaloneAppClient不是SparkApplication,它主要是用在ScheduleBackend中的。
独立集群环境中,ScheduleBackend是用的StandaloneScheduleBackend,它继承了CoarseGrainedSchedulerBackend类。
StandaloneScheduleBackend里面用了一个叫StandaloneAppClient的类,这个StandaloneAppClient很具有迷惑性,其实它的主要功能是替换CoarseGrainedSchedulerBackend的资源申请的方法,改为向Master申请资源,我们看看相关代码片段就行了。
先看他启动的时候:
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
for (masterAddress <- masterRpcAddresses) yield {
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = try {
if (registered.get) {
return
}
logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
masterRef.send(RegisterApplication(appDescription, self))
} catch {
}
})
}
}
向Master发送RegisterApplication消息,将本appDesc注册给Master,这个和DriverDescription注册到Master是有点区别的。
再比如资源申请的代码:
def requestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
if (endpoint.get != null && appId.get != null) {
endpoint.get.ask[Boolean](RequestExecutors(appId.get, requestedTotal))
} else {
logWarning("Attempted to request executors before driver fully initialized.")
Future.successful(false)
}
}
就是向Master发送RequestExecutor消息申请Executor资源。
这里为啥要注册Application到Master呢?主要是当Master失效或者Master更改时,能通知到Application,这样就能重新连接新的Master了,重新运行spark程序。否则就很脆弱,很容易崩溃,这是我的理解哦,不一定正确,^~^