你才不是只会理论的女同学-seata实践篇-LMLPHP

本文主要内容为seata的实践篇,理论知识不懂的请参考前文:

我还不懂什么是分布式事务

主要介绍两种最常用的TCC和AT模式。

环境信息:

mysql:5.7.32

seata-server:1.4.1

SpringCloud:Hoxton.SR10

SpringBoot:2.3.8.RELEASE

注册中心:Eureka

涉及服务:

你才不是只会理论的女同学-seata实践篇-LMLPHP

Seata-server

1、在file.conf中修改

mode = "db"

然后配置DB信息:

  ## database store property
  db {
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://127.0.0.1:3306/seata"
    user = "root"
    password = "123456"
    minConn = 5
    maxConn = 100
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }

2、在register.conf

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"
  loadBalance = "RandomLoadBalance"
  loadBalanceVirtualNodes = 10
  eureka {
    serviceUrl = "http://eureka-chengdu:8761/eureka,http://eureka-hangzhou:8762/eureka"
    application = "seata-server"
    weight = "1"
  }

3、客户端修改

这里所指的客户端包含所有的资源管理器,包含所有需要seata-server管理的服务

在服务启动yml中增加:

seata:
  enabled: true
  # 事务群组(可以每个应用独立取名,也可以使用相同的名字)
  tx-service-group: my_tx_group
  client:
    rm-report-success-enable: true
    # 异步提交缓存队列长度(默认10000)
    rm-async-commit-buffer-limit: 1000
    # 一阶段全局提交结果上报TC重试次数(默认1次,建议大于1)
    tm-commit-retry-count:   3
    # 一阶段全局回滚结果上报TC重试次数(默认1次,建议大于1)
    tm-rollback-retry-count: 3
    support:
      # 数据源自动代理开关(默认false关闭)
      spring-datasource-autoproxy: false
  service:
    vgroup-mapping:
      # TC 集群(必须与seata-server保持一致)
      my_tx_group: seata-server
    grouplist:
      default: seata-server:8091
  registry:
    type: eureka
    eureka:
      serviceUrl: http://eureka-chengdu:8761/eureka/,http://eureka-hangzhou:8762/eureka/

TCC模式

TCC模式实践需要四个服务,除了seata-server外,其他服务调用关系如下:

你才不是只会理论的女同学-seata实践篇-LMLPHP

business服务是全局事务的发起者,需要增加@GlobalTransactional注解

@Override
@GlobalTransactional
public String processTcc(Map<String, String> params) {
    String xid = RootContext.getXID();
    System.out.println(("---》》》》xid:" + xid));
    uploadFeign.upload(params);
    downloadFeign.download(params);
    return xid;
}

business服务会通过feign远程调用upload和download服务,这两个服务都要声明TCC的三个接口,并通过TwoPhaseBusinessAction注解声明。

upload服务:

@LocalTCC
public interface TccService {
    @TwoPhaseBusinessAction(name = "upload", commitMethod = "commitTcc", rollbackMethod = "cancel")
    String upload(@BusinessActionContextParameter(paramName = "params") Map<String, String> params);

    boolean commitTcc(BusinessActionContext context);

    boolean cancel(BusinessActionContext context);
}

具体实现,这里模拟了TCC结果并放到Result中,通过restful接口可以查看,实际业务需要考虑防悬挂空回滚问题,例子只是简单描述如何使用TCC模式:

@Slf4j
@Service
public class TccServiceImpl implements TccService {
    @Value("${spring.application.name}")
    private String appName;

    @PostConstruct
    private void initAppName() {
        Result.getResult().setAppName(appName);
    }

    @Override
    public String upload(Map<String, String> params) {
        String xid = RootContext.getXID();
        System.out.println(("---》》》》xid: " + xid));
        return "success";
    }

    @Override
    public boolean commitTcc(BusinessActionContext context) {
        String xbid = context.getXid();
        System.out.println(("---》》》》xid: " + xbid + "提交成功"));
        Result.getResult().setActionResult(context.getXid(), +context.getBranchId(), "Commit", context.getActionContext("params"));
        return true;
    }

    @Override
    public boolean cancel(BusinessActionContext context) {
        System.out.println(("---》》》》xid: " + context.getXid() + "回滚成功"));
        Result.getResult().setActionResult(context.getXid(), context.getBranchId(), "Rollback", context.getActionContext("params"));
        return true;
    }
}

download服务

download服务也同样需要声明一个TCC接口,实现上在Try阶段模拟每三次调用,,延迟30s失败一次场景

@Override
public String download(Map<String, String> params) {
    String xid = RootContext.getXID();
    System.out.println(("---》》》》xid: " + xid));
    if (count.incrementAndGet() % 3 == 0) {
        try {
            TimeUnit.SECONDS.sleep(30);
        } catch (InterruptedException e) {
            log.warn("InterruptedException", e);
        }
        throw new RuntimeException("服务异常");
    }
    return "success";
}

