RocketMQ学习笔记

参考资料:

安装启动(非集群模式)

  1. 官网下载二进制安装包(当然也可下载源码包后自己编译):下载地址

  2. 解压

    unzip rocketmq-all-4.3.0-bin-release.zip
    
  3. 修改配置

    • conf/broker.conf 中新增

      brokerIP1 = 192.168.195.88
      autoCreateTopicEnable = true  # 线上环境应该设为false
      
    • bin/runbroker.sh中修改JVM内存大小,默认是8G,一般自己电脑的上虚拟机可能没这么大

  4. 启动

    	# 后台启动NameServer
    	nohup sh bin/mqnamesrv -n 192.168.195.88:9876 &
    	# 查看日志,看是否启动成功
    	tail -f ~/logs/rocketmqlogs/namesrv.log
    
    	# 后台启动Broker
    	nohup sh bin/mqbroker -n 192.168.195.88:9876 -c conf/broker.conf &
    	# 查看日志,看是否启动成功
    	tail -f ~/logs/rocketmqlogs/broker.log
    
  5. 停止

    sh bin/mqshutdown broker
    sh bin/mqshutdown namesrv
    

    注意:

    • 记得在防火墙中开启9876(nameServer用)、10909(生产者用)、10911(消费者用)端口
    • 上面的192.168.195.88为我自己的机器的IP,不要使用localhost

常用命令

在RocketMQ的bin目录下有一个mqadmin脚本,它充当着控制台的角色,可以用来完成我们常用的操作。如不喜欢命令可安装第三方的可视化操控界面工具

  1. 创建topic

    sh mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t testTopic1
    # 参数 -n为nameServe服务地址  -b为broker服务地址  -t为topic的名字
    
  2. 查询所有topic

    sh mqadmin topicList -n localhost:9876
    # 参数 -n为nameServe服务地址
    
  3. 查看Topic统计信息

    sh mqadmin topicStatus -n localhost:9876 -t testTopic1
    # 参数 -n为nameServe服务地址 -t为topic的名字
    
  4. 查看消费组信息

    sh mqadmin consumerProgress -n localhost:9876 -g simple_push_consumer_group_01
    # 参数 -n为nameServe服务地址 -g为消费组的名字,无则表示查看所有的
    
  5. 查看所有命令:sh mqadmin

原理概述

系统架构

注意点:

  • NameServer提供服务发现和路由。 每个 NameServer 记录完整的路由信息,提供等效的读写服务,Broker启动后将自己注册至NameServer;随后每隔30s定期向NameServer上报Topic路由信息。每个 Broker 与NameServer 集群中的所有节点都会建立长连接。
  • Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。Producer 只能将消息发送到 Broker master。
  • Consumer 可同时和提供Topic服务的master和Slave建立长连接,即在master节点宕机时,消费者可以从slave节点读取消息。
  • Broker的主从切换问题,暂未研究

数据存储

RocketMQ的数据存储主要有三个内容:ConsumeQueue、CommitLog和IndexFile

ConsumeQueue是消息的逻辑队列,是由20字节定长的二进制数据单元组成,其中commitLogOffset(8 byte)、msgSize(4 byte)、tagsHashCode(8 byte);每个Topic和QueuId对应一个ConsumeQueue;单个文件大小约5.72M,每个文件由30W条数据组成,每个文件默认大小为600万个字节,当一个ConsumeQueue类型的文件写满了,则写入下一个文件。

CommitLog是消息存放的实际物理位置,每个Broker下所有的Topic下的消息队列共用同一个CommitLog的日志数据文件来存储,所有RocketMQ的写入是顺序的。单个CommitLog文件的默认大小为1G。

