TM发送 单个或批量 消息
以发送GlobalBeginRequest消息为例
TM在执行拦截器链路前将向TC发送GlobalBeginRequest 消息
io.seata.tm.DefaultTransactionManager#begin
/**
 * Asks the TC to open a new global transaction and returns the XID it assigned.
 *
 * <p>Only {@code name} and {@code timeout} are copied into the request; {@code applicationId}
 * and {@code transactionServiceGroup} are not read by this method.
 *
 * @param applicationId           not used here
 * @param transactionServiceGroup not used here
 * @param name                    transaction name placed in the request
 * @param timeout                 transaction timeout placed in the request
 * @return the XID carried by the response
 * @throws TransactionException if the response result code is {@code ResultCode.Failed},
 *                              or if the underlying sync call fails
 */
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest beginRequest = new GlobalBeginRequest();
    beginRequest.setTimeout(timeout);
    beginRequest.setTransactionName(name);

    GlobalBeginResponse beginResponse = (GlobalBeginResponse) syncCall(beginRequest);
    if (ResultCode.Failed == beginResponse.getResultCode()) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, beginResponse.getMsg());
    }
    return beginResponse.getXid();
}
注意 消息TYPE_CODE 为 MessageType.TYPE_GLOBAL_BEGIN 值为 1
package io.seata.core.protocol.transaction;
import io.seata.core.protocol.MessageType;
import io.seata.core.rpc.RpcContext;
/**
 * Request sent to the TC to begin a global transaction.
 *
 * <p>Carries the transaction name and its timeout; the type code is
 * {@code MessageType.TYPE_GLOBAL_BEGIN}.
 *
 * @author slievrly
 */
public class GlobalBeginRequest extends AbstractTransactionRequestToTC {

    /** Transaction timeout; 60000 unless overridden through {@link #setTimeout(int)}. */
    private int timeout = 60000;

    /** Name of the transaction; no default value. */
    private String transactionName;

    /**
     * Returns the transaction timeout.
     *
     * @return the timeout
     */
    public int getTimeout() {
        return this.timeout;
    }

    /**
     * Replaces the transaction timeout.
     *
     * @param timeout the timeout
     */
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    /**
     * Returns the transaction name.
     *
     * @return the transaction name
     */
    public String getTransactionName() {
        return this.transactionName;
    }

    /**
     * Replaces the transaction name.
     *
     * @param transactionName the transaction name
     */
    public void setTransactionName(String transactionName) {
        this.transactionName = transactionName;
    }

    /**
     * Identifies this message as a global-begin request.
     *
     * @return {@code MessageType.TYPE_GLOBAL_BEGIN}
     */
    @Override
    public short getTypeCode() {
        return MessageType.TYPE_GLOBAL_BEGIN;
    }

    /**
     * Dispatches this request to the inbound handler held by the parent class.
     *
     * @param rpcContext context of the calling client
     * @return the response produced by the handler
     */
    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this, rpcContext);
    }

    /**
     * Renders the request as {@code timeout=<timeout>,transactionName=<name>}.
     */
    @Override
    public String toString() {
        return "timeout=" + timeout + "," + "transactionName=" + transactionName;
    }
}
io.seata.tm.DefaultTransactionManager#syncCall
/**
 * Sends {@code request} synchronously through the TM Netty client and returns the reply.
 *
 * @param request the transaction request to send
 * @return the reply, cast to {@code AbstractTransactionResponse}
 * @throws TransactionException with code {@code TransactionExceptionCode.IO} when the call times out
 */
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
    final Object reply;
    try {
        reply = TmNettyRemotingClient.getInstance().sendSyncRequest(request);
    } catch (TimeoutException toe) {
        throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
    }
    return (AbstractTransactionResponse) reply;
}
io.seata.core.rpc.netty.AbstractNettyRemotingClient#sendSyncRequest(java.lang.Object)
/**
 * Sends {@code msg} to a server chosen by load balancing and blocks until a result is available.
 *
 * <p>When client batch sending is enabled, the message is queued in {@code basketMap} for the
 * merged-send task and this thread waits on a {@code MessageFuture}; otherwise the message is
 * written directly on an acquired channel via {@code sendSync}.
 *
 * @param msg the request body
 * @return the result set on the message future; {@code null} if the message could not be queued
 * @throws TimeoutException if no result arrives within the RPC request timeout
 */
