我有一个RabbitMQ客户端应用程序,它监听特定的队列。客户端创建DefaultConsumer的实例,并实现handleDelivery方法。这是代码

    protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    public void receiveMessages() {
        try {
//            channel.basicQos(pollCount);
            Message message = new Message();
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();
                    String response = new String(body, "UTF-8");
                    if (response != null) {
                        message.setId(NUID.nextGlobal());
                        message.setPayload(response);
                        message.setDeliveryTag(deliveryTag);
                        messages.add(message);
                        logger.info("Message received: ", message.getPayload());
                    }
                }
            };
            logger.debug("**********Channel status: " + channel.isOpen());
            channel.basicConsume(queueName, false, consumer);
        } catch (Exception e) {
            logger.error("Exception while getting messages from Rabbit ", e);

        }
    }


每隔500毫秒就经常通过线程调用一次receiveMessages()方法,并将消息排放到另一个List中以供使用。由于对receiveMessages()进行了这项民意调查,因此我观察到,通过如图所示的Rabbit控制台查看时,消费者标签正在不断创建和增长。看到那些不断增长的消费者标签是否正常?
java - RabbitMQ DefaultConsumer导致过多的消费者标签-LMLPHP

最佳答案

看到那些不断增长的消费者标签是否正常?


不,您的代码有错误。您只需要使用长期运行的使用者,或者在使用完毕后就必须取消其使用者。

我看不到需要对“ receiveMessages”进行“轮询”的任何方法,只需让它独立运行即可,它将按您的期望将消息添加到同步队列中。



注意:RabbitMQ团队监视rabbitmq-users mailing list,并且有时仅在StackOverflow上回答问题。

07-27 20:25