1、RocketMQ消息队列简单介绍

  这里简单介绍一下RocketMQ的消息队列的模型

  一个topic对应多个队列如下图:

  RocketMQ4.3.x对顺序消息的理解-LMLPHP

  生产者和消费者分别向队列中发送和消费消息,生产者和消费者都可以是多个,通过组名进行群组约束。由于负载因素造成生产消息会生产到各个queue中。

  消费群组进行queue消费时首先因为负载因素,queue会分配给各自的消费实例中,如果消费组有变化会重新分配,导致queue分配乱序。

  另外一个消费者实例消费对应的queue时,消费者使用线程池进行处理消息。

  以上各种操作都会导致消息不一定先处理就会先完成,所以造成消息消费不是严格顺序处理的。

  所以在之前的版本中假如我们要求要严格按照顺序进行消息处理的话就必须进行单队列单线程进行消息消费处理

  4.0.0之后版本支持顺序消费处理,我们看一看他是如何处理的。。。

2、顺序消费原来介绍

  根据上面的简单介绍我们知道一个topic对应多个队列,我们生产消息的时候就可以针对队列的数量和消息的有效标识进行取模进行队列选择发送,如下示例代码

package org.apache.rocketmq.example.ordermessage;

import java.io.UnsupportedEncodingException;
import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.start();

            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                Message msg =
                    new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  比如我们使用订单号作为和queue的ID取模的关键字段,我们就可以保证这个订单号的消息都会发送给一个队列,这样我们就保证了生产者生产消息在业务上是有序的

  接下来我们看看消费端是怎样实现的。

  1、消费者与broker建立连接分配队列的时候会尝试给队列加锁,如果成功则获取queue消费权利,否则尝试下一个queue。

  2、如果消费模式为集群每20秒对分配给自己的队列自动加锁

  3、消息消费时对queue进行加锁,同一时刻只允许一个线程对一个queue进行消费

  4、根据消费时间进行队列和线程的切换默认60s(这个时间就是锁住队列的时间)

  5、消息重试次数超过最大次数之后将消息移入死信queue

  RocketMQ4.3.x对顺序消息的理解-LMLPHP

  根据以上几点可以保证一个队列中的消息可以按顺序进行消费。

3、总结

  RocketMQ4.0.0对顺序消息做了升级,但是牺牲了部分消费性能。因为要给队列加锁,并且只能一个队列同一时间只能有一个线程处理消息。

  至于为什么是60秒(时间可设置)进行线程切换可能是更好的利用cpu或者不因为某个队列消息异常拖慢其他队列消息处理吧,还有待深入研究。

  

  本人理解如有误请广大网友指正,谢谢!

01-09 20:48