@Override
public Object sendSyncRequest(Object msg) throws TimeoutException {
// Pick the Seata server (TC) address for this message via load balancing
String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
// RPC request timeout of the TM (30s by default, per the original author's note)
long timeoutMillis = this.getRpcRequestTimeout();
// Wrap the request body (e.g. GlobalBeginRequest) in an RpcMessage
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
// send batch message
// put message into basketMap, @see MergedSendRunnable
// Is client-side batch sending enabled? (off by default in 1.5, per the original author's note)
if (this.isEnableClientBatchSendRequest()) {
// send batch message is sync request, needs to create messageFuture and put it in futures.
// The message is wrapped in a MessageFuture and registered in the futures map before it is queued
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);
// put message into basketMap
// Look up (or create) the message queue that belongs to this server address
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
// Enqueue the message; the queue is created without a capacity bound, so offer is not expected to fail
if (!basket.offer(rpcMessage)) {
LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
serverAddress, rpcMessage);
return null;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("offer message: {}", rpcMessage.getBody());
}
// If the merged-send task is not currently sending, take mergeLock and wake every thread waiting on it
// NOTE(review): the original note says isSending is volatile (visibility/ordering, not atomicity);
// its declaration is not shown here
if (!isSending) {
synchronized (mergeLock) {
mergeLock.notifyAll();
}
}
try {
// Block this thread until the future is completed or timeoutMillis elapses
// (MessageFuture wraps a CompletableFuture, per the original author's note)
return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception exx) {
// Log the failure, then rethrow: timeouts as-is, anything else wrapped in a RuntimeException
LOGGER.error("wait response error:{},ip:{},request:{}",
exx.getMessage(), serverAddress, rpcMessage.getBody());
if (exx instanceof TimeoutException) {
throw (TimeoutException) exx;
} else {
throw new RuntimeException(exx);
}
}
} else {
// Batch sending is disabled: acquire a channel to the chosen server first
Channel channel = clientChannelManager.acquireChannel(serverAddress);
// sendSync registers a MessageFuture (with the timeout) in the futures map, writes the message
// and blocks for the result. Its write listener reacts to a FAILED write: it removes the
// future from the map and completes it with the failure cause.
// Per the original author's note, a periodic task in the parent class AbstractNettyRemoting
// checks registered futures for timeout every 3 seconds.
return super.sendSync(channel, rpcMessage, timeoutMillis);
}
}
单个发送消息
如果是发送单个消息,则直接调用AbstractNettyRemoting.sendSync 向TC端发送消息
io.seata.core.rpc.netty.AbstractNettyRemoting#sendSync
/**
 * Writes {@code rpcMessage} to {@code channel} and blocks until its result is set or the timeout elapses.
 *
 * @param channel       target channel; when {@code null} nothing is sent and {@code null} is returned
 * @param rpcMessage    the message to send
 * @param timeoutMillis maximum time to wait for the result, must be greater than 0
 * @return the result obtained from the message future, or {@code null} when {@code channel} is null
 * @throws TimeoutException   if the result does not arrive within {@code timeoutMillis}
 * @throws FrameworkException if {@code timeoutMillis} is not positive
 * @throws RuntimeException   wrapping any other exception raised while waiting
 */
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
if (timeoutMillis <= 0) {
throw new FrameworkException("timeout should more than 0ms");
}
if (channel == null) {
LOGGER.warn("sendSync nothing, caused by null channel.");
return null;
}
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
// Record the timeout on the future; per the original note it is used for the expiry check
// System.currentTimeMillis() - start > timeout
messageFuture.setTimeout(timeoutMillis);
// Register the future under the message id before writing
futures.put(rpcMessage.getId(), messageFuture);
channelWritableCheck(channel, rpcMessage.getBody());
String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
doBeforeRpcHooks(remoteAddr, rpcMessage);
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
// The write failed: drop the future from the map so it is no longer tracked for timeout
MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
if (messageFuture1 != null) {
// Complete the future with the failure cause
messageFuture1.setResultMessage(future.cause());
}
// Tear down the connection
destroyChannel(future.channel());
}
});
try {
// Block (with timeout) until the result is available
Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
doAfterRpcHooks(remoteAddr, rpcMessage, result);
return result;
} catch (Exception exx) {
LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),
rpcMessage.getBody());
// Timeouts are rethrown unchanged; everything else is wrapped
if (exx instanceof TimeoutException) {
throw (TimeoutException) exx;
} else {
throw new RuntimeException(exx);
}
}
}
messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS) 会阻塞等待30s获取消息,超时则抛出异常 TimeoutException
批量发送消息
// Excerpt: the batch-sending branch of sendSyncRequest.
// send batch message is sync request, needs to create messageFuture and put it in futures.
// The message is wrapped in a MessageFuture and registered in the futures map before it is queued
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);
// put message into basketMap
// Look up (or create) the message queue that belongs to this server address
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
// Enqueue the message; the queue is created without a capacity bound, so offer is not expected to fail
if (!basket.offer(rpcMessage)) {
LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
serverAddress, rpcMessage);
return null;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("offer message: {}", rpcMessage.getBody());
}
// If the merged-send task is not currently sending, take mergeLock and wake every thread waiting on it
// NOTE(review): the original note says isSending is volatile (visibility/ordering, not atomicity);
// its declaration is not shown here
if (!isSending) {
synchronized (mergeLock) {
mergeLock.notifyAll();
}
}
try {
// Block this thread until the future is completed or timeoutMillis elapses
// (MessageFuture wraps a CompletableFuture, per the original author's note)
return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception exx) {
// Log the failure, then rethrow: timeouts as-is, anything else wrapped in a RuntimeException
LOGGER.error("wait response error:{},ip:{},request:{}",
exx.getMessage(), serverAddress, rpcMessage.getBody());
if (exx instanceof TimeoutException) {
throw (TimeoutException) exx;
} else {
throw new RuntimeException(exx);
}
}
如果是批量发送消息,则会先将消息放到 basketMap 集合中。AbstractNettyRemotingClient 在初始化时会启动一个核心线程数和最大线程数都为 1 的单线程线程池,并提交 MergedSendRunnable 任务;该任务在死循环中不断遍历 basketMap,取出各服务端地址对应的待发送消息队列,最终由 io.seata.core.rpc.netty.AbstractNettyRemoting#sendAsync 异步发送消息。需要注意的是,不管是单个发送还是批量发送,TM 开启事务所在的线程都会在 messageFuture.get 处限时阻塞等待(直到拿到结果或超时),只不过批量模式下消息的发送和响应的接收都变成了异步。
/**
 * Merged-send loop: repeatedly waits on {@code mergeLock}, then drains every per-address queue in
 * {@code basketMap} into one {@code MergedWarpMessage} per address and sends it asynchronously.
 * If sending raises a {@code FrameworkException}, the futures of all merged messages are completed
 * with an error so their waiting callers fail fast.
 */