IndexFile即消息索引,如果一个消息包含key值的话,会使用IndexFile存储消息索引,其每个单元的数据构成为keyHash(4 byte)、commitLogOffset(8 byte)、timestamp(4 byte)、nextIndexOffset(4byte)。IndexFile主要是用来根据key来查询消息。

  • Producer端发送消息最终写入的是CommitLog,写入CommitLog有同步刷盘和异步刷盘两种方式:

    同步刷盘:只有在消息真正持久化至磁盘后,Broker端才会真正地返回给Producer端一个成功的ACK响应。

    异步刷盘:只要消息写入PageCache即可将成功的ACK返回给Producer端。

  • Consumer端先从ConsumeQueue读取持久化消息的offset,随后再从CommitLog中进行读取消息的真正实体内容。所以实际上读取操作是随机而不是顺序的,所以这也是消费速度是比Kafka低的原因。

    更加详细的介绍请参考该连接的第五节

生产者

RocketMQ发送消息有三种方式

  • 同步

    消息发送后,等待服务端的ack响应,这种方式最可靠,但效率最低

  • 异步

    消息发送注册回调函数,不需等待服务端的响应

  • 单向

    消息发送后,不关心服务端是否成功接受

如果Producer发送消息失败,会自动重试,重试的策略:

  1. 重试次数 < retryTimesWhenSendFailed(可配置)
  2. 总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)
  3. 同时满足上面两个条件后,Producer会选择另外一个队列发送消息

消费者

RocketMQ消费消息主要有两种方式:

  • pull模式

    由消费者客户端主动向服务端拉取消息。

    一般情况下,如果我们没有控制好pull的频率,频率过低时,则可能消费速度太低导致消息的积压,频率过高时,则可能发送过多无效或低效pull请求,增加了服务端负载。

    为了解决这个问题,RocketMQ在没有足够的消息时(如服务端没有可消费的消息),并不会立即返回响应,而是保持并挂起当前请求,待有足够的消息时在返回。并且我们需要指定offset的起点和终点,并且需要我们自己保存好本次消费的offset点,下次消费的时候好从上次的offset点开始拉取消息。

    pull模式我们并不经常使用。

  • push模式

    由服务端主动地将消息推送给消费者。

    push模式下,慢消费的情况可能导致消费者端的缓冲区溢出。

    但是在RocketMQ中并不是真正的push,而是基于长轮训的pull模式的来实现的伪push。具体的实现是:Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,如果有消息就立即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。如果broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。

消费重试

即消费失败后,隔一段时间重新消费该消息。

  1. 重试队列

    RocketMQ会为每个消费组都设置一个Topic名称为%RETRY%+consumerGroup的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。Consumer端出现异常失败时,失败的消息会重新发送给服务端的重试队列。

  2. 死信队列

    重试队列中超过配置的“最大重试消费次数”后就会移入到这个死信队列中。在RocketMQ中,SubscriptionGroupConfig配置常量默认地设置了两个参数,一个是retryQueueNums为1(重试队列数量为1个),另外一个是retryMaxTimes为16(最大重试消费的次数为16次)。Broker端通过校验判断,如果超过了最大重试消费次数则会将消息移至这里所说的死信队列。这里,RocketMQ会为每个消费组都设置一个Topic命名为%DLQ%+consumerGroup的死信队列。

    一般在实际应用中,移入至死信队列的消息,需要人工干预处理。

注意点:

  • RocketMQ的的默认延迟级别分为16个,所以一条消息最大的重试次数为16;

    // 源码位置:org.apache.rocketmq.store.config.MessageStoreConfig.class
    // 如需修改,则需要修改broker的配置,官方并不建议修改
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    
    //如果我们在发送消息时设置消息的延迟级别为3,则表示消息10s后才能被消费者发现
    msg.setDelayTimeLevel(3);
    
  • RocketMQ的Message中的reconsumeTimes属性,表示该消息当前已重试的的次数,我们可以通过如下方法来控制重试的次数

    int reconsumeTimes = msg.getReconsumeTimes();
    // 只重试三次
    if(reconsumeTimes >= 3){
        // 表示消息已成功消费,不在放入重试队列中
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } else {
        // 表示消息消费失败,放入重试队列中,指定延迟时间后重新消费
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    

    RocketMQ自身的重试机制,默认消息的初始延迟级别就为3,好像并没法修改。源码如下:

    // 源码位置:org.apache.rocketmq.client.impl.consumer.ProcessQueue.class
    public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
        ...
        pushConsumer.sendMessageBack(msg, 3);
        ...
    }
    

