文章目录
零. RpcService服务概述
RpcService负责创建和启动Flink集群环境中RpcEndpoint组件的RpcServer,且RpcService在启动集群时会提前创建好。AkkaRpcService作为RpcService的唯一实现类,基于Akka的ActorSystem进行封装,为不同的RpcEndpoint创建相应的ActorRef实例。
RpcService主要包含如下两个重要方法。
1. AkkaRpcService的创建和初始化
在创建和启动ClusterEntrypoint及TaskManagerRunner的过程中,会调用AkkaRpcServiceUtils.createRpcService()方法创建默认的AkkaRpcService,接着启动RpcServer。
创建AkkaRpcService主要包括如下步骤。
2.通过AkkaRpcService初始化RpcServer
在集群运行时中创建了共用的AkkaRpcService服务,相当于创建了Akka系统中的ActorSystem,接下来就是使用AkkaRpcService启动各个RpcEndpoint中的RpcServer实例。(AkkaRpcService服务作为共用的rpc服务,启动其他各个组件的RpcServer实例?)
这里先看通过AkkaRpcService初始化RpcEndpoint对应的RpcServer的逻辑。如下在org.apache.flink.runtime.rpc.RpcEndpoint的构造器中,执行了RpcServer的初始化
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
this.rpcService = checkNotNull(rpcService, "rpcService");
this.endpointId = checkNotNull(endpointId, "endpointId");
// 初始化RpcEndpoint中的RpcServer
this.rpcServer = rpcService.startServer(this);
this.mainThreadExecutor = new MainThreadExecutor(rpcServer,
this::validateRunsInMainThread);
}
具体看下rpcService.startServer(this)
启动rpcServer的逻辑
- 创建好InvocationHandler代理类后,通过反射的方式(Proxy.newProxyInstance())创建代理类。创建的代理类会被转换为RpcServer实例,再返回给RpcEndpoint使用。
在RpcServer创建的过程中可以看出,实际上包含了创建RpcEndpoint中的Actor引用类ActorRef和AkkaInvocationHandler动态代理类。最后将动态代理类转换为RpcServer接口返回给RpcEndpoint实现类,此时实现的组件就能够获取到RpcServer服务
,且通过RpcServer代理了所有的RpcGateways接口
,提供了本地方法调用和远程方法调用两种模式。
@Override
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
checkNotNull(rpcEndpoint, "rpc endpoint");
final SupervisorActor.ActorRegistration actorRegistration =
registerAkkaRpcActor(rpcEndpoint);
final ActorRef actorRef = actorRegistration.getActorRef();
final CompletableFuture<Void> actorTerminationFuture =
actorRegistration.getTerminationFuture();
//启动RpcEndpoint对应的RPC服务
LOG.info(
"Starting RPC endpoint for {} at {} .",
rpcEndpoint.getClass().getName(),
actorRef.path());
final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
final String hostname;
Option<String> host = actorRef.path().address().host();
if (host.isEmpty()) {
hostname = "localhost";
} else {
hostname = host.get();
}
//解析EpcEndpoint实现的所有RpcGateway接口
Set<Class<?>> implementedRpcGateways =
new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
//额外添加RpcServer和AkkaBasedEndpoint类
implementedRpcGateways.add(RpcServer.class);
implementedRpcGateways.add(AkkaBasedEndpoint.class);
final InvocationHandler akkaInvocationHandler;
//根据是否是FencedRpcEndpoint创建不同的动态代理对象
if (rpcEndpoint instanceof FencedRpcEndpoint) {
// a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
akkaInvocationHandler =
new FencedAkkaInvocationHandler<>(
akkaAddress,
hostname,
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
actorTerminationFuture,
((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
captureAskCallstacks);
implementedRpcGateways.add(FencedMainThreadExecutable.class);
} else {
akkaInvocationHandler =
new AkkaInvocationHandler(
akkaAddress,
hostname,
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
actorTerminationFuture,
captureAskCallstacks);
}
// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader = getClass().getClassLoader();
@SuppressWarnings("unchecked")
RpcServer server =
(RpcServer)
Proxy.newProxyInstance(
classLoader,
implementedRpcGateways.toArray(
new Class<?>[implementedRpcGateways.size()]),
akkaInvocationHandler);
return server;
}
3. ResourceManager中RPC服务的启动
RpcServer在RpcEndpoint的构造器中完成初始化后,接下来就是启动RpcEndpoint和RpcServer,这里以ResourceManager为例进行说明。
在启动集群时,看下如何启动ResourceManager的RPC服务的。如下调用链
ClusterEntrypoint.startCluster->runCluster
->dispatcherResourceManagerComponentFactory.create
->resourceManager.start();
=>
public final void start() {
rpcServer.start();
}
继续探索RPC服务是如何启动的
首先在DefaultDispatcherResourceManagerComponentFactory中调用ResourceManager.start()方法启动ResourceManager实例,此时在ResourceManager.start()方法中会同步调用RpcServer.start()方法,启动ResourceManager所在RpcEndpoint中的RpcServer,如下。
在flink1.12中省略了AkkaInvocationHandler的干预。
经过以上步骤,指定组件的RpcEndpoint节点就正常启动
,此时RpcServer会作为独立的线程运行在JobManager或TaskManager进程中,处理本地和远程提交的RPC请求
。
4. 实现相互通讯能力
如代码所示,在AkkaRpcService.connect()方法中,完成了RpcConnection对象的创建。
@Override
public <C extends RpcGateway> CompletableFuture<C> connect(
final String address, final Class<C> clazz) {
return connectInternal(
address,
clazz,
(ActorRef actorRef) -> {
Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);
return new AkkaInvocationHandler(
addressHostname.f0,
addressHostname.f1,
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
null,
captureAskCallstacks);
});
}
具体看AkkaRpcService.connectInternal()方法逻辑。
private <C extends RpcGateway> CompletableFuture<C> connectInternal(
final String address,
final Class<C> clazz,
Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
checkState(!stopped, "RpcService is stopped");
LOG.debug(
"Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",
address,
clazz.getName());
//获取actorRef实例
final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);
//进行handshake操作,确保需要连接的RpcEndpoint节点正常
final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =
actorRefFuture.thenCompose(
(ActorRef actorRef) ->
FutureUtils.toJava(
//调用Patterns.ask()方法,向actorRef对应的
//RpcEndpoint节点发送RemoteHandshakeMessage消息,
//确保连接的RpcEndpoint节点正常,如果成功,则
//RpcEndpoint会返回HandshakeSuccessMessage消息。
Patterns.ask(
actorRef,
new RemoteHandshakeMessage(
clazz, getVersion()),
configuration.getTimeout().toMilliseconds())
.<HandshakeSuccessMessage>mapTo(
ClassTag$.MODULE$
.<HandshakeSuccessMessage>apply(
HandshakeSuccessMessage
.class))));
//创建RPC动态代理类
return actorRefFuture.thenCombineAsync(
handshakeFuture,
(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);
// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and // all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom // ClassLoader ClassLoader classLoader = getClass().getClassLoader();
@SuppressWarnings("unchecked")
C proxy =
(C)
Proxy.newProxyInstance(
classLoader, new Class<?>[] {clazz}, invocationHandler);
return proxy;
},
actorSystem.dispatcher());
}
经过以上步骤,实现了创建RpcEndpoint组件之间的RPC连接,此时集群RPC组件之间可以进行相互访问,例如JobMaster可以向ResourceManager发送Slot资源请求。
RPC 服务启动的 Akka actor 能接收来自RpcGateway RPC 调用。
参考:《Flink设计与实现:核心原理与源码解析》–张利兵