public void run() {
// Loop forever
while (true) {
// Take the lock first
synchronized (mergeLock) {
// Wait up to MAX_MERGE_SEND_MILLS (1s per the original note), releasing the lock while waiting;
// a sender's notifyAll ends the wait early
try {
mergeLock.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {
// NOTE(review): the interrupt is swallowed and the loop simply continues — confirm this is intended
}
}
isSending = true;
// Walk every (server address -> queue) entry
basketMap.forEach((address, basket) -> {
if (basket.isEmpty()) {
return;
}
MergedWarpMessage mergeMessage = new MergedWarpMessage();
// Drain all messages waiting for the same server address so they go out as one merged message
while (!basket.isEmpty()) {
RpcMessage msg = basket.poll();
// Collect the body and the id of each message into the MergedWarpMessage
mergeMessage.msgs.add((AbstractMessage) msg.getBody());
mergeMessage.msgIds.add(msg.getId());
}
if (mergeMessage.msgIds.size() > 1) {
printMergeMessageLog(mergeMessage);
}
Channel sendChannel = null;
try {
// send batch message is sync request, but there is no need to get the return value.
// Since the messageFuture has been created before the message is placed in basketMap,
// the return value will be obtained in ClientOnResponseProcessor.
// i.e. this thread never blocks for the reply; the response is handled asynchronously
sendChannel = clientChannelManager.acquireChannel(address);
AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
} catch (FrameworkException e) {
if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
destroyChannel(address, sendChannel);
}
// fast fail
for (Integer msgId : mergeMessage.msgIds) {
MessageFuture messageFuture = futures.remove(msgId);
if (messageFuture != null) {
messageFuture.setResultMessage(
new RuntimeException(String.format("%s is unreachable", address), e));
}
}
LOGGER.error("client merge call failed: {}", e.getMessage(), e);
}
});
isSending = false;
}
}
异步响应消息由 TM 初始化时注册的 ClientOnResponseProcessor 进行处理:它会遍历所有被合并的消息,根据消息 ID 将对应的 MessageFuture 从 futures 中移除,并调用 future.setResultMessage 设置结果,此时 TM 发送消息时阻塞等待的线程将会被唤醒。
io.seata.core.rpc.processor.client.ClientOnResponseProcessor#process
/**
 * Handles a response received by the client and completes the {@code MessageFuture} of every
 * request it answers, which releases the threads blocked waiting for those results.
 *
 * <ul>
 *   <li>{@code MergeResultMessage}: the originating {@code MergedWarpMessage} is looked up in
 *       {@code mergeMsgMap} by the response id and each sub-result is matched to its request
 *       by position.</li>
 *   <li>{@code BatchResultMessage}: the message ids are carried by the response itself.</li>
 *   <li>Anything else: treated as the reply to a single request.</li>
 * </ul>
 *
 * @param ctx        channel handler context of the connection
 * @param rpcMessage the received response
 * @throws Exception if processing fails
 */
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (rpcMessage.getBody() instanceof MergeResultMessage) {
        // Response to a merged request: recover the request to learn which ids it carried
        MergeResultMessage results = (MergeResultMessage) rpcMessage.getBody();
        MergedWarpMessage mergeMessage = (MergedWarpMessage) mergeMsgMap.remove(rpcMessage.getId());
        if (mergeMessage == null) {
            // The entry can be gone already (the batch branch below clears the whole map);
            // without the request the sub-results cannot be matched to futures, so stop here
            // instead of failing with a NullPointerException.
            LOGGER.error("merged request: {} is not found in mergeMsgMap, result message: {}",
                rpcMessage.getId(), results);
            return;
        }
        for (int i = 0; i < mergeMessage.msgs.size(); i++) {
            int msgId = mergeMessage.msgIds.get(i);
            // Removing the future also stops any further tracking of it
            MessageFuture future = futures.remove(msgId);
            if (future == null) {
                LOGGER.error("msg: {} is not found in futures, result message: {}", msgId,results.getMsgs()[i]);
            } else {
                // Setting the result ends the blocking wait of the sender
                future.setResultMessage(results.getMsgs()[i]);
            }
        }
    } else if (rpcMessage.getBody() instanceof BatchResultMessage) {
        // Same handling as the merged case, but ids come from the response itself
        try {
            BatchResultMessage batchResultMessage = (BatchResultMessage) rpcMessage.getBody();
            for (int i = 0; i < batchResultMessage.getMsgIds().size(); i++) {
                int msgId = batchResultMessage.getMsgIds().get(i);
                MessageFuture future = futures.remove(msgId);
                if (future == null) {
                    LOGGER.error("msg: {} is not found in futures, result message: {}", msgId, batchResultMessage.getResultMessages().get(i));
                } else {
                    future.setResultMessage(batchResultMessage.getResultMessages().get(i));
                }
            }
        } finally {
            // In order to be compatible with the old version, in the batch sending of version 1.5.0,
            // batch messages will also be placed in the local cache of mergeMsgMap,
            // but version 1.5.0 no longer needs to obtain batch messages from mergeMsgMap
            mergeMsgMap.clear();
        }
    } else {
        // Reply to a single request
        MessageFuture messageFuture = futures.remove(rpcMessage.getId());
        if (messageFuture != null) {
            messageFuture.setResultMessage(rpcMessage.getBody());
        } else {
            // Nobody is waiting for this id: hand result messages to the handler, if one is set
            if (rpcMessage.getBody() instanceof AbstractResultMessage) {
                if (transactionMessageHandler != null) {
                    transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);
                }
            }
        }
    }
}
TC 处理 GlobalBeginRequest 消息
NettyChannel消息处理
io.seata.core.rpc.netty.NettyRemotingServer#init
/**
 * Registers the message processors, then runs the parent initialization at most once
 * (guarded by the {@code initialized} flag).
 */
