假设我有一个包含五个项目的队列:

(tail) E, D, C, B, A (head)

我从这个队列的头部消费消息,但决定消息 A 目前不适合处理。我用 reject 对该项目进行 requeue=True 编码,队列变为:
(tail) A, E, D, C, B (head)

然后我使用 BCDEack 。现在队列只包含 A ,我在一个永无止境的循环中一遍又一遍地不断消耗和 reject 。如果一个新的、非 A 消息进来,它几乎立即被消耗,然后进程继续尝试消耗 A 的循环。

我通过对 Pika 文档中的 Twisted Consumer Example 稍加修改来做到这一点:
import pika
from pika import exceptions
from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol,task


@defer.inlineCallbacks
def run(connection):

    channel = yield connection.channel()

    exchange = yield channel.exchange_declare(exchange='topic_link',type='topic')

    queue = yield channel.queue_declare(queue='hello', auto_delete=False, exclusive=False)

    yield channel.queue_bind(exchange='topic_link',queue='hello',routing_key='hello.world')

    #yield channel.basic_qos(prefetch_count=1)

    queue_object, consumer_tag = yield channel.basic_consume(queue='hello',no_ack=False)

    l = task.LoopingCall(read, queue_object)

    l.start(0.01)


@defer.inlineCallbacks
def read(queue_object):

    ch,method,properties,body = yield queue_object.get()

    print body

    if body == 'A':
        yield ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
    else:
        yield ch.basic_ack(delivery_tag=method.delivery_tag)


parameters = pika.ConnectionParameters()
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
d = cc.connectTCP('hostname', 5672)
d.addCallback(lambda protocol: protocol.ready)
d.addCallback(run)
reactor.run()

问题: 请注意以下注释掉的行:
#yield channel.basic_qos(prefetch_count=1)

当我取消注释并且消费者到达消息 A 时,它​​会在 reject 之后立即再次拿起它,忽略可能在它后面的队列中等待的任何其他项目。它不会将被拒绝的项目放在队列的尾部,而是一遍又一遍地重复尝试,完全阻塞队列中的其他所有内容。

注释掉该行后,它可以正常工作(尽管慢了一点)。如果该行存在并且 prefetch_count > 1 ,它也可以工作。将其设置为完全 1 会触发此行为。

我在拒绝消息 A 时遗漏了一个步骤吗?还是 Pika 的预取系统与这种边缘情况根本不兼容?

最佳答案

如果您只有一个消费者,那么 RabbitMQ 除了向被拒绝的同一个消费者发送消息外别无他法(无论如何:使用 basic.reject 或 basic.nack)。

当您设置 prefetch_count > 1 时,您的消费者将拥有您的循环消息以及从头到循环的新消息(字面意思是,您的循环消息将留在头上)。

如果您不小心收到带有 N*M 和消费者编号 prefetch_count <= N 的 M 循环消息,您将循环所有消息(这会导致 CPU 烧毁等),因此检查 rejected 消息标志并使用一些高级消息可能是一个不错的选择如果消息已经重新传递,则逻辑。

关于python - 当 prefetch_count == 1 时拒绝和重新排队 RabbitMQ 任务,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/24333840/

10-11 17:53