前言
先前在《阿里开源分布式事务组件 seata :demo 环境搭建以及运行流程简析》 这篇文章中已经提到过:
seata 客户端在处理事务逻辑的时候,实际上采用模板模式,委托给了 TransactionalTemplate 类去执行标准的事务处理流程,如下代码所示:
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.1 get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
try {
// 2. begin transaction
beginTransaction(txInfo, tx);
Object rs = null;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3.the needed business exception to rollback.
completeTransactionAfterThrowing(txInfo,tx,ex);
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx);
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
}
在客户端的事务处理流程中,流程比较清晰,处理流程也不复杂,除了客户端自身采用了一些机制,其实 seata 把比较“重”的逻辑都放在了 server 端。
比如说,开启一个全局事务时,事务 id 如何生成,事务的信息如何存储,这些是不需要客户端的关心的。
当我们使用标准的 JDBC 规范来处理单库数据库事务时,代码几乎都和下面是同一个模板:
conn.setAutocommit(false);
try {
// 在这里使用 connection 进行 sql 操作;
conn.xxxxxx
//如果一切正常,则直接进行提交
conn.commit();
} catch (Exception e) {
conn.rollback();
}
虽然分布式事务和单机事务在“分布式” 和 “单机” 上区别很大,但在 “事务” 这个角度,却是相同的。
我们可以看到 seata 客户端的事务处理逻辑,跟单机事务的处理逻辑大同小异。
有差异的两个地方主要是:
- seata 中的全局事务如果提交失败,是不需要进行回滚,会有别的补救措施。
- 针对事务主体执行期间发生的异常,是不一定要回滚的,遇到有些异常可以直接提交,而有时候又可以直接忽略,不过这块跟具体场景有关系,默认出现了异常就是要回滚的。
事务信息
TransactionTemplate 处理事务时,必须知道事务的配置信息,包括:
- 超时时间
- 事务标识名称
- 回滚时机或者不回滚时机
这些信息主要通过在方法上的 @GlobalTransactional 注解携带进来,我们可以看一下这个注解包含了哪些属性:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Inherited
public @interface GlobalTransactional {
int timeoutMills() default TransactionInfo.DEFAULT_TIME_OUT;
String name() default "";
Class<? extends Throwable>[] rollbackFor() default {};
String[] rollbackForClassName() default {};
Class<? extends Throwable>[] noRollbackFor() default {};
String[] noRollbackForClassName() default {};
}
这些信息由 TransactionExecutor 携带给 TransactionTemplate。
TransactionExecutor 这个类除了携带事务信息 TransactionInfo,它还携带了事务的执行主体,即标识了 @GlobalTransactional 注解的方法的方法体。
调用 TransactionExecutor 的 invoke 方法,就相当于执行了事务的执行主体。
TransactionExecutor 是一个接口,我们通过观察它的匿名实现类的构造方式,就能正式上面说的观点,例如 GlobalTransactionInterceptor 的 handleGlobalTransaction 方法:
private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
public String name() {
String name = globalTrxAnno.name();
if (!StringUtils.isNullOrEmpty(name)) {
return name;
}
return formatMethod(methodInvocation.getMethod());
}
@Override
public TransactionInfo getTransactionInfo() {
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
transactionInfo.setName(name());
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.rollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
throw e.getCause();
default:
throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);
}
}
全局事务的创建
- 事务的发起者,需要创建全局事务,向 seata server 申请全局事务 id
- 事务的参与者,则需要获取已经创建好的全局事务,以便将自己注册为正确全局事务下的一个事务分支,认清自己的身份....
GlobalTransactionContext 这个类负责上述说的两个功能,事务发起者调用它提供的 getCurrentOrCreate 方法创建全局事务,事务的参与者调用它的 getCurrent 方法获取当前所处的全局事务(实际上参与者也是根据全局事务 xid 进行创建)。
通过这些方法获取到的是一个 GlobalTransaction 接口的具体实现实例。代表了一个全局事务。
GlobalTransaction 提供了事务的操作流程 api,例如基本的 begin、commit 和 rollback,另外还有事务的状态,还有 server 分配的全局事务 id;
接口定义如下所示:
public interface GlobalTransaction {
void begin() throws TransactionException;
void begin(int timeout) throws TransactionException;
void begin(int timeout, String name) throws TransactionException;
void commit() throws TransactionException;
void rollback() throws TransactionException;
GlobalStatus getStatus() throws TransactionException;
String getXid();
}
无论是参与者还是发起者,都需要在创建 GlobalTransaction 实例的时候,把它自己的身份讲清楚。由 GlobalTransactionRole 来定义这些角色。
全局事务 xid 的传播
全局事务的参与者和发起者是不要求部署在同一个操作系统环境下的,否则就分布式事务就谈不上分布式了。
因此,全局事务的发起者在向 server 注册了全局事务 id 后,必须通过某种方式把全局事务 xid 通过服务的调用链传下去。
如果 seata 是在 dubbo 的环境下运行的,那么通过 dubbo 调用方式(例如说泛化调用、或者可变参数、或者直接修改底层协议),就能够在服务调用时,将 xid 在整个服务调用链上传播。
在 seata 中,RootContext 这个类,就是用来保存参与者和发起者之间传播的 xid 的。传播流程大致如下:
- 发起者开启全局事务后,将 xid 塞进 RootContext 里
- 服务框架将 xid 沿着服务调用链一直传播,塞进每个参与者进程的 RootContext 里。
- 参与者发现 RootContext 里的 xid 存在时,它便知道自己处于全局事务中,并且知道 xid 是什么。
通过 GlobalTransactionContext 的 getCurrent 方法,我们可以看到这些事实:
/**
* Get GlobalTransaction instance bind on current thread.
*
* @return null if no transaction context there.
*/
private static GlobalTransaction getCurrent() {
String xid = RootContext.getXID();
if (xid == null) {
return null;
}
return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
}
与 server 端如何交互
GlobalTransaction 定义的事务操作 api,其具体实现类需要真正与服务端通信了。以 begin 这个 api 为例,它的默认实现类 DefaultGlobalTransaction 是这样实现的:
public void begin(int timeout, String name) throws TransactionException {
if (role != GlobalTransactionRole.Launcher) {
check();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [" + xid + "]");
}
return;
}
if (xid != null) {
throw new IllegalStateException();
}
if (RootContext.getXID() != null) {
throw new IllegalStateException();
}
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [" + xid + "]");
}
}
实际上除了一些必要的条件检查和其它附带操作,它把与服务端具体的通信委托给了 TransactionManager 去做,那我们再看看 TransactionManager 做了什么事:
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
return response.getXid();
}
逻辑也十分简单,就是发送一个请求,然后接收一个响应。
不得不说,seata 在代码逻辑的抽象和结构层次这方面做得很好。
钩子及异常处理
seata 客户端通过 TransactionHook 这个接口,在一个事务的处理过程中,允许为它的各个阶段添加钩子逻辑
public interface TransactionHook {
void beforeBegin();
void afterBegin();
void beforeCommit();
void afterCommit();
void beforeRollback();
void afterRollback();
void afterCompletion();
}
由于全局事务的提交和回滚依然是由可能失败的,比如说全局事务如果提交失败,意味着 undo_log 表里的数据目前是没有删除成功的,可能需要记录并在后面重试删除;
如果回滚失败,意味着当前这个全局事务属于异常结束,需要特殊处理、甚至人工介入。
seata 通过 FailureHandler 这个接口,提供了一个可扩展的点:
public interface FailureHandler {
void onBeginFailure(GlobalTransaction tx, Throwable cause);
void onCommitFailure(GlobalTransaction tx, Throwable cause);
void onRollbackFailure(GlobalTransaction tx, Throwable cause);
}