@Override
public void init() {
    // Processors are registered on every call, before the one-time guard
    registerProcessor();
    if (!initialized.compareAndSet(false, true)) {
        return;
    }
    super.init();
}
io.seata.core.rpc.netty.NettyRemotingServer#registerProcessor
/**
 * Wires each message type to the processor (and executor) that handles it on the server.
 */
private void registerProcessor() {
    // 1. Request messages: all handled by one ServerOnRequestProcessor on messageExecutor
    ServerOnRequestProcessor requestProcessor = new ServerOnRequestProcessor(this, getHandler());
    ShutdownHook.getInstance().addDisposable(requestProcessor);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, requestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, requestProcessor, messageExecutor);
    // GlobalBeginRequest messages are handled here
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, requestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, requestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, requestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, requestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, requestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, requestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE, requestProcessor, messageExecutor);

    // 2. Branch commit/rollback results: ServerOnResponseProcessor on branchResultMessageExecutor
    ServerOnResponseProcessor responseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures());
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, responseProcessor, branchResultMessageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, responseProcessor, branchResultMessageExecutor);

    // 3. RM registration
    RegRmProcessor rmRegistrationProcessor = new RegRmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_RM, rmRegistrationProcessor, messageExecutor);

    // 4. TM registration (registered with a null executor)
    RegTmProcessor tmRegistrationProcessor = new RegTmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_CLT, tmRegistrationProcessor, null);

    // 5. Heartbeats (registered with a null executor)
    ServerHeartbeatProcessor heartbeatProcessor = new ServerHeartbeatProcessor(this);
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatProcessor, null);
}
进一步跟进查看具体如何处理
io.seata.core.rpc.processor.server.ServerOnRequestProcessor#process
/**
 * Entry point for request messages on the server: requests arriving on a registered channel are
 * handled; an unregistered channel is disconnected and closed instead.
 *
 * @param ctx        channel handler context of the connection
 * @param rpcMessage the received request
 * @throws Exception if handling the request fails
 */
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    // Registered channel: handle the request and finish
    if (ChannelManager.isRegistered(ctx.channel())) {
        onRequestMessage(ctx, rpcMessage);
        return;
    }

    // Unregistered channel: drop the connection, logging (not propagating) any failure
    try {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
        }
        ctx.disconnect();
        ctx.close();
    } catch (Exception closeFailure) {
        LOGGER.error(closeFailure.getMessage());
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
    }
}
进入io.seata.core.rpc.processor.server.ServerOnRequestProcessor#onRequestMessage
/**
 * Handles one request message received from a registered client channel.
 *
 * <p>A {@code MergedWarpMessage} is unpacked and each sub-message is handled either one by one or,
 * when {@code PARALLEL_REQUEST_HANDLE} is set, concurrently. Any other {@code AbstractMessage} is
 * handled as a single request. Bodies that are not an {@code AbstractMessage} are ignored.
 *
 * <p>In the merged (non-batch-response) path the sub-results are always placed in the response in
 * the same order as the sub-requests, because the client matches them by position. The parallel
 * variant therefore collects its futures in submission order instead of letting each task insert
 * its own result by index, which could fail or reorder results when tasks finish out of order.
 *
 * @param ctx        channel handler context of the connection
 * @param rpcMessage the received request
 */