简单的代码示例

Producer

public class Producer {

    public static void main(String[] args) {
        // 初始化一个生产者,生产组为simple_producer_group
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("simple_producer_group_01");
        // 设置NameServer地址
        defaultMQProducer.setNamesrvAddr("192.168.195.88:9876");
        try {
            // 启动
            defaultMQProducer.start();

            Producer producer = new Producer();
            producer.syncSend(defaultMQProducer);
            producer.asyncSend(defaultMQProducer);
            producer.onewaySend(defaultMQProducer);
        } catch (Exception e) {
            log.error("异常:", e);
        } finally {
            defaultMQProducer.shutdown();
        }
    }

    /**
     * 同步方式
     */
    private void syncSend(DefaultMQProducer producer) throws InterruptedException, RemotingException,MQClientException, MQBrokerException, UnsupportedEncodingException {
        for(int i=0; i<100; i++){
            String messageBody = "syncSend message" + i ;
            Message msg = new Message("testTopic1", "*",
                                      messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
            /*
                设置消息延迟级别,默认有16个级别:
                1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                注意,如果设置为3,意思是10s再被消费
             */
            //msg.setDelayTimeLevel(2);
            SendResult sendResult = producer.send(msg, 10000);
            log.info("onewaySend发送结果:{}", sendResult);
        }
    }

    /**
     * 异步方式
     */
    private void asyncSend(DefaultMQProducer producer) throws UnsupportedEncodingException, RemotingException, MQClientException, InterruptedException {
        for(int i=0; i<100; i++){
            String messageBody = "asyncSend message" + i ;
            Message msg = new Message("testTopic1", "*",
                                      messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("asyncSend发送成功:{}", sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    log.info("asyncSend发送异常:{}", e);
                }
            });
        }
    }

    /**
     * 单向方式
     */
    private void onewaySend(DefaultMQProducer producer) throws RemotingException, MQClientException, InterruptedException, UnsupportedEncodingException {
        for(int i=0; i<100; i++){
            String messageBody = "onewaySend message" + i ;
            Message msg = new Message("testTopic1", "*",
                                      messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.sendOneway(msg);
            log.info("onewaySend发送完成");
        }
    }
}

Consumer

public class Consumer {

    public static void main(String[] args) throws Exception {
        new Consumer().pushMode();
        //new Consumer().pullMode();

    }

