前言
之前在《阿里开源分布式事务组件 seata :seata server 通信层解析》这篇文章中,站在 server 端的角度简单分析过 seata 的网络通信模块。
上篇文章中提到了一些概念,例如说:异步转同步的 Future 机制,防洪机制,以及消息队列机制等,并非只是服务端才具有的功能特性,实际上对于客户端来说也同样适用,它们是整个通用的通信机制。
这篇文章讲讲 client 角度的通信细节,以及在上篇文章中未提及的一些关键点,算是对 seata 整个 client-server 通信模块的一个总结。
已经在上篇文章提及的概念在这篇文章中就不再赘述。
客户端角色
在 seata 中,它对客户端是有角色区分的,它把客户端分为两种角色,分别是 TM 和 RM。
在 seata 的设计中, TM 是全局事务的发起者, RM 则是全局事务的参与者。
这两种角色在进行启动的时候,都要向 server 进行注册,这个注册也可以等同于一个认证的作用,只有已经注册的客户端才可以进行其它操作。
不过注册的意义不仅仅在于认证,特别是对于 RM 来说,很关键的一点是要向 server 注册它的 resourceId,这一点十分重要,它关系到 RM 的高可用问题。
如果某个 RM 停机了,而全局事务还未进行完,这个时候就可以采用委托给其它 RM 的方式,保证全局事务正常结束,这里一个关键的地方,就是 resourceId。
这篇文章暂时不在这里展开,后面再单独写一写这里的设计。
在 《阿里开源分布式事务组件 seata :seata server 通信层解析》这篇文章中,我们已经看到,无论是客户端还是服务端的实现,它们都以 AbstractRpcRemoting 作为抽象骨架。
AbstractRpcRemoting 这个类包含了客户端和服务端的公共逻辑,包括
- 消息的发送和接收逻辑
- future 机制
- 防洪机制
连接池
这些在前面的文章里已经有提到过了。
客户端的实现细节主要是在 AbstractRpcRemotingClient 类里,它继承自 AbstractRpcRemoting 类。
因为使用的是 netty,所以必然会有 Bootstrap 逻辑,seata 把这一块的逻辑单独放到 RpcClientBootstrap 这个类里去,然后只对外提供一个 getNewChannel 方法。
这样来看,创建连接的细节已经被良好的封装,并且 RpcClientBootstrap 的职责也非常明确,那就是建立新连接。
不过大多数客户端都会引入连接池的设计,以达到对连接的重用,避免频繁创建和销毁连接带来的额外开销。seata 也用了连接池管理机制。
不过 seata 的连接池管理是借助 apache 的 common pools 来实现的,common pools 是一个对象池,这里把一条连接 channel 当作一个对象,就变成了连接池了。
借助 common pools 实现对象池,需要实现 KeyedPoolableObjectFactory 接口对应的相关方法,关键是以下三个方法:
V makeObject(K key) throws Exception; //方法定义了新对象是如何创建的。
void destroyObject(K key, V obj) throws Exception; //方法定义了对象应该怎么样去销毁。
boolean validateObject(K key, V obj); //方法定义了如何检测对象的有效性。
另外还要指定对象的类型,以及用于存取对象的 key 的类型。
NettyPoolableFactory 这个类便实现了 KeyedPoolableObjectFactory 接口,并且定义了对象的类型是 Channel,而 key 是 NettyPoolKey。
makeObject 的实现主要分解为几个步骤,第一个是建立新连接:
InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
if (LOGGER.isInfoEnabled()) {
LOGGER.info("NettyPool create channel to " + key);
}
Channel tmpChannel = clientBootstrap.getNewChannel(address);
long start = System.currentTimeMillis();
Object response;
Channel channelToServer = null;
if (null == key.getMessage()) {
throw new FrameworkException(
"register msg is null, role:" + key.getTransactionRole().name());
}
接下来,对连接进行注册,如果注册失败,则会抛出异常,并关闭刚刚建立的连接,这意味着一个连接建立以后,必须注册成功才能使用。如下代码所示:
try {
response = rpcRemotingClient.sendAsyncRequestWithResponse(tmpChannel, key.getMessage());
if (!isResponseSuccess(response, key.getTransactionRole())) {
rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
} else {
channelToServer = tmpChannel;
rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response,
key.getMessage());
}
} catch (Exception exx) {
if (tmpChannel != null) {
tmpChannel.close();
}
throw new FrameworkException(
"register error,role:" + key.getTransactionRole().name() + ",err:" + exx.getMessage());
}
在上面我们也可以看到,注册消息是通过 NettyPoolKey 携带进来的。
销毁对象池的对象,在这里,其实就是关闭连接,而验证对象的有效性,其实就是验证连接的有效性。这两个方法的实现比较简单:
@Override
public void destroyObject(NettyPoolKey key, Channel channel) throws Exception {
if (null != channel) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("will destroy channel:" + channel);
}
channel.disconnect();
channel.close();
}
}
@Override
public boolean validateObject(NettyPoolKey key, Channel obj) {
if (null != obj && obj.isActive()) {
return true;
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("channel valid false,channel:" + obj);
}
return false;
}
连接管理
NettyClientChannelManager 这个类集成连接池机制和建立新连接的实现以后,成为了一个连接管理器。
当根据内部逻辑,需要向服务端发送信息时,都会通过 NettyClientChannelManager 获取连接。
虽然 NettyClientChannelManager 已经集成了一个连接池,不过它还是内部维护了 channel 的缓存
private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();
当外部调用它来获取连接时,它先从自己的缓存里找,如果找到了,先验证channel 是否有效性,如果有效,直接返回这个 channel。
如果 channel 已经无效,或者没有缓存,那么就会委托连接池去建立新连接。这些细节在它的 acquireChannel 方法里:
/**
* Acquire netty client channel connected to remote server.
*
* @param serverAddress server address
* @return netty channel
*/
Channel acquireChannel(String serverAddress) {
Channel channelToServer = channels.get(serverAddress);
if (channelToServer != null) {
channelToServer = getExistAliveChannel(channelToServer, serverAddress);
if (null != channelToServer) {
return channelToServer;
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("will connect to " + serverAddress);
}
channelLocks.putIfAbsent(serverAddress, new Object());
synchronized (channelLocks.get(serverAddress)) {
return doConnect(serverAddress);
}
}
这里也可以看到,如果需要建立新连接,是需要加锁做同步的,NettyClientChannelManager 内部也维护了一串锁。
下面看看建立连接的细节,前面我们说到 注册消息是通过 NettyPoolKey 携带进来的。
在建立连接时,需要生成 NettyPoolKey, seata 在这里应用了 java8 的一个新的特性 Function,即函数式接口。
也就是说,一个 NettyPoolKey 如何生成,由外部传入一个 函数 来决定的。并不需要 NettyClientChannelManager 本身去关心,这个生成方式相关的变量就是
private Function<String, NettyPoolKey> poolKeyFunction;
它作为 NettyClientChannelManager 的构造函数的一个参数被传进来。
对于 RM Client 来说,它的 NettyPoolKey 的生成方式如下所示:
@Override
protected Function<String, NettyPoolKey> getPoolKeyFunction() {
return (serverAddress) -> {
String resourceIds = customerKeys == null ? getMergedResourceKeys() : customerKeys;
if (LOGGER.isInfoEnabled()) {
LOGGER.info("RM will register :" + resourceIds);
}
RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);
message.setResourceIds(resourceIds);
return new NettyPoolKey(NettyPoolKey.TransactionRole.RMROLE, serverAddress, message);
};
}
它同时也决定了 NettyPoolKey 里面携带的 message 到底是 RegisterRMRequest 还是 RegisterTMRequest。
同理,对于 TM Client 来说,它也有它自己的 NettyPoolKey 生成规则:
@Override
protected Function<String, NettyPoolKey> getPoolKeyFunction() {
return (severAddress) -> {
RegisterTMRequest message = new RegisterTMRequest(applicationId, transactionServiceGroup);
return new NettyPoolKey(NettyPoolKey.TransactionRole.TMROLE, severAddress, message);
};
}
函数式编程的特点突然给人一种 IoC 的感觉。
那么回到 NettyClientChannelManager 建立连接的逻辑,上面我们已经解释过一些函数式和 NettyPoolKey 的背景了,直接看它的 doConnect 方法:
private Channel doConnect(String serverAddress) {
Channel channelToServer = channels.get(serverAddress);
if (channelToServer != null && channelToServer.isActive()) {
return channelToServer;
}
Channel channelFromPool;
try {
NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
if (null != previousPoolKey && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
}
channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
channels.put(serverAddress, channelFromPool);
} catch (Exception exx) {
LOGGER.error(FrameworkErrorCode.RegisterRM.getErrCode(), "register RM failed.", exx);
throw new FrameworkException("can not register RM,err:" + exx.getMessage());
}
return channelFromPool;
}
可以看到,其实逻辑没什么特别的,就是去 对象池里新生成个对象,创建一个新的连接而已。不过这里也有可以注意的地方:
NettyClientChannelManager 不仅缓存了连接,维护了锁,它也缓存 NettyPoolKey,如果它发现要新建连接,并且 NettyPoolKey 在之前就缓存过的话,它会看看是否是 RM Client 建立的连接,如果是,那么需要更新一下 resource ids。前面我们有说到,RM 注册时,注册 resource ids 是非常重要的。
另外,NettyClientChannelManager 里还有一个 reconnect 方法,它是在 AbstractRpcRemotingClient 初始化的时候,作为一个定时任务运行的
@Override
public void init() {
clientBootstrap.start();
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
super.init();
它的作用,实际上是去注册中心查找当前服务组下有哪些可用的 server,定时检查与这些 server 之间的连接是否可用,如果不可用,则立马新建连接并缓存。
消息打包
在这个里我们也可以看到 mergeSendExecutorService 这个线程池,之前的文章《阿里开源分布式事务组件 seata :seata server 通信层解析》讲过这里,实际上它是消息队列机制,用来合并消息,再进行发送。我们在之前的文章里也提示过,seata server端的实现里,并没有定时去清除 basketMap,实际上是因为 server 端在通信时,根本没有用到这个 basketMap。
而客户端则有可能会用到,所以这里专门有一个线程池,去清空 basketMap 里的打包消息。
对于 seata 的通信模式和细节大体上就这些,本文到次结束。