private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
    Object message = rpcMessage.getBody();
    RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message,
            NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
    } else {
        try {
            BatchLogHandler.INSTANCE.getLogQueue()
                .put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:"
                    + rpcContext.getTransactionServiceGroup());
        } catch (InterruptedException e) {
            LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
        }
    }
    // Only AbstractMessage bodies are handled (GlobalBeginRequest is one, through its parent classes)
    if (!(message instanceof AbstractMessage)) {
        return;
    }
    // the batch send request message
    if (message instanceof MergedWarpMessage) {
        // Batch response path: used when TC batch responses are enabled and the client reports a
        // non-blank version that is at least 1.5.0
        if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())
            && Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
            List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;
            List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;
            for (int i = 0; i < msgs.size(); i++) {
                AbstractMessage msg = msgs.get(i);
                int msgId = msgIds.get(i);
                // Handle each sub-message concurrently when parallel handling is switched on
                if (PARALLEL_REQUEST_HANDLE) {
                    CompletableFuture.runAsync(
                        () -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));
                } else {
                    handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);
                }
            }
        } else {
            // Merged response path: one MergeResultMessage carrying the results in request order
            List<AbstractMessage> subMessages = ((MergedWarpMessage)message).msgs;
            List<AbstractResultMessage> results = new ArrayList<>(subMessages.size());
            List<CompletableFuture<AbstractResultMessage>> pendingResults = null;
            for (int i = 0; i < subMessages.size(); i++) {
                AbstractMessage subMessage = subMessages.get(i);
                if (PARALLEL_REQUEST_HANDLE) {
                    if (pendingResults == null) {
                        pendingResults = new ArrayList<>();
                    }
                    // Start the work now; the result is collected below, in submission order
                    pendingResults.add(CompletableFuture.supplyAsync(
                        () -> handleRequestsByMergedWarpMessage(subMessage, rpcContext)));
                } else {
                    // Sequential handling appends results in request order
                    results.add(handleRequestsByMergedWarpMessage(subMessage, rpcContext));
                }
            }
            if (CollectionUtils.isNotEmpty(pendingResults)) {
                try {
                    // Waiting future by future keeps results aligned with the sub-requests
                    for (CompletableFuture<AbstractResultMessage> pending : pendingResults) {
                        results.add(pending.get());
                    }
                } catch (InterruptedException | ExecutionException e) {
                    // Collection stops at the first failure; results gathered so far are still sent
                    LOGGER.error("handle request error: {}", e.getMessage(), e);
                }
            }
            MergeResultMessage resultMessage = new MergeResultMessage();
            resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
        }
    } else {
        // the single send request message
        final AbstractMessage msg = (AbstractMessage) message;
        AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
        remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
    }
}
io.seata.core.rpc.processor.server.ServerOnRequestProcessor#handleRequestsByMergedWarpMessage
/**
 * Handles one sub-message of a merged request by delegating to the transaction message handler.
 *
 * @param subMessage one message unpacked from a merged request
 * @param rpcContext context of the calling client
 * @return the handler's result for this sub-message
 */