    /**
     * push 消费模式
     */
    private void pushMode() throws MQClientException {
        // 设置消费组
        DefaultMQPushConsumer consumer
            = new DefaultMQPushConsumer("simple_push_consumer_group_01");
        // 设置nameServer地址
        consumer.setNamesrvAddr("192.168.195.88:9876");
        // 设置从头开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 指定topic
        consumer.subscribe("testTopic1", "*");
        // 设置批量消费消息的数量,默认1
        consumer.setConsumeMessageBatchMaxSize(1);
        // 注册消息监听
        consumer.registerMessageListener((List<MessageExt> msgs,
                                          ConsumeConcurrentlyContext context) -> {
            MessageExt msg = msgs.get(0);
            try {
                log.info("收到消息:{}",
                         new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
            } catch (UnsupportedEncodingException e) {
                log.info("异常:", e);
            }
            // 消息的重试次数
            int reconsumeTimes = msg.getReconsumeTimes();
            log.info("reconsumeTimes={}, delayTimeLevel={}", reconsumeTimes,
                     msg.getDelayTimeLevel());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            // 只重试三次
           /* if(reconsumeTimes >= 3){
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } else {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }*/
        });
        // 运行
        consumer.start();
        log.info("消费者启动成功");
    }


    /** 用于pull模式中记录offset使用*/
    private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<>();

    /**
     * pull 消费模式, 不建议使用
     */
    private void pullMode() throws MQClientException {
        DefaultMQPullConsumer consumer
            = new DefaultMQPullConsumer("simple_pull_consumer_group_01");
        consumer.setNamesrvAddr("192.168.195.88:9876");
        consumer.start();
        // 获取topic中的所有队列
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("testTopic1");
        for (MessageQueue mq : mqs) {
            log.info("消费队列:{}", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    log.info("pullResult:{}", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            pullResult.getMsgFoundList().forEach(msgExt -> {
                                try {
                                    log.info("收到消息:{}", new String(msgExt.getBody(),
                                                                 RemotingHelper.DEFAULT_CHARSET));
                                } catch (UnsupportedEncodingException e) {
                                    log.info("异常:", e);
                                }
                            });
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case NO_MATCHED_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    log.info("异常:", e);
                }
            }
        }
    }

    /**
     * 获取偏移量
     */
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSET_TABLE.get(mq);
        if (offset != null) {
            return offset;
        }
        return 0;
    }

    /**
     * 保存偏移量
     */
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSET_TABLE.put(mq, offset);
    }
}

分布式事务解决方案

逻辑是这样的:

  1. 事务发起方发送事务消息到MQ中,但此时该消息对方不可见
  2. 消息发送成功后,执行本地事务
  3. 根据本地事务执行结果,向MQ发送确认消息commit 或rollback;如果为rollbalke,MQ则删除事务消息不在下发,如为commit,则事务消息变为对方可见
  4. 对方消费事务消息,如果消费失败则进行重试,重试仍未成功则需要人工处理
  5. 如果确认消息发送失败,或者执行本地事务时事务发起方宕掉,这时MQ 将会不停的询问其同组的其他producer来获取状态。

实例代码(Springboot项目):

@Component
@Slf4j
public class TransactionProducer implements InitializingBean {
    @Autowired
    private TransactionListener transactionListener;

    private TransactionMQProducer producer;

    @Override
    public void afterPropertiesSet() throws Exception {
        producer = new TransactionMQProducer("transaction_producer_group_01");
        producer.setNamesrvAddr("192.168.195.88:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
                r -> {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
        		});

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        try {
            producer.start();
        } catch (MQClientException e) {
            log.error("TransactionProducer 启动异常:", e);
        }

        // 添加关闭钩子
        Runtime.getRuntime().addShutdownHook(new Thread(() -> producer.shutdown()));
    }

    /**
     * 发送事务消息
     */
    public void produce() {
        try {
            String body= "transaction_test_1";
            Message msg = new Message("transactionTopicTest1", "user",
                                      UUID.randomUUID().toString(),
                                      body.getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            log.info("发送事务消息结果:{}", sendResult);
        } catch (MQClientException | UnsupportedEncodingException e) {
            log.info("发送事务消息异常:{}", e);
        }
    }

}
@Component
@Slf4j
public class TransactionListenerImpl implements TransactionListener {
    @Resource(name = "jdbcTemplate1")
    private JdbcTemplate jdbcTemplate;

    /** 生产中可以用redis或数据库代替 */
    private ConcurrentHashMap<String, LocalTransactionState> localTrans
        = new ConcurrentHashMap<>();

