RocketMQ事务流程关键

代码实现

首先假设我们有这样一个需求:

用户请求订单微服务 order-service 接口删除订单(退货),删除订单后需要发送消息给用户服务 account-service,用户微服务收到消息后会给用户账户增加余额。

这个需求跟钱相关,肯定要保证消息的事务性,接下来我们根据上面的原理实现整个流程。

基础配置

生产者order-servcie和account-service都要引入RocketMQ相关依赖,增加RocketMQ的相关配置

<dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
# within rocketmq
rocketmq:
  name-server: xxx.xx.x.xx:9876; xxx.xx.x.xx:9876
  producer:
    group: cloud-group

发送半消息

order-service在执行删除订单操作时发送一条半消息给MQServer,发送半消息主要是使用 rocketMQTemplate.sendMessageInTransaction() 方法,发送事务消息。

@Override
public void delete(String orderNo) {
 Order order = orderMapper.selectByNo(orderNo);
 //如果订单存在且状态为有效,进行业务处理
 if (order != null && CloudConstant.VALID_STATUS.equals(order.getStatus())) {
  String transactionId = UUID.randomUUID().toString();
  //如果可以删除订单则发送消息给rocketmq,让用户中心消费消息
  rocketMQTemplate.sendMessageInTransaction("add-amount",
    MessageBuilder.withPayload(
      UserAddMoneyDTO.builder()
        .userCode(order.getAccountCode())
        .amount(order.getAmount())
        .build()
    )
    .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
    .setHeader("order_id",order.getId())
    .build()
    ,order
  );
 }
}

首先先校验一下订单状态,然后发送消息给MQServer,这个逻辑大家都看得懂,主要是关注 sendMessageInTransaction() 方法,源码如下:

public TransactionSendResult sendMessageInTransaction(String destination, Message<?> message, Object arg) throws MessagingException {
 try {
  if (((TransactionMQProducer)this.producer).getTransactionListener() == null) {
   throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
  } else {
   org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
   return this.producer.sendMessageInTransaction(rocketMsg, arg);
  }
 } catch (MQClientException var5) {
  throw RocketMQUtil.convert(var5);
 }
}

该方法有三个参数:

注意,这里我们生成了一个transactionId,并放在header中跟消息一起发送(这里实际也可以构造成一个对象,放在arg里进行发送),作用后面再讲!

执行本地事务与回查

MQServer收到半消息后会告诉生产者order-service确认收到半消息,这时候order-service需要执行本地事务,执行完本地事务后再告诉MQServer本地事务的执行状态,确认消息究竟是Commit还是Rollback。如果在告诉MQServer本地执行状态的时候出异常了还需要让MQServer能够回查到,怎么实现这一些列操作呢?RocketMQ进阶 - 事务消息-LMLPHP

RocketMQ提供了 RocketMQLocalTransactionListener 接口,本地事务监听器,这个接口类的实现如下:RocketMQ进阶 - 事务消息-LMLPHP

第一个方法 executeLocalTransaction 为执行本地事务;第二个方法 checkLocalTransaction 为检查本地事务的执行状态,也就是回查动作。有了这个接口类我们的执行逻辑清楚了,但是还有个问题:本地事务已经执行完成了,怎么去回查本地事务的执行结果呢?RocketMQ进阶 - 事务消息-LMLPHP

我们可以在执行本地事务的时候同时生成一个事务日志,让本地事务与日志事务在同一个方法中,同时添加 @Transactional 注解,保证两个操作事务是一个原子操作。这样如果事务日志表中有这个本地事务的信息,那就代表本地事务执行成功,需要Commit,相反如果没有对应的事务日志,则表示没执行成功,需要Rollback

思路既然理顺了,咱们就开撸。RocketMQ进阶 - 事务消息-LMLPHP

@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddUserAmountListener implements RocketMQLocalTransactionListener {
    private final OrderService orderService;
    private final RocketMqTransactionLogMapper rocketMqTransactionLogMapper;
    /**
     * 执行本地事务
     */

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("执行本地事务");
        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer orderId = Integer.valueOf((String)headers.get("order_id"));
        log.info("transactionId is {}, orderId is {}",transactionId,orderId);

        try{
            //执行本地事务,并记录日志
            orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS,transactionId);
            //执行成功,可以提交事务
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 本地事务的检查,检查本地事务是否成功
     */

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {

        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info("检查本地事务,事务ID:{}",transactionId);
        //根据事务id从日志表检索
        QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("transaction_id",transactionId);
        RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
        if(null != rocketmqTransactionLog){
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}
@Transactional(rollbackFor = RuntimeException.class)
@Override
public void changeStatuswithRocketMqLog(Integer id,String status,String transactionId){
    //将订单状态置位无效
 orderMapper.changeStatus(id,status);
    //插入事务表
 rocketMqTransactionLogMapper.insert(
   RocketmqTransactionLog.builder()
     .transactionId(transactionId)
     .log("执行删除订单操作")
   .build()
 );
}

这一块的代码逻辑都是在生产端,即Order-Server,大家不要搞错了

消费消息

Rollback的消息MQServer会给我们处理,我们只要关注Commit状态时消费端可以正常消费即可。在 account-service监听消息,如果收到消息则给用户账户增加余额。

@Slf4j
@Service
@RocketMQMessageListener(topic = "add-amount",consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired) )
public class AddUserAmountListener implements RocketMQListener<UserAddMoneyDTO{
    private final AccountMapper accountMapper;
    /**
     * 收到消息的业务逻辑
     */

    @Override
    public void onMessage(UserAddMoneyDTO userAddMoneyDTO) {
        log.info("received message: {}",userAddMoneyDTO);
        accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());
        log.info("add money success");
    }
}

测试

RocketMQ进阶 - 事务消息-LMLPHP订单表有这样一条记录,用户为jianzh5,amount为200

RocketMQ进阶 - 事务消息-LMLPHP用户表的记录,执行完成后jianzh5的账户应该变成250

小结

使用RocketMQ实现事务消息的过程还是很复杂的,需要好好理解开头的那张图,只有理解了事务消息的交互过程才能编写相应的代码!


收藏 等于白嫖点赞 才是真情!

RocketMQ进阶 - 事务消息-LMLPHP




本文分享自微信公众号 - JAVA日知录(javadaily)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

09-13 07:06
查看更多