http://activemq.apache.org/destination-options.html

1. The consumer configuration parameters are listed in the destination options documentation linked above.

An example of configuring a consumer:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQQueue;

// Method of a class that implements javax.jms.ExceptionListener
public void run() {
    try {
        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("tcp://localhost:61616");

        // Create a Connection
        Connection connection = connectionFactory.createConnection();
        connection.start();
        connection.setExceptionListener(this);

        // Create a Session
        ActiveMQSession session =
                (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the destination (Topic or Queue);
        // the consumer option is appended to the queue name here
        ActiveMQQueue destination =
                (ActiveMQQueue) session.createQueue("TEST.FOO?consumer.prefetchSize=10");

        // Create a MessageConsumer from the Session to the Topic or Queue
        ActiveMQMessageConsumer consumer =
                (ActiveMQMessageConsumer) session.createConsumer(destination);

        // Print the prefetchSize value
        System.out.println("prefetchSize=" + consumer.getPrefetchNumber());

        // Wait for a message
        Message message = consumer.receive();

        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            String text = textMessage.getText();
            System.out.println("Received: " + text);
        } else {
            System.out.println("Received: " + message);
        }

        consumer.close();
        session.close();
        connection.close();
    } catch (Exception e) {
        System.out.println("Caught: " + e);
        e.printStackTrace();
    }
}

When the Queue is created, options are appended to the queue name in URL query-string form: session.createQueue("TEST.FOO?consumer.prefetchSize=10")

For queue consumers, the prefetchSize parameter defaults to 1000.

A consumer can obtain messages in two ways, push or pull: when prefetchSize = 0 it pulls; when prefetchSize > 0 the broker pushes.
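To make the option syntax and the push/pull rule concrete, here is a minimal sketch in plain Java. It is not ActiveMQ's own parser: the class DestinationOptionSketch and its methods are hypothetical names used only for illustration. It splits a destination name such as TEST.FOO?consumer.prefetchSize=10 into its options and applies the rule stated here, falling back to the queue default of 1000 when the option is absent.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of ActiveMQ: illustrates the option syntax only.
class DestinationOptionSketch {
    // Queue default mentioned in the text
    static final int DEFAULT_QUEUE_PREFETCH = 1000;

    // Parse "NAME?key=value&key2=value2" into a key -> value map.
    static Map<String, String> parseOptions(String destinationName) {
        Map<String, String> options = new LinkedHashMap<>();
        int q = destinationName.indexOf('?');
        if (q < 0) {
            return options; // no options appended
        }
        for (String pair : destinationName.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                options.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return options;
    }

    // Read consumer.prefetchSize, or use the queue default if it is not set.
    static int prefetchSize(String destinationName) {
        String value = parseOptions(destinationName).get("consumer.prefetchSize");
        return value == null ? DEFAULT_QUEUE_PREFETCH : Integer.parseInt(value);
    }

    // prefetchSize = 0 means pull, prefetchSize > 0 means push.
    static String deliveryMode(String destinationName) {
        return prefetchSize(destinationName) == 0 ? "pull" : "push";
    }

    public static void main(String[] args) {
        String[] names = {
            "TEST.FOO",
            "TEST.FOO?consumer.prefetchSize=10",
            "TEST.FOO?consumer.prefetchSize=0"
        };
        for (String name : names) {
            System.out.println(name + " -> prefetchSize=" + prefetchSize(name)
                    + ", mode=" + deliveryMode(name));
        }
    }
}
```

In a real client the parsing is done by ActiveMQ itself; the sketch only shows why a prefetchSize of 0 in the destination name switches the consumer from push to pull.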

2. The broker's message dispatch logic is in the method org.apache.activemq.broker.region.Queue.doActualDispatch:

private PendingList doActualDispatch(PendingList list) throws Exception {
    List<Subscription> consumers;
    consumersLock.writeLock().lock();

    try {
        if (this.consumers.isEmpty()) {
            // slave dispatch happens in processDispatchNotification
            return list;
        }
        consumers = new ArrayList<Subscription>(this.consumers);
    } finally {
        consumersLock.writeLock().unlock();
    }

    Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());

    for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
        MessageReference node = iterator.next();
        Subscription target = null;
        for (Subscription s : consumers) {
            if (s instanceof QueueBrowserSubscription) {
                continue;
            }
            if (!fullConsumers.contains(s)) {
                if (!s.isFull()) {
                    if (dispatchSelector.canSelect(s, node)
                            && assignMessageGroup(s, (QueueMessageReference) node)
                            && !((QueueMessageReference) node).isAcked()) {
                        // Dispatch it.
                        s.add(node);
                        LOG.trace("assigned {} to consumer {}", node.getMessageId(), s.getConsumerInfo().getConsumerId());
                        iterator.remove();
                        target = s;
                        break;
                    }
                } else {
                    // no further dispatch of list to a full consumer to
                    // avoid out of order message receipt
                    fullConsumers.add(s);
                    LOG.trace("Subscription full {}", s);
                }
            }
        }

        if (target == null && node.isDropped()) {
            iterator.remove();
        }

        // return if there are no consumers or all consumers are full
        if (target == null && consumers.size() == fullConsumers.size()) {
            return list;
        }

        // If it got dispatched, rotate the consumer list to get round robin
        // distribution.
        if (target != null && !strictOrderDispatch && consumers.size() > 1
                && !dispatchSelector.isExclusiveConsumer(target)) {
            consumersLock.writeLock().lock();
            try {
                if (removeFromConsumerList(target)) {
                    addToConsumerList(target);
                    consumers = new ArrayList<Subscription>(this.consumers);
                }
            } finally {
                consumersLock.writeLock().unlock();
            }
        }
    }

    return list;
}

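There are two nested for loops: the outer one iterates over messages, the inner one over consumers. As long as a consumer is not full, the broker keeps dispatching messages to it.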

A consumer is considered full when its number of unacknowledged messages is greater than or equal to prefetchSize:

// PrefetchSubscription
public boolean isFull() {
    // unacknowledged messages = messages dispatched to this consumer - acknowledgements received
    return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
}

Because the consumer's prefetchSize defaults to 1000, ActiveMQ pushes by default, and it pushes messages one at a time.
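The interaction between the dispatch loop and isFull() can be reproduced with a small stand-alone model. The following is a simplified sketch, not ActiveMQ code: PrefetchDispatchSketch, Consumer, and dispatch are hypothetical names, messages are plain strings, and selectors, message groups, browsers, and locking are all left out. It keeps only the two ideas from the text: push one message at a time to a consumer that is not full, and rotate the consumer list after each dispatch to get round-robin distribution.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

// Simplified model of the dispatch loop; not ActiveMQ code.
class PrefetchDispatchSketch {

    static class Consumer {
        final String name;
        final int prefetchSize;
        // Messages dispatched to this consumer but not yet acknowledged
        final List<String> unacked = new ArrayList<>();

        Consumer(String name, int prefetchSize) {
            this.name = name;
            this.prefetchSize = prefetchSize;
        }

        // Full when unacknowledged messages >= prefetchSize
        boolean isFull() {
            return unacked.size() >= prefetchSize;
        }

        // Acknowledge the oldest outstanding message, freeing one slot
        void ack() {
            if (!unacked.isEmpty()) {
                unacked.remove(0);
            }
        }
    }

    // Push pending messages one at a time to the first consumer that has room,
    // rotating the consumer order after each dispatch. Returns what is left over.
    static List<String> dispatch(List<String> pending, List<Consumer> consumers) {
        List<String> remaining = new LinkedList<>(pending);
        List<Consumer> order = new ArrayList<>(consumers);
        Iterator<String> it = remaining.iterator();
        while (it.hasNext()) {
            String message = it.next();
            Consumer target = null;
            for (Consumer c : order) {
                if (!c.isFull()) {
                    c.unacked.add(message);
                    it.remove();
                    target = c;
                    break;
                }
            }
            if (target == null) {
                return remaining; // every consumer is full (or there are none)
            }
            // Move the consumer that just received a message to the end of the list
            order.remove(target);
            order.add(target);
        }
        return remaining;
    }

    public static void main(String[] args) {
        Consumer a = new Consumer("A", 2);
        Consumer b = new Consumer("B", 2);
        List<String> pending = List.of("m1", "m2", "m3", "m4", "m5");
        List<String> left = dispatch(pending, List.of(a, b));
        System.out.println("A=" + a.unacked + " B=" + b.unacked + " left=" + left);
    }
}
```

In this model a consumer with prefetchSize 0 is always full, so nothing is ever pushed to it, which matches the pull case described earlier. Once a consumer acknowledges a message, a later call to dispatch can push the next pending message to it.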

3. A consumer can receive messages synchronously or asynchronously: consumer.receive() or consumer.setMessageListener(listener)

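With receive(), if prefetchSize = 0 and there are no locally cached messages, the consumer sends a pull command to the broker; otherwise it takes the message from the local cache.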

// ActiveMQMessageConsumer
public Message receive() throws JMSException {
    checkClosed();
    checkMessageListener();

    sendPullCommand(0);
    MessageDispatch md = dequeue(-1);
    if (md == null) {
        return null;
    }

    beforeMessageIsConsumed(md);
    afterMessageIsConsumed(md, false);

    return createActiveMQMessage(md);
}

protected void sendPullCommand(long timeout) throws JMSException {
    clearDeliveredList();
    if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
        MessagePull messagePull = new MessagePull();
        messagePull.configure(info);
        messagePull.setTimeout(timeout);
        session.asyncSendPacket(messagePull);
    }
}
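The decision made in sendPullCommand can be isolated in a small model that runs without a broker. This is a sketch under simplifying assumptions, not the ActiveMQ implementation: ReceiveSketch and its fields are hypothetical names, the broker is reduced to an in-memory queue, a pull is answered immediately with at most one message, and receive() returns null instead of blocking when nothing is available.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of the receive() path; not ActiveMQ code.
class ReceiveSketch {
    // Stand-in for the messages held by the broker
    final Deque<String> brokerQueue = new ArrayDeque<>();
    // Stand-in for the consumer's local cache (unconsumedMessages)
    final Deque<String> localCache = new ArrayDeque<>();
    final int prefetchSize;
    // Counts how many pull commands this consumer has sent
    int pullCommandsSent = 0;

    ReceiveSketch(int prefetchSize) {
        this.prefetchSize = prefetchSize;
    }

    // Push mode: the broker fills the local cache up to prefetchSize.
    void brokerPush() {
        while (localCache.size() < prefetchSize && !brokerQueue.isEmpty()) {
            localCache.add(brokerQueue.poll());
        }
    }

    // Same condition as sendPullCommand: pull only when
    // prefetchSize is 0 and nothing is cached locally.
    void sendPullCommand() {
        if (prefetchSize == 0 && localCache.isEmpty()) {
            pullCommandsSent++;
            String message = brokerQueue.poll(); // assumed: one message per pull
            if (message != null) {
                localCache.add(message);
            }
        }
    }

    // Returns the next message, or null if none is available.
    String receive() {
        sendPullCommand();
        return localCache.poll();
    }

    public static void main(String[] args) {
        ReceiveSketch pull = new ReceiveSketch(0);
        pull.brokerQueue.add("a");
        System.out.println(pull.receive() + ", pulls=" + pull.pullCommandsSent);

        ReceiveSketch push = new ReceiveSketch(10);
        push.brokerQueue.add("x");
        push.brokerPush();
        System.out.println(push.receive() + ", pulls=" + push.pullCommandsSent);
    }
}
```

With prefetchSize 0 every receive() that finds the cache empty costs one pull command, while with a positive prefetchSize the message is already in the local cache and no pull is sent.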

The consumer caches messages locally in:

// These are the messages waiting to be delivered to the client
protected final MessageDispatchChannel unconsumedMessages;

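Messages enter the cache along two paths. The original post showed the call stack of each path as a screenshot:

(1) [screenshot of the first call stack, not available]

(2) [screenshot of the second call stack, not available]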

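The call stack for receiving messages asynchronously through consumer.setMessageListener was also shown as a screenshot:

[screenshot of the listener call stack, not available]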
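Since the call stacks are not reproduced, the difference between the two consumption styles can at least be illustrated with a toy model. This sketch does not claim to mirror ActiveMQ's actual call path: DeliverySketch, onDispatch, and the use of java.util.function.Consumer as the listener type are all assumptions made for illustration. The one detail taken from the code shown earlier is that receive() checks for a registered listener first (checkMessageListener), which the sketch models by throwing an exception.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Toy model of synchronous vs. asynchronous consumption; not ActiveMQ code.
class DeliverySketch {
    // Stand-in for the local cache of messages not yet consumed
    private final Queue<String> unconsumed = new ArrayDeque<>();
    // Stand-in for a JMS MessageListener; null means synchronous mode
    private Consumer<String> listener;

    void setMessageListener(Consumer<String> listener) {
        this.listener = listener;
        // Assumed behavior: hand over anything that was buffered before
        while (listener != null && !unconsumed.isEmpty()) {
            listener.accept(unconsumed.poll());
        }
    }

    // Called when a message arrives from the broker.
    void onDispatch(String message) {
        if (listener != null) {
            listener.accept(message); // asynchronous: callback is invoked
        } else {
            unconsumed.add(message);  // synchronous: wait for receive()
        }
    }

    // Synchronous consumption; not allowed once a listener is registered.
    String receive() {
        if (listener != null) {
            throw new IllegalStateException("a message listener is set");
        }
        return unconsumed.poll();
    }

    public static void main(String[] args) {
        DeliverySketch consumer = new DeliverySketch();
        consumer.onDispatch("m1");
        System.out.println("receive() -> " + consumer.receive());

        List<String> seen = new ArrayList<>();
        consumer.setMessageListener(seen::add);
        consumer.onDispatch("m2");
        System.out.println("listener saw " + seen);
    }
}
```

The point of the model is only that the same arriving message either waits in the local cache for a receive() call or is handed straight to the callback, depending on whether a listener has been registered.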

04-29 23:21