private AbstractResultMessage handleRequestsByMergedWarpMessage(AbstractMessage subMessage, RpcContext rpcContext) {
    final AbstractResultMessage subResult = transactionMessageHandler.onRequest(subMessage, rpcContext);
    return subResult;
}
io.seata.server.coordinator.DefaultCoordinator#onRequest
/**
 * Dispatches a TC-bound transaction request: installs this object as the request's inbound handler
 * and lets the request invoke the matching {@code handle} overload.
 *
 * @param request the incoming request; must be an {@code AbstractTransactionRequestToTC}
 * @param context context of the calling client
 * @return the response produced by handling the request
 * @throws IllegalArgumentException if {@code request} is not an {@code AbstractTransactionRequestToTC}
 */
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
    if (request instanceof AbstractTransactionRequestToTC) {
        AbstractTransactionRequestToTC tcRequest = (AbstractTransactionRequestToTC) request;
        tcRequest.setTCInboundHandler(this);
        return tcRequest.handle(context);
    }
    throw new IllegalArgumentException();
}
io.seata.core.protocol.transaction.GlobalBeginRequest#handle
/**
 * Passes this request to the inbound handler and returns whatever it produces.
 *
 * @param rpcContext context of the calling client
 * @return the handler's response
 */
public AbstractTransactionResponse handle(RpcContext rpcContext) {
    final AbstractTransactionResponse beginResponse = handler.handle(this, rpcContext);
    return beginResponse;
}
io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.GlobalBeginRequest, io.seata.core.rpc.RpcContext)
/**
 * Handles a global-begin request inside the exception-handling template and returns the response
 * object that was filled in along the way.
 *
 * @param request    the begin request
 * @param rpcContext context of the calling client
 * @return the response passed through the template
 */
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
    final GlobalBeginResponse beginResponse = new GlobalBeginResponse();
    AbstractCallback<GlobalBeginRequest, GlobalBeginResponse> beginCallback =
        new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
            @Override
            public void execute(GlobalBeginRequest req, GlobalBeginResponse resp) throws TransactionException {
                try {
                    // The actual begin work happens here
                    doGlobalBegin(req, resp, rpcContext);
                } catch (StoreException storeFailure) {
                    // Surface storage failures as a TransactionException, keeping the cause
                    String detail = String.format("begin global request failed. xid=%s, msg=%s",
                        resp.getXid(), storeFailure.getMessage());
                    throw new TransactionException(TransactionExceptionCode.FailedStore, detail, storeFailure);
                }
            }
        };
    exceptionHandleTemplate(beginCallback, request, beginResponse);
    return beginResponse;
}
获取全局事务XID
io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin
/**
 * Begins a new global transaction through {@code core} and stores the resulting XID in the response.
 *
 * @param request    supplies the transaction name and timeout
 * @param response   receives the new XID
 * @param rpcContext supplies the application id and transaction service group
 * @throws TransactionException if beginning the transaction fails
 */
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
    throws TransactionException {
    String newXid = core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
        request.getTransactionName(), request.getTimeout());
    response.setXid(newXid);

    if (!LOGGER.isInfoEnabled()) {
        return;
    }
    LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
        rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
}
全局事务会话持久化
io.seata.server.coordinator.DefaultCore#begin
/**
 * Creates a global session, attaches the root session manager as a lifecycle listener, begins the
 * session and returns its XID.
 *
 * @param applicationId           id of the calling application
 * @param transactionServiceGroup transaction service group of the caller
 * @param name                    transaction name
 * @param timeout                 transaction timeout
 * @return the XID of the new global session
 * @throws TransactionException if the session cannot be begun
 */
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
// Create the global session object (its XID is generated by XID.generateXID, per the original note)
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
timeout);
// Put the XID into the logging MDC under RootContext.MDC_KEY_XID
MDC.put(RootContext.MDC_KEY_XID, session.getXid());
// Notes from the original author on SessionHolder.getRootSessionManager() (not verifiable from this snippet):
// - SessionHolder is initialized in io.seata.server.Server.start, which reads the configured session
//   store mode and calls SessionHolder.init(sessionStoreMode).
// - The session manager is resolved through Seata's SPI: files named io.seata.server.session.SessionManager
//   under META-INF/services/ and META-INF/seata/ are scanned and the implementation whose @LoadLevel
//   name matches is loaded.
// - Example: with store mode DB, getRootSessionManager() returns a DataBaseSessionManager.
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// Begin the transaction. Per the original note this marks the session status as 1, records the
// begin time and active flag, and invokes onBegin on the registered SessionLifecycleListener(s).
session.begin();
// transaction start event
// Publish the "transaction started" event
MetricsPublisher.postSessionDoingEvent(session, false);
// Return the XID
return session.getXid();
}
Seata的SPI机制会根据 EnhancedServiceLoader.load(类, 名称) 方法参数一的类的全限定类名,从META-INF/services/ 与 META-INF/seata/ 路径下去匹配,然后根据匹配到的类的全限定类名,定位到具体的类,再根据参数二名称与@LoadLevel注解的name值进行匹配 ,确定要加载的对象,如下所示
加载 DataBaseSessionManager 对象后,添加其到session的生命监听器列表中,在执行session.begin方法时,调用监听器的onBegin方法,进而由父类AbstractSessionManager 执行 onBegin方法,并调用DataBaseSessionManager重写的addGlobalSession方法
io.seata.server.storage.db.session.DataBaseSessionManager#addGlobalSession
/**
 * Persists a global session through the transaction store manager.
 *
 * <p>The session is written as {@code GLOBAL_ADD} when {@code taskName} is blank and as
 * {@code GLOBAL_UPDATE} otherwise.
 *
 * @param session the global session to persist
 * @throws TransactionException if the store reports that the write did not succeed
 */
