问题描述
消费者收到消息后,消费者/工人做一些验证,然后调用网络服务.在此阶段,如果发生任何错误或验证失败,我们希望将消息放回最初消费的队列.
After the consumer gets a message, consumer/worker does some validations and then call web service. In this phase, if any error occurs or validation fails, we want the message put back to the queue it was originally consumed from.
我已阅读 RabbitMQ 文档.但是我对reject、nack和cancel方法之间的区别感到困惑.
I have read RabbitMQ documentation. But I am confused about differences between reject, nack and cancel methods.
推荐答案
简短回答:
要重新排队特定消息,您可以选择 basic.reject
或 basic.nack
并将 multiple
标志设置为 false.
To requeue specific message you can pick both basic.reject
or basic.nack
with multiple
flag set to false.
basic.consume
调用也可能导致消息重新传递,如果您使用消息确认并且在特定时间消费者上有未确认的消息并且消费者在没有确认它们的情况下退出.
basic.consume
calling may also results to messages redelivering if you are using message acknowledge and there are un-acknowledged message on consumer at specific time and consumer exit without ack-ing them.
basic.recover
将重新发送特定频道上所有未确认的消息.
basic.recover
will redeliver all un-acked messages on specific channel.
长答案:
basic.reject
和 basic.nack
都用于相同的目的 - 删除或重新排队无法由特定消费者处理的消息(在给定时刻,在某些条件下或根本无法处理).它们之间的主要区别在于 basic.nack
支持批量消息处理,而 basic.reject
不支持.
basic.reject
and basic.nack
both serves to same purpose - drop or requeue message that can't be handled by specific consumer (at the given moment, under certain conditions or at all). The main difference between them is that basic.nack
supports bulk messages processing, whilst basic.reject
doesn't.
官方 RabbitMQ 网站上的否定确认文章中描述了这种差异:
This difference described in Negative Acknowledgements article on official RabbitMQ web site:
AMQP 规范定义了 basic.reject
方法,该方法允许客户端拒绝单独的、已传递的消息,指示代理丢弃它们或重新排队.不幸的是,basic.reject
不支持批量否定确认消息.
为了解决这个问题,RabbitMQ 支持 basic.nack
方法,该方法提供了 basic.reject
的所有功能,同时还允许批量处理消息强>.
To solve this, RabbitMQ supports the basic.nack
method that provides all the functionality of basic.reject
whilst also allowing for bulk processing of messages.
要批量拒绝消息,客户端将 basic.nack
方法的 multiple
标志设置为 true
.然后,代理将拒绝所有未确认的、已传递的消息,直到并包括在 basic.nack
方法的 delivery_tag
字段中指定的消息.在这方面,basic.nack
补充了 basic.ack
.
To reject messages in bulk, clients set the multiple
flag of the basic.nack
method to true
. The broker will then reject all unacknowledged, delivered messages up to and including the message specified in the delivery_tag
field of the basic.nack
method. In this respect, basic.nack
complements the bulk acknowledgement semantics of basic.ack
.
请注意,basic.nack
方法是 RabbitMQ 特定的扩展,而 basic.reject
方法是 AMQP 0.9.1 规范的一部分.
Note, that basic.nack
method is RabbitMQ-specific extension while basic.reject
method is part of AMQP 0.9.1 specification.
至于 basic.cancel
方法,用于通知服务器客户端停止消息消费.请注意,客户端可能会在 basic.cancel
方法发送接收 cancel-ok
回复之间接收任意消息编号.如果客户端使用消息确认并且它有任何未确认的消息,它们将被移回它们最初被消费的队列.
As to basic.cancel
method, it used to notify server that client stops message consuming. Note, that client may receive arbitrary messages number between basic.cancel
method sending an receiving the cancel-ok
reply. If message acknowledge is used by client and it has any un-acknowledged messages they will be moved back to the queue they originally was consumed from.
basic.recover
在 RabbitMQ 中有一些限制:- basic.recover with requeue=false- 基本.恢复同步性
除了勘误表,根据 RabbitMQ 规范 basic.recover
有部分支持(不支持 requeue=false 的恢复.)
In addition to errata, according to RabbitMQ specs basic.recover
has partial support (Recovery with requeue=false is not supported.)
注意基本.消费
:
当 basic.consume
开始时没有自动确认 (noack=false
) 并且有一些未确认的待处理消息,然后当消费者被取消(死亡、致命错误、异常等)时,该待处理消息消息将被重新发送.从技术上讲,在消费者释放它们(ack/nack/reject/recover)之前,不会处理待处理的消息(甚至是死信).只有在此之后它们才会被处理(例如,deadlettered).
When basic.consume
started without auto-ack (noack=false
) and there are some pending messages non-acked messages, then when consumer get canceled (dies, fatal error, exception, whatever) that pending messages will be redelivered. Technically, that pending messages will not be processed (even dead-lettered) until consumer release them (ack/nack/reject/recover). Only after that they will be processed (e.g. deadlettered).
例如,假设我们最初连续发布 5 条消息:
For example, let say we post originally 5 message in a row:
Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)
然后消费其中的3个,但不确认它们,然后取消消费.我们会有这样的情况:
And then consume 3 of them, but not ack them, and then cancel consumer. We will have this situation:
Queue(main) (tail) { [4] [3] [2*] [1*] [0*] } (head)
其中 star (*
) 表示 redelivered
标志设置为 true
.
where star (*
) notes that redelivered
flag set to true
.
假设我们有死信交换集和死信消息队列的情况
Assume that we have situation with dead-lettered exchange set and queue for dead-lettered messages
Exchange(e-main) Exchange(e-dead)
Queue(main){x-dead-letter-exchange: "e-dead"} Queue(dead)
假设我们发布了 5 条消息,expire
属性设置为 5000
(5 秒):
And assume we post 5 message with expire
property set to 5000
(5 sec):
Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)
Queue(dead) (tail) { }(head)
然后我们从 main
队列中消费 3 条消息并保持它们 10 秒:
and then we consume 3 message from main
queue and hold them for 10 second:
Queue(main) (tail) { [2!] [1!] [0!] } (head)
Queue(dead) (tail) { [4*] [3*] } (head)
其中感叹号 (!
) 代表未确认的消息.此类消息无法传递给任何消费者,并且通常无法在管理面板中查看.但是让我们取消消费者,记住,它仍然持有 3 条未确认的消息:
where exclamation mark (!
) stands for unacked message. Such messages can't be delivered to any consumer and they normally can't be viewed in management panel. But let's cancel consumer, remember, that it still hold 3 un-acked message:
Queue(main) (tail) { } (head)
Queue(dead) (tail) { [2*] [1*] [0*] [4*] [3*] } (head)
所以现在头部中的 3 条消息放回原始队列,但是由于它们设置了每条消息的 TTL,它们被死信发送到死信队列的尾部(当然,通过死信交换).
So now that 3 messages which was in the head put back to original queue, but as they has per-message TTL set, they are dead-lettered to the tail of dead-letter queue (sure, via dead-letter exchange).
附注:
消费消息又名侦听新消息与直接队列访问(获取一条或多条消息而不关心其他消息)在某种程度上有所不同.请参阅basic.get
更多方法说明.
Consuming message aka listening for new one is somehow different from direct queue access (getting one or more message without taking care of others). See basic.get
method description for more.
这篇关于如何在 RabbitMQ 中重新排队消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!