我有一个RabbitMQ客户端应用程序,它监听特定的队列。客户端创建DefaultConsumer的实例,并实现handleDelivery方法。这是代码
protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
public void receiveMessages() {
try {
// channel.basicQos(pollCount);
Message message = new Message();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String response = new String(body, "UTF-8");
if (response != null) {
message.setId(NUID.nextGlobal());
message.setPayload(response);
message.setDeliveryTag(deliveryTag);
messages.add(message);
logger.info("Message received: ", message.getPayload());
}
}
};
logger.debug("**********Channel status: " + channel.isOpen());
channel.basicConsume(queueName, false, consumer);
} catch (Exception e) {
logger.error("Exception while getting messages from Rabbit ", e);
}
}
每隔500毫秒就经常通过线程调用一次receiveMessages()方法,并将消息排放到另一个List中以供使用。由于对receiveMessages()进行了这项民意调查,因此我观察到,通过如图所示的Rabbit控制台查看时,消费者标签正在不断创建和增长。看到那些不断增长的消费者标签是否正常?
最佳答案
看到那些不断增长的消费者标签是否正常?
不,您的代码有错误。您只需要使用长期运行的使用者,或者在使用完毕后就必须取消其使用者。
我看不到需要对“ receiveMessages
”进行“轮询”的任何方法,只需让它独立运行即可,它将按您的期望将消息添加到同步队列中。
注意:RabbitMQ团队监视rabbitmq-users
mailing list,并且有时仅在StackOverflow上回答问题。