本文图片和部分总结来自于参考资料,半原创,侵删
问题
- Rocketmq 重试是否有超时问题,假如超时了如何解决,是重新发送消息呢?还是一直等待
- 假如某个 msg 进入了重试队列(%RETRY_XXX%),然后成功消费了
概述
文章介绍了RocketMQ 的重试机制和消息重试的机制。
定时任务
定时任务概述
rocketmq为定时任务创建一个单独的 topic ,而 rocketmq的定时任务是定的时间是分等级的,而不同等级对应topic内不同的队列,然后通过一个“执行定时任务的服务”定时执行多个队列内的任务,执行时需要更改该定时任务实际要发送的 topic 和 tag 。
发送例子
发送例子
Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */); msg.setDelayTimeLevel(i + 1);
时间等级
public class MessageStoreConfig { private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; }
写入定时任务
写入的时候是在写入commitLog 的时候写入的,这一点很重要,因为这也是实现消费失败重试的基础。 CommitLog 会将这条消息的话题和队列 ID 替换成专门用于定时的话题和相应的级别对应的队列 ID。真实的话题和队列 ID 会作为属性放置到这条消息中,后面处理的时候会自己从这个队列id 进行发送消息。
public class CommitLog { public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // Delay Delivery if (msg.getDelayTimeLevel() > 0) { topic = ScheduleMessageService.SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); // 替换 Topic 和 QueueID msg.setTopic(topic); msg.setQueueId(queueId); } } }
处理定时任务
执行定时任务的服务,ScheduleMessageService 的 start 方法
public void start() { for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { //Timer 持有多个定时任务,然后时间到了就执行该任务, // 但是 Timer 内部只有一个线程在执行任务,也就不能保证时间的正确性(因为当一个线程在执行的时候,某个任务的时间已经到了) // 注意,是为每个延时时间等级建一个任务Task this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); }
class DeliverDelayedMessageTimerTask extends TimerTask { public void executeOnTimeup() { // ... for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 是否到时间 long countdown = deliverTimestamp - now; if (countdown <= 0) { // 取出消息 MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); // 修正消息,设置上正确的话题和队列 ID MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); // 重新存储消息 PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore .putMessage(msgInner); } else { // countdown 后投递此消息 ScheduleMessageService.this .timer .schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown); // 更新偏移量 } } // end of for // 更新偏移量 } }
同时该定时任务也进行持久化,一个是消费进度,一个消息对应的位移量
消息消费重试
RocketMQ中遇到以下情况就会进行消息重试 :
- 抛出异常
- 返回 NULL 状态
- 返回 RECONSUME_LATER 状态
- 超时 15 分钟没有响应
consumer 注册订阅重试队列
consumer 在启动的时候就会订阅“%RETRY_XXX%”的topic,为的就是当某个topic消费失败处理重试消息。如下图所示 :
public class DefaultMQPushConsumerImpl implements MQConsumerInner { public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: // ... this.copySubscription(); // ... this.serviceState = ServiceState.RUNNING; break; } } private void copySubscription() throws MQClientException { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break; case CLUSTERING: // 重试话题组 final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: break; } } }
超时消费
我们思考一个问题,假如消费者掉线了,那么消息直接发不过去了,而要是消费者的消费逻辑执行了太久的业务逻辑,那么应该有一个动作来触发消费超时,进行重试.
ConsumeMessageConcurrentlyService 的 start 方法。
public void start() { this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { @Override public void run() { cleanExpireMsg(); } }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); }
这个定时周期任务每过 getConsumeTimeout 时间就会扫描消费超时的任务,调用 sendMessageBack 方法,该方法会调用 RPC发送消息给 broker ,消费失败进行重试。
上一篇我们讲到消息消费的过程,当集群模式下,消息消费成功会本地的消息消费进度,而失败了会调用RPC 发送消息给broker ,而broker 处理的逻辑在 SendMessageProcessor
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { SendMessageContext mqtraceContext; switch (request.getCode()) { //消费者消费失败的情况 case RequestCode.CONSUMER_SEND_MSG_BACK: return this.consumerSendMsgBack(ctx, request); default: SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { return null; } mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); RemotingCommand response; if (requestHeader.isBatch()) { response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); } this.executeSendMessageHookAfter(response, mqtraceContext); return response; } }
批量处理的问题
批量处理一批数据要是返回 RECONSUME_LATER ,那么这批数据就会重新发给 broker ,进行消息重试,所以在业务逻辑的时候就要考虑消费者重新消费的幂等性。
ConsumeRequest的 run 方法
@Override public void run() { .... try { ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs); if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); } } //NO.1 业务实现 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } ... if (!processQueue.isDropped()) { //NO.2 处理消息消费的结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
ack 机制
public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ) { int ackIndex = context.getAckIndex(); if (consumeRequest.getMsgs().isEmpty()) return; switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size()); break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; case CLUSTERING: //发送给broker , 该批数据进行消息重试 List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; } //删除消息树中的已消费的消息节点,并返回消息树中最小的节点,更新最小的节点为当前进度!! long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
可以看到消息消费完成后,更新的进度都是对应的 processqueue中对应的消息树里的最小节点(即偏移量最小的节点),那么有可能存在这样的问题,下面来自 参考
这钟方式和传统的一条message单独ack的方式有本质的区别。性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。
在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了(注:consumerOffset=2201)。
在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。
总结
从参考资料中我学习到了自己学习与别人的差异是总结的能力,通过浓缩代码片段,总结核心的逻辑步骤,加深对逻辑的理解。
参考资料
- https://www.jianshu.com/p/5843cdcd02aa
- http://jaskey.github.io/blog/2017/01/25/rocketmq-consume-offset-management/