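Pull consumer example
Consuming messages in Pull mode follows much the same flow as Push mode: create the consumer object, set the group name, and start the consumer. The code is as follows: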
package com.wjw;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class PullConsumer {
    // Next offset to pull from, per queue
    private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<>();

    public static void main(String[] args) throws Exception {
        // Group and topic names must not contain spaces
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("group_A");
        // Start the consumer
        consumer.start();
        // Look up the queues of the topic
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TargetTopic");
        for (MessageQueue mq : mqs) {
            System.out.println("Consume message from " + mq);
            // No offset is stored before the first pull, so fall back to 0;
            // unboxing a null Long here would throw a NullPointerException
            long offset = OFFSET_TABLE.getOrDefault(mq, 0L);
            // Pull up to 32 messages, blocking if none are available yet
            PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32);
            System.out.println("pullResult : " + pullResult);
            // Remember where the next pull on this MQ should start
            OFFSET_TABLE.put(mq, pullResult.getNextBeginOffset());
        }
        consumer.shutdown();
    }
}
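To summarize the flow above:
- Create a Pull-mode consumer object
- Start the consumer
- Call fetchSubscribeMessageQueues to look up the MQs (message queues) of the topic by name
- Iterate over the MQs and, for each one, actively pull a batch of messages (up to 32 in this example) and record the offset for the next pull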
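The offset bookkeeping in these steps can be tried without a broker. The following is a minimal sketch, assuming an in-memory list stands in for a queue; `OffsetBookkeepingSketch`, `FakePullResult`, `pull`, and `pullOnce` are hypothetical names and not part of the RocketMQ API. It shows a first pull falling back to offset 0 and each later pull resuming from the previous result's next-begin offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OffsetBookkeepingSketch {
    // Hypothetical stand-in for PullResult: the messages found plus the offset to use next.
    static final class FakePullResult {
        final List<String> messages;
        final long nextBeginOffset;

        FakePullResult(List<String> messages, long nextBeginOffset) {
            this.messages = messages;
            this.nextBeginOffset = nextBeginOffset;
        }
    }

    // Next offset to pull from, keyed by queue name (the example above keys by MessageQueue).
    static final Map<String, Long> OFFSET_TABLE = new HashMap<>();

    // Hypothetical pull: returns at most maxNums messages starting at the given offset.
    static FakePullResult pull(List<String> queue, long offset, int maxNums) {
        int from = (int) Math.min(offset, queue.size());
        int to = Math.min(from + maxNums, queue.size());
        return new FakePullResult(new ArrayList<>(queue.subList(from, to)), to);
    }

    // One pull on one queue: read the stored offset (0 if none), pull, store the next offset.
    static List<String> pullOnce(String queueName, List<String> queue, int maxNums) {
        long offset = OFFSET_TABLE.getOrDefault(queueName, 0L);
        FakePullResult result = pull(queue, offset, maxNums);
        OFFSET_TABLE.put(queueName, result.nextBeginOffset);
        return result.messages;
    }

    public static void main(String[] args) {
        List<String> queue = Arrays.asList("m0", "m1", "m2", "m3", "m4");
        System.out.println(pullOnce("q0", queue, 2)); // first batch, starts at offset 0
        System.out.println(pullOnce("q0", queue, 2)); // resumes after the first batch
        System.out.println(pullOnce("q0", queue, 2)); // last, shorter batch
    }
}
```

Keeping the table in process memory, as here, means the offsets are lost on restart; that is acceptable for a demonstration but would need persistent storage in a long-running consumer.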
fetchSubscribeMessageQueues
The source of this method, which fetches all MQs of a topic, is shown below. It is located in org/apache/rocketmq/client/impl/MQAdminImpl.java:
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
    try {
        // Fetch the route info of the topic from the NameServer
        TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
        // If route info was returned, build the queue set from it
        if (topicRouteData != null) {
            Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
            if (!mqList.isEmpty()) {
                return mqList;
            } else {
                throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
            }
        }
    } catch (Exception e) {
        throw new MQClientException(
            "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
            e);
    }
    throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
}
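The code above first fetches a TopicRouteData object from the NameServer. It holds the routing information of the topic:
- orderTopicConf: the ordered-message configuration
- queueDatas: the list of queue data
- brokerDatas: the list of broker data
- filterServerTable: the mapping from broker address to its Filter Servers
If the returned TopicRouteData is not null, the QueueData entries inside it are converted into MQs, which are the queues of the subscribed topic. If the resulting set is not empty, it is returned directly; otherwise an MQClientException is thrown.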
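The conversion from queue data to MQs can be illustrated in isolation. The real work is done by MQClientInstance.topicRouteData2TopicSubscribeInfo; the sketch below is a simplified illustration, assuming each readable QueueData entry contributes one queue per read-queue index on its broker. `RouteToQueuesSketch`, `SimpleQueueData`, and the `topic@broker#id` string format are hypothetical stand-ins, not RocketMQ types.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class RouteToQueuesSketch {
    // Simplified stand-in for QueueData: one entry per broker that hosts the topic.
    static final class SimpleQueueData {
        final String brokerName;
        final int readQueueNums;
        final boolean readable;

        SimpleQueueData(String brokerName, int readQueueNums, boolean readable) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.readable = readable;
        }
    }

    // Expand every readable entry into queue identifiers of the form "topic@broker#queueId".
    static Set<String> toQueues(String topic, List<SimpleQueueData> queueDatas) {
        Set<String> queues = new LinkedHashSet<>();
        for (SimpleQueueData qd : queueDatas) {
            if (!qd.readable) {
                continue; // entries without read permission contribute no queues
            }
            for (int i = 0; i < qd.readQueueNums; i++) {
                queues.add(topic + "@" + qd.brokerName + "#" + i);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        List<SimpleQueueData> route = Arrays.asList(
            new SimpleQueueData("broker-a", 2, true),
            new SimpleQueueData("broker-b", 4, false));
        // Only broker-a is readable, so only its two queues are listed.
        System.out.println(toQueues("TargetTopic", route));
    }
}
```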
Core code for pulling messages
The core method for pulling messages is pullSyncImpl, which implements the actual pull:
private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
    long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.isRunning();
    // Validate the arguments
    if (null == mq) {
        throw new MQClientException("mq is null", null);
    }
    if (offset < 0) {
        throw new MQClientException("offset < 0", null);
    }
    if (maxNums <= 0) {
        throw new MQClientException("maxNums <= 0", null);
    }
    this.subscriptionAutomatically(mq.getTopic());
    int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
    // A blocking pull uses the suspend timeout instead of the caller's timeout
    long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
    boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
    // Pull the messages
    PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
        mq,
        subscriptionData.getSubString(),
        subscriptionData.getExpressionType(),
        isTagType ? 0L : subscriptionData.getSubVersion(),
        offset,
        maxNums,
        sysFlag,
        0,
        this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
        timeoutMillis,
        CommunicationMode.SYNC,
        null
    );
    // Process the pulled message data
    this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
    // If a namespace is set, reset each message's topic to the name without the namespace
    this.resetTopic(pullResult.getMsgFoundList());
    // If consume hooks are registered, put the messages into a ConsumeMessageContext and run the hooks
    if (!this.consumeMessageHookList.isEmpty()) {
        ConsumeMessageContext consumeMessageContext = null;
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace());
        consumeMessageContext.setConsumerGroup(this.groupName());
        consumeMessageContext.setMq(mq);
        consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
        consumeMessageContext.setSuccess(false);
        this.executeHookBefore(consumeMessageContext);
        consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
        consumeMessageContext.setSuccess(true);
        this.executeHookAfter(consumeMessageContext);
    }
    return pullResult;
}
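The guard clauses and the timeout choice at the top of pullSyncImpl can be looked at on their own. The following is a minimal sketch of that logic only, assuming plain Java types; `PullArgsSketch`, `checkArgs`, and `effectiveTimeout` are hypothetical names, `IllegalArgumentException` replaces MQClientException to keep the example self-contained, and the millisecond values in `main` are illustrative rather than RocketMQ defaults.

```java
class PullArgsSketch {
    // Mirrors the three argument checks: a queue must be given,
    // the offset must not be negative, and at least one message must be requested.
    static void checkArgs(Object mq, long offset, int maxNums) {
        if (mq == null) {
            throw new IllegalArgumentException("mq is null");
        }
        if (offset < 0) {
            throw new IllegalArgumentException("offset < 0");
        }
        if (maxNums <= 0) {
            throw new IllegalArgumentException("maxNums <= 0");
        }
    }

    // A blocking pull waits for the suspend timeout; a non-blocking pull uses the caller's timeout.
    static long effectiveTimeout(boolean block, long suspendTimeoutMillis, long callerTimeoutMillis) {
        return block ? suspendTimeoutMillis : callerTimeoutMillis;
    }

    public static void main(String[] args) {
        checkArgs("queue-0", 0L, 32); // valid arguments pass silently
        System.out.println(effectiveTimeout(true, 30000L, 10000L));
        System.out.println(effectiveTimeout(false, 30000L, 10000L));
        try {
            checkArgs("queue-0", -1L, 32);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Checking the arguments before any network call means an invalid request fails fast on the client and never reaches the broker.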