@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {
    final LogOperation operation =
        StringUtils.isBlank(taskName) ? LogOperation.GLOBAL_ADD : LogOperation.GLOBAL_UPDATE;
    final boolean written = transactionStoreManager.writeSession(operation, session);
    if (!written) {
        throw new StoreException("addGlobalSession failed.");
    }
}
io.seata.server.storage.db.store.DataBaseTransactionStoreManager#writeSession
/**
 * Routes a session write to the matching log-store operation.
 *
 * <p>Global operations convert the session to a global-transaction record, branch operations to a
 * branch-transaction record; the add / update / remove variants map to insert / update / delete.
 *
 * @param logOperation the kind of write to perform
 * @param session      the session to persist
 * @return the result reported by the log store
 * @throws StoreException if the operation is not one of the handled kinds
 */
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    switch (logOperation) {
        case GLOBAL_ADD:
            // logStore wraps the DataSource and issues the insert for the configured database type
            return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        case GLOBAL_UPDATE:
            return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        case GLOBAL_REMOVE:
            return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        case BRANCH_ADD:
            return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        case BRANCH_UPDATE:
            return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        case BRANCH_REMOVE:
            return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        default:
            throw new StoreException("Unknown LogOperation:" + logOperation.name());
    }
}
io.seata.server.storage.db.store.LogStoreDataBaseDAO#insertGlobalTransactionDO
/**
 * Inserts one global-transaction record into the global table.
 *
 * <p>The transaction name is truncated to {@code transactionNameColumnSize} characters when it is
 * longer than the column; a {@code null} name is passed through unchanged instead of causing a
 * {@code NullPointerException} during the length check.
 *
 * @param globalTransactionDO the record to persist
 * @return {@code true} if at least one row was inserted
 * @throws StoreException wrapping any {@code SQLException} raised while inserting
 */
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
    // Build the insert statement for the configured database type and global table
    String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
    Connection conn = null;
    PreparedStatement ps = null;
    try {
        int index = 1;
        conn = logStoreDataSource.getConnection();
        // Auto-commit: the insert is committed as soon as it executes
        conn.setAutoCommit(true);
        ps = conn.prepareStatement(sql);
        ps.setString(index++, globalTransactionDO.getXid());
        ps.setLong(index++, globalTransactionDO.getTransactionId());
        ps.setInt(index++, globalTransactionDO.getStatus());
        ps.setString(index++, globalTransactionDO.getApplicationId());
        ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
        String transactionName = globalTransactionDO.getTransactionName();
        // Truncate over-long names to the column size; leave null names as they are
        if (transactionName != null && transactionName.length() > transactionNameColumnSize) {
            transactionName = transactionName.substring(0, transactionNameColumnSize);
        }
        ps.setString(index++, transactionName);
        ps.setInt(index++, globalTransactionDO.getTimeout());
        ps.setLong(index++, globalTransactionDO.getBeginTime());
        ps.setString(index++, globalTransactionDO.getApplicationData());
        return ps.executeUpdate() > 0;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        // Statement and connection are released whether or not the insert succeeded
        IOUtil.close(ps, conn);
    }
}
最终生成的SQL如下所示
insert into global_table(xid, transaction_id, status, application_id, transaction_service_group, transaction_name,
timeout, begin_time, application_data, gmt_create, gmt_modified)
values (全局事务XID, 事务id, 事务状态,应用id, 事务分组, 事务名称, 超时时间, 开始时间, 应用数据, now(), now())
数据库中的数据
返回GlobalBeginResponse消息
在创建全局事务并开启后,拿到XID后封装到GlobalBeginResponse中,最终由remotingServer.sendAsyncResponse将GlobalBeginResponse消息返回给TM
TM处理 GlobalBeginResponse 消息
在seata服务端返回 GlobalBeginResponse 消息 后, TM还是由 io.seata.core.rpc.processor.client.ClientOnResponseProcessor#process 处理接收到的消息,通过调用future.setResultMessage 设置消息结果,并恢复阻塞的TM发送GlobalBeginRequest消息的线程,将结果返回
io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)
/**
 * Begins the global transaction when this object is the launcher; a non-launcher only checks
 * that an XID is already present and returns.
 *
 * @param timeout transaction timeout passed to the transaction manager
 * @param name    transaction name passed to the transaction manager
 * @throws TransactionException  if the transaction manager fails to begin the transaction
 * @throws IllegalStateException if {@code RootContext} already holds an XID
 */
public void begin(int timeout, String name) throws TransactionException {
    if (GlobalTransactionRole.Launcher != role) {
        // Not the launcher: the transaction is only joined, so there is nothing to begin
        assertXIDNotNull();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
        }
        return;
    }

    assertXIDNull();
    final String boundXid = RootContext.getXID();
    if (boundXid != null) {
        throw new IllegalStateException(
            "Global transaction already exists, can't begin a new global transaction, currentXid = " + boundXid);
    }

    // The XID comes back from the Seata server through the transaction manager
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    // Bind the XID to RootContext
    RootContext.bind(xid);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction [{}]", xid);
    }
}
获取到XID后,将XID绑定到RootContext中,至此,全局事务的开启过程也就结束了。