测试

1、通过restful接口调用两次,返回全局事务ID

你才不是只会理论的女同学-seata实践篇-LMLPHP

2、查看download和upload服务结果,可以看到结果都是成功。

你才不是只会理论的女同学-seata实践篇-LMLPHP

upload结果:

你才不是只会理论的女同学-seata实践篇-LMLPHP

3、通过日志查看

seata-server:

你才不是只会理论的女同学-seata实践篇-LMLPHP

business服务日志:

你才不是只会理论的女同学-seata实践篇-LMLPHP

第三次调用

第三次模拟的是download服务失败场景,所以能看到download结果是失败回滚

你才不是只会理论的女同学-seata实践篇-LMLPHP

但是upload服务并没有异常,来看下是否能够和download保证事务的一致性,结果都是回滚呢?

从结果看到两个服务结果是相同的,从而也看出保证了事务一致性。

你才不是只会理论的女同学-seata实践篇-LMLPHP

从seata-server日志也能看到回滚成功的信息:

你才不是只会理论的女同学-seata实践篇-LMLPHP

AT模式

AT模式实践需要四个服务,除了seata-server外,其他服务调用关系如下:

你才不是只会理论的女同学-seata实践篇-LMLPHP

模拟电商场景,下订单、减库存,处理成功后,第三次调用时延时30s抛出异常

业务触发测,也就是全局事务发起服务business服务:

    @Override
    @GlobalTransactional(rollbackFor = Exception.class)
    public String processAt(String userId, int orderMoney, String commodityCode, int count) throws InterruptedException {
        String xid = RootContext.getXID();
        System.out.println("---》》》》xid:" + xid);
        System.out.println(("------->创建订单开始"));
        orderFeign.create(userId, commodityCode, count, orderMoney);
        System.out.println(("------->创建订单结束"));

        System.out.println(("------->扣减库存开始"));
        storageFeign.deduct(commodityCode, count);
        System.out.println(("------->扣减库存结束"));

        if (visitCount.incrementAndGet() % 3 == 0) {
            TimeUnit.SECONDS.sleep(30);
            throw new RuntimeException("process failed");
        }
        return xid;
    }

order服务、storage服务

除了配置中增加seata外,与普通的入库服务是一样的

@Service
public class OrderDao {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    public boolean createOrder(Order order) {
        String sql = "INSERT INTO product_order (user_id,product_id,count,money,status) VALUES('" + order.getUserId() + "', " + order.getProductId() + "," + order.getCount() + ", " + order.getMoney() + ",0);";
        return jdbcTemplate.update(sql) > 0;
    }
}

调用之前:数据:

order数据库为空,

storage数据库中库存字段为100:

你才不是只会理论的女同学-seata实践篇-LMLPHP

调用两次后:

你才不是只会理论的女同学-seata实践篇-LMLPHP

数据库结果:

你才不是只会理论的女同学-seata实践篇-LMLPHP

你才不是只会理论的女同学-seata实践篇-LMLPHP

第三次调用是模拟延时30s后失败场景,也就是书库更新后,处理数据失败,应该从数据库看到数据回滚过程,延迟30s也是为了更好的观察结果,也可以用debug方式观察结果

你才不是只会理论的女同学-seata实践篇-LMLPHP

你才不是只会理论的女同学-seata实践篇-LMLPHP

也可以看到

你才不是只会理论的女同学-seata实践篇-LMLPHP

rollback_info中:

你才不是只会理论的女同学-seata实践篇-LMLPHP

30s后重新查询书库可以看到storage库存变回98,order记录减少为2条,同时undo_log和seata相关表中数据被清空。

你才不是只会理论的女同学-seata实践篇-LMLPHP

从seata-server日志也能看到回滚信息。

华为

同时看下华为的分布式事务解决方案,相比于seata直观的就是多了交互命令行,从上面例子也可以看出seata目前还只能通过数据库查看结果

其他和seata类似提供了TCC和非侵入两种方案

你才不是只会理论的女同学-seata实践篇-LMLPHP

seata-golang

参考容器时代:seata-golang 接入指南

总结

例子中涉及的代码已上传到github

https://github.com/stevenniu9527/nerry

如果有时间还是建议自己敲一遍代码,看别人的东西都会觉得很简单,一看就会

但是当自己实操时就会发现各种奇奇怪怪的异常,一用就废

纸上得来终觉浅,绝知此事要躬行,

比如例子中子pom依赖为什么不需要配置版本、eureka两个怎么互为副本的、seata相关表中具体数据是什么、debug和延迟30s是否会对seata有影响

这些问题自己敲一遍会有更深的理解,更何况代码量是如此的少。

最后觉得写的还行,求关注,求点赞,求转发

04-07 01:11
查看更多