• RocketMQ事务流程关键

    1、事务消息在一阶段对用户不可见
    事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的,也就是说消费者不能直接消费。这里RocketMQ的实现方法是原消息的主题与消息消费队列,然后把主题改成RMQ_SYS_TRANS_HALF_TOPIC ,这样由于消费者没有订阅这个主题,所以不会被消费。

    2、如何处理第二阶段的失败消息?
    在本地事务执行完成后会向MQServer发送Commit或Rollback操作,此时如果在发送消息的时候生产者出故障了,那么要保证这条消息最终被消费,MQServer会像服务端发送回查请求,确认本地事务的执行状态。
    当然了rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,RocketMQ默认回滚该消息。

    3、消息状态 事务消息有三种状态:

    实现

    我们构建这样一个需求:用户请求订单微服务 order-service 接口删除订单(退货),删除订单后需要发送消息给用户服务account-service,用户微服务收到消息后会给用户账户增加余额。这个需求跟钱相关,肯定要保证消息的事务性,接下来我们根据上面的原理实现整个流程。

    基础配置

    生产者order-servcie和account-service都要引入RocketMQ相关依赖,增加RocketMQ的相关配置

    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    </dependency>
    # within rocketmq
    rocketmq:
    name-server: xxx.xx.x.xx:9876; xxx.xx.x.xx:9876
    producer:
    group: cloud-group

    发送半消息

    order-service在执行删除订单操作时发送一条半消息给MQServer,发送半消息主要是使用rocketMQTemplate.sendMessageInTransaction() 方法,发送事务消息。

    @Override
    public void delete(String orderNo) {
    Order order = orderMapper.selectByNo(orderNo);
    //如果订单存在且状态为有效,进行业务处理
    if (order != null && CloudConstant.VALID_STATUS.equals(order.getStatus())) {
    String transactionId = UUID.randomUUID().toString();
    //如果可以删除订单则发送消息给rocketmq,让用户中心消费消息
    rocketMQTemplate.sendMessageInTransaction("add-amount",
    MessageBuilder.withPayload(
    UserAddMoneyDTO.builder()
    .userCode(order.getAccountCode())
    .amount(order.getAmount())
    .build()
    )
    .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
    .setHeader("order_id",order.getId())
    .build()
    ,order
    );
    }
    }

    首先先校验一下订单状态,然后发送消息给MQServer,这个逻辑大家都看得懂,主要是关注sendMessageInTransaction() 方法,源码如下:

    public TransactionSendResult sendMessageInTransaction(String destination, Message<?> message, Object arg) throws MessagingException {
    try {
    if (((TransactionMQProducer)this.producer).getTransactionListener() == null) {
    throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
    } else {
    org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
    return this.producer.sendMessageInTransaction(rocketMsg, arg);
    }
    } catch (MQClientException var5) {
    throw RocketMQUtil.convert(var5);
    }
    }

    该方法有三个参数:

    注意,这里我们生成了一个transactionId,并放在header中跟消息一起发送(这里实际也可以构造成一个对象,放在arg里进行发送),作用后面再讲!

    执行本地事务与回查

    MQServer收到半消息后会告诉生产者order-service确认收到半消息,这时候order-service需要执行本地事务,执行完本地事务后再告诉MQServer本地事务的执行状态,确认消息究竟是Commit还是Rollback。如果在告诉MQServer本地执行状态的时候出异常了还需要让MQServer能够回查到,怎么实现这一些列操作呢?RocketMQ进阶-事务消息-LMLPHP

    RocketMQ提供了 RocketMQLocalTransactionListener 接口,本地事务监听器,这个接口类的实现如下:RocketMQ进阶-事务消息-LMLPHP

    第一个方法executeLocalTransaction 为执行本地事务;
    第二个方法checkLocalTransaction 为检查本地事务的执行状态,也就是回查动作。
    有了这个接口类我们的执行逻辑清楚了,但是还有个问题:本地事务已经执行完成了,怎么去回查本地事务的执行结果呢?RocketMQ进阶-事务消息-LMLPHP

    我们可以在执行本地事务的时候同时生成一个事务日志,让本地事务与日志事务在同一个方法中,同时添加@Transactional 注解,保证两个操作事务是一个原子操作。这样如果事务日志表中有这个本地事务的信息,那就代表本地事务执行成功,需要Commit,相反如果没有对应的事务日志,则表示没执行成功,需要Rollback

    思路既然理顺了,咱们就开撸。RocketMQ进阶-事务消息-LMLPHP

    @Slf4j
    @RocketMQTransactionListener
    @RequiredArgsConstructor(onConstructor = @__(@Autowired))
    public class AddUserAmountListener implements RocketMQLocalTransactionListener {
    private final OrderService orderService;
    private final RocketMqTransactionLogMapper rocketMqTransactionLogMapper;
    /**
    * 执行本地事务
    */

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
    log.info("执行本地事务");
    MessageHeaders headers = message.getHeaders();
    //获取事务ID
    String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
    Integer orderId = Integer.valueOf((String)headers.get("order_id"));
    log.info("transactionId is {}, orderId is {}",transactionId,orderId);

    try{
    //执行本地事务,并记录日志
    orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS,transactionId);
    //执行成功,可以提交事务
    return RocketMQLocalTransactionState.COMMIT;
    }catch (Exception e){
    return RocketMQLocalTransactionState.ROLLBACK;
    }
    }

    /**
    * 本地事务的检查,检查本地事务是否成功
    */

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {

    MessageHeaders headers = message.getHeaders();
    //获取事务ID
    String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
    log.info("检查本地事务,事务ID:{}",transactionId);
    //根据事务id从日志表检索
    QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
    queryWrapper.eq("transaction_id",transactionId);
    RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
    if(null != rocketmqTransactionLog){
    return RocketMQLocalTransactionState.COMMIT;
    }
    return RocketMQLocalTransactionState.ROLLBACK;
    }
    }
    @Transactional(rollbackFor = RuntimeException.class)
    @Override
    public void changeStatuswithRocketMqLog(Integer id,String status,String transactionId){
    //将订单状态置位无效
    orderMapper.changeStatus(id,status);
    //插入事务表
    rocketMqTransactionLogMapper.insert(
    RocketmqTransactionLog.builder()
    .transactionId(transactionId)
    .log("执行删除订单操作")
    .build()
    );
    }

    这一块的代码逻辑都是在生产端,即Order-Server,大家不要搞错了

    消费消息

    Rollback的消息MQServer会给我们处理,我们只要关注Commit状态时消费端可以正常消费即可。在account-service监听消息,如果收到消息则给用户账户增加余额。

    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "add-amount",consumerGroup = "cloud-group")
    @RequiredArgsConstructor(onConstructor = @__(@Autowired) )
    public class AddUserAmountListener implements RocketMQListener<UserAddMoneyDTO> {
    private final AccountMapper accountMapper;
    /**
    * 收到消息的业务逻辑
    */

    @Override
    public void onMessage(UserAddMoneyDTO userAddMoneyDTO) {
    log.info("received message: {}",userAddMoneyDTO);
    accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());
    log.info("add money success");
    }
    }

    测试

    RocketMQ进阶-事务消息-LMLPHP订单表有这样一条记录,用户为jianzh5,amount为200

    RocketMQ进阶-事务消息-LMLPHP用户表的记录,执行完成后jianzh5的账户应该变成250

    总结

    使用RocketMQ实现事务消息的过程还是很复杂的,需要好好理解开头的那张图,只有理解了事务消息的交互过程才能编写相应的代码!


    朕已阅 RocketMQ进阶-事务消息-LMLPHP


    本文分享自微信公众号 - JAVA日知录(javadaily)。
    如有侵权,请联系 support@oschina.cn 删除。
    本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

    09-13 13:07
    查看更多