opic的有序消息已经成为mq的标配。而RocketMQ中是这样区分消息类型的, 普通消息也叫做无序消息,简单来说就是没有顺序的消息,而有序消息就是按照一定的先后顺序的消息类型。举个例子,producer 依次发送 order id 为 1、2、3 的消息到 broker,consumer 接到的消息顺序也就是 1、2、3 ,而不会出现普通消息那样的 2、1、3 等情况。
一、有序消息该如何实现
理论上:我们都知道消息首先由 producer 到 broker,再从 broker 到 consumer,分这两步走。那么要保证消息的有序,势必这两步都是要保证有序的,即要保证消息是按有序发送到 broker,broker 也是有序将消息投递给 consumer,两个条件必须同时满足,缺一不可。
1.1、全局有序消息
由于一个 topic 只有一个 queue ,即使我们有多个 producer 实例和 consumer 实例也很难提高消息吞吐量。就好比过独木桥,大家只能一个挨着一个过去,效率低下。
1.2、局部有序消息
常见做法就是将 order id 进行处理,将 order id 相同的消息发送到 topicB 的同一个 queue,假设我们 topicB 有 2 个 queue,那么我们可以简单的对 id 取余,奇数的发往 queue0,偶数的发往 queue1,消费者按照 queue 去消费时,就能保证 queue0 里面的消息有序消费,queue1 里面的消息有序消费。
二、RocketMQ的topic的补充
opic 只是消息的逻辑分类,内部实现其实是由 queue 组成。当 producer 把消息发送到某个 topic 时,默认是会消息发送到具体的 queue 上。由于一个 topic 可以有多个 queue,所以在性能比全局有序高得多。假设 queue 数是 n,理论上性能就是全局有序的 n 倍,当然 consumer 也要跟着增加才行。在实际情况中,这种局部有序消息是会比全局有序消息用的更多。
/** * 有序消息 */ public class OrderedProducer { public static final String NAME_SERVER_ADDR = "192.168.32.128:9876"; public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException { // 1:创建生产者对象,并指定组名 DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST"); // 2:指定NameServer地址 producer.setNamesrvAddr(NAME_SERVER_ADDR); // 3:启动生产者 producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); // 设置异步发送失败重试次数,默认为2 // 4:定义消息队列选择器 MessageQueueSelector messageQueueSelector = new MessageQueueSelector() { /** * 消息队列选择器,保证同一条业务数据的消息在同一个队列 * @param mqs topic中所有队列的集合 * @param msg 发送的消息 * @param arg 此参数是本示例中producer.send的第三个参数 * @return */ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; // id == 1001 int index = id % mqs.size(); // 分区顺序:同一个模值的消息在同一个队列中 return mqs.get(index); // 全局顺序:所有的消息都在同一个队列中 // return mqs.get(mqs.size() - 1); } }; String[] tags = new String[]{"TagA", "TagB", "TagC"}; List<Map> bizDatas = getBizDatas(); // 5:循环发送消息 for (int i = 0; i < bizDatas.size(); i++) { Map bizData = bizDatas.get(i); // keys:业务数据的ID,比如用户ID、订单编号等等 Message msg = new Message("TopicTest", tags[i % tags.length], "" + bizData.get("msgType"), bizData.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送有序消息 SendResult sendResult = producer.send(msg, messageQueueSelector, bizData.get("msgType")); System.out.printf("%s, body:%s%n", sendResult, bizData); } // 6:关闭生产者 producer.shutdown(); } public static List<Map> getBizDatas() { List<Map> orders = new ArrayList<Map>(); HashMap orderData = new HashMap(); orderData.put("msgType", 1001); orderData.put("userId", "张三"); orderData.put("desc", "存钱1000"); orders.add(orderData); orderData = new HashMap(); orderData.put("msgType", 2001); orderData.put("userId", "张三"); orderData.put("desc", "取钱1000"); orders.add(orderData); orderData = new HashMap(); orderData.put("msgType", 3001); orderData.put("userId", "张三"); orderData.put("desc", "存钱2000"); orders.add(orderData); orderData = new HashMap(); orderData.put("msgType", 4001); orderData.put("userId", "张三"); orderData.put("desc", "存钱3000"); orders.add(orderData); orderData = new HashMap(); orderData.put("msgType", 5001); orderData.put("userId", "张三"); orderData.put("desc", "存钱4000"); orders.add(orderData); orderData = new HashMap(); orderData.put("msgType", 6001); orderData.put("userId", "张三"); orderData.put("desc", "取钱5000"); orders.add(orderData); orderData = new HashMap(); orderData.put("msgType", 7001); orderData.put("userId", "张三"); orderData.put("desc", "取钱6000"); orders.add(orderData); orderData = new HashMap(); orderData.put("msgType", 8001); orderData.put("userId", "张三"); orderData.put("desc", "取钱2000"); orders.add(orderData); orderData = new HashMap(); orderData.put("msgType", 9001); orderData.put("userId", "张三"); orderData.put("desc", "存钱9000"); orders.add(orderData); return orders; } }