    /**
     * 执行本地事务
     * @param msg 事务消息
     * @param arg 自定义参数
     * @return 执行结果
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            jdbcTemplate.execute("insert into user(name) values('transaction_test_1')");
            localTrans.put(msg.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE);
            log.info("本地事务执行成功");
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e){
            localTrans.put(msg.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);
            log.error("本地事务执行失败");
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        LocalTransactionState state = localTrans.get(msg.getTransactionId());
        log.info("执行事务回查,transactionId:{}, transactionState:{}",
                 msg.getTransactionId(), state);
        if (null != state) {
            return state;
        }
        return LocalTransactionState.UNKNOW;
    }
}
@Component
@Slf4j
public class TransactionConsumer implements InitializingBean {
    @Resource(name = "jdbcTemplate2")
    private JdbcTemplate jdbcTemplate;

    @Override
    public void afterPropertiesSet() throws Exception {
        DefaultMQPushConsumer consumer
            = new DefaultMQPushConsumer("transaction_consumer_group_01");
        consumer.setNamesrvAddr("192.168.195.88:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("transactionTopicTest1", "user");
        consumer.registerMessageListener((List<MessageExt> msgs,
                                          ConsumeConcurrentlyContext context) -> {
            try {
                MessageExt msg = msgs.get(0);
                log.info("收到消息:{}", msg);
                // 正式项目中,我们应该保存该msgId,防止消息的重复消费
                // String msgId = msg.getMsgId();
                String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                jdbcTemplate.execute("insert into user(name) values('"+body+"')");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                log.info("异常:", e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();
        log.info("TransactionConsumer启动成功");
    }
}
/**
 * 数据源配置,用的Mysql和Druid
 */
@Configuration
public class DbConfig {

    @Bean(name = "dataSource1")
    DataSource dataSource1() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl("jdbc:mysql://192.168.195.88:3306/jta1?useUnicode=true&characterEncoding=UTF8&useSSL=false");
        dataSource.setUsername("root");
        dataSource.setPassword("root");
        return dataSource;
    }

    @Bean(name = "dataSource2")
    DataSource dataSource2() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl("jdbc:mysql://192.168.195.88:3306/jta2?useUnicode=true&characterEncoding=UTF8&useSSL=false");
        dataSource.setUsername("root");
        dataSource.setPassword("root");
        return dataSource;
    }

    @Bean("jdbcTemplate1")
    JdbcTemplate first(@Qualifier("dataSource1") DataSource dataSource) {
        JdbcTemplate jdbcTemplate = new JdbcTemplate();
        jdbcTemplate.setDataSource(dataSource);
        return jdbcTemplate;
    }

    @Bean("jdbcTemplate2")
    JdbcTemplate second(@Qualifier("dataSource2") DataSource dataSource) {
        JdbcTemplate jdbcTemplate = new JdbcTemplate();
        jdbcTemplate.setDataSource(dataSource);
        return jdbcTemplate;
    }
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {
    @Autowired
    TransactionProducer transactionProducer;

    /**
     * 测试分布式事务
     */
    @Test
    public void testTransactionProducer() throws InterruptedException {
        transactionProducer.produce();
        Thread.sleep(10000000);
    }
}

其他的一些东西

以下内容没有主要研究,主要摘自官网,有需要时再详细研究。

有序消息

原理和Kafka要做到有序差不多,即把消息放入一个队列中,然后使用一个消费者去进行消费。

广播消息

即每个消费者消费都所有的消息

//设置广播模式属性即可
consumer.setMessageModel(MessageModel.BROADCASTING);

批量发送消息

批量发送消息可提高生产者的性能。注意的是:同一批次的消息应该具有:相同的topic,相同的waitStoreMsgOK和没有延时计划,并且一批消息的总大小不应超过1M。

消息过滤器

RocketMQ在消息过滤上是比较强大的,虽然我们可能不会经常用到它。Message除了可以Tag来加以区分外,我们还可以为它添加额外的属性。如下:

在发送消息时:

// 消息添加额外属性
msg.putUserProperty("a", "3");

在消费消息时:

// 只有消费具有属性a,a>= 0且a <= 3的消息
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");

还有非常多的过滤方式如:in and or not is null = < > 等等

消息重复

RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。

05-04 00:34