案例设计

seata 官方给出了一系列 demo 样例,不过我在用的过程中发现总有这个那个的问题,所以自己维护了一份基于 dubbo 的 demo 在 github 上,适配的 seata 版本是 0.8.0。
案例的设计直接参考官方 quick start给出的案例:

整个案例分为三个服务,分别是存储服务、订单服务和账户服务,这些服务通过 dubbo 进行发布和调用,内部调用逻辑如上面图所示。
整个 demo 的工程样例如下所示:

undo_log 表

这个案例除了在数据库需要建立业务表以外,还要额外建立一张 undo_log 表,这个表的主要作用是记录事务的前置镜像和后置镜像。
全局事务进行到提交阶段,则删除该表对应的记录,全局事务如果需要回滚,则会利用这个表里记录的镜像数据,恢复数据。
undo_log 表里的数据实际上是“朝生夕死”的,数据不需要在表里存活太久。表结构如下所示:

CREATE TABLE `undo_log` (
                          `id` bigint(20) NOT NULL AUTO_INCREMENT,
                          `branch_id` bigint(20) NOT NULL,
                          `xid` varchar(100) NOT NULL,
                          `context` varchar(128) NOT NULL,
                          `rollback_info` longblob NOT NULL,
                          `log_status` int(11) NOT NULL,
                          `log_created` datetime NOT NULL,
                          `log_modified` datetime NOT NULL,
                          `ext` varchar(100) DEFAULT NULL,
                          PRIMARY KEY (`id`),
                          UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

服务逻辑

每个服务都对应了一个 starter 类,这个类主要用来在 spring 环境下,将该服务启动,并通过 dubbo 发布出去,以账户服务为例:

/**
 * The type Dubbo account service starter.
 */
public class DubboAccountServiceStarter {
    /**
     * 2. Account service is ready . A buyer register an account: U100001 on my e-commerce platform
     *
     * @param args the input arguments
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext accountContext = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-account-service.xml"});
        accountContext.getBean("service");
        JdbcTemplate accountJdbcTemplate = (JdbcTemplate) accountContext.getBean("jdbcTemplate");
        accountJdbcTemplate.update("delete from account_tbl where user_id = 'U100001'");
        accountJdbcTemplate.update("insert into account_tbl(user_id, money) values ('U100001', 999)");

        new ApplicationKeeper(accountContext).keep();
    }
}

首先通过 ClassPathXmlApplicationContext 读取 dubbo-account-service.xml 这个 spring 配置文件并启动 spring 容器环境,并通过 spring 的 jdbc template 对账户表的数据进行初始化。
dubbo-account-service.xml 配置文件中进行了各类 bean 的配置,包括 dubbo 与 spring 结合时的标准配置:

    <bean id="accountDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">
        <constructor-arg ref="accountDataSource" />
    </bean>

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="accountDataSourceProxy" />
    </bean>

    <dubbo:application name="dubbo-demo-account-service" />
    <dubbo:registry address="zookeeper://localhost:2181" />
    <dubbo:protocol name="dubbo" port="20881" />
    <dubbo:service interface="io.seata.samples.dubbo.service.AccountService" ref="service" timeout="10000"/>

    <bean id="service" class="io.seata.samples.dubbo.service.impl.AccountServiceImpl">
        <property name="jdbcTemplate" ref="jdbcTemplate"/>
    </bean>

    <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
        <constructor-arg value="dubbo-demo-account-service"/>
        <constructor-arg value="my_test_tx_group"/>
    </bean>

这份配置里主要有两个需要引起注意的关键点

  1. jdbcTemplate 这个 bean 所依赖的数据源 bean,是一个类名为 io.seata.rm.datasource.DataSourceProxy 的数据源类,通过它的名字可以很明显地看出这是一个代理模式的应用,因为 seata 为完成全局事务的逻辑,需要在普通的 sql 操作前后添加一些逻辑,比如说 sql 执行前对 sql 进行语法解析,生成前置镜像,sql 执行后生成后置镜像,通过代理的方式,可以方便地对 connection,statement 等进行代理包装,在调用的时候进行拦截,加入自己的逻辑。
  2. 配置文件中还有一个 io.seata.spring.annotation.GlobalTransactionScanner 类型的 bean,这个 bean 是支撑 seata 能在 spring 环境中通过注解的方式来划定事务边界的基础。在 spring 容器启动时,会扫描 @GlobalTransactional 注解是否存在,这个注解标识了全局事务的开始和结束,也就是我们常说的“事务的边界”

业务逻辑

业务逻辑的具体详情在 BusinessServiceImpl 类中可以看到:

    @Override
    @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
    public void purchase(String userId, String commodityCode, int orderCount) {
        LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
        storageService.deduct(commodityCode, orderCount);
        orderService.create(userId, commodityCode, orderCount);
        // throw new RuntimeException("xxx");
    }

先调用存储服务,减少库存,然后调用订单服务,新建订单。这两个动作属于一个整体的事务,任何一个动作失败,都需要撤销所有的操作。
这个方法也有两个需要注意的点:

  1. 该方法上声明了 @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx") 这样的注解,用于让上文提到的 GlobalTransactionScanner 扫描的时候发现这是一个全局事务。
  2. 方法的最后有一行代码抛出了 RuntimeException,这主要是为了模仿全局事务的失败,并让 seata 走全局事务回滚逻辑。

事务扫描与边界定义

上文提到的 GlobalTransactionScanner 类,会在 spring 容器启动的时候,也被初始化。
在它的 afterPropertiesSet 方法被调用时,会触发 seata client 的初始化

    @Override
    public void afterPropertiesSet() {
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            return;
        }
        initClient();

    }

关于 seata client 的初始化的细节,可以看看我写的另外一篇文章《阿里开源分布式事务组件 seata :seata client 通信层解析》
初始化客户端做的事情主要是建立与 seata server 的连接,并注册 TM 和 RM。接下来,在 wrapIfNecessary 方法里,实现对注解的扫描,并对添加了注解的方法添加 interceptor。
这篇文章里我们暂时不讨论 TCC 模式,只讨论 AT 模式,也暂不讨论全局事务锁 GlobalLock 的实现,先忽略这些有关的逻辑,只关注事务处理逻辑。

                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    if (!existsAnnotation(new Class[] {serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    if (interceptor == null) {
                        interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    }

在这里,interceptor 的实现是 GlobalTransactionalInterceptor,也就是说,以上文的案例为例子,当 BusinessServiceImpl 的 purchase 方法被调用的时候,实际上这个方法会被拦截器拦截,执行拦截器里的逻辑:

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
        if (globalTransactionalAnnotation != null) {
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if (globalLockAnnotation != null) {
            return handleGlobalLock(methodInvocation);
        } else {
            return methodInvocation.proceed();
        }
    }

    private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                           final GlobalTransactional globalTrxAnno) throws Throwable {
        try {
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }

                public String name() {
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }

                @Override
                public TransactionInfo getTransactionInfo() {
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                    transactionInfo.setName(name());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                default:
                    throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);
            }
        }
    }

在执行 handleGlobalTransaction 方法时,实际上采用模板模式,委托给了 TransactionalTemplate 类去执行标准的事务处理流程。如下所示:

   /**
     * Execute object.
     *
     * @param business the business
     * @return the object
     * @throws TransactionalExecutor.ExecutionException the execution exception
     */
    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 1.1 get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        try {

            // 2. begin transaction
            beginTransaction(txInfo, tx);

            Object rs = null;
            try {

                // Do Your Business
                rs = business.execute();

            } catch (Throwable ex) {

                // 3.the needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo,tx,ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            triggerAfterCompletion();
            cleanUp();
        }
    }

事务处理逻辑实际上是一种模板,将事务相关的处理逻辑放在 try 块里,发现异常后执行回滚,正常执行则执行提交。
在这里有个需要注意的地方是,seata 不把提交这个动作放在 try 块里,因为在 seata 里,全局事务的提交实际上是可以异步执行的。
因为全局事务如果进行到提交这一阶段,那么意味着各个分支事务已经执行过本地提交,全局事务的提交阶段仅仅是删除 undo_log 里的记录,这个记录删除或者不删除,实际上不会改变全局事务已经正常完成的事实。所以它可以用程序异步去做,或者以人工介入的方式去做,所以 seata 认为,全局事务提交失败,不需要执行回滚流程。

03-05 23:19