我正在尝试创建一个将订阅多个队列的使用者,然后在消息到达时对其进行处理。

问题是,当第一个队列中已经有一些数据时,它将消耗第一个队列,而永远不会消耗第二个队列。
但是,当第一个队列为空时,它会转到下一个队列,然后同时使用两个队列。

我首先实现了线程,但是当pika库为我做到这一点而没有太多复杂性时,我想避免使用它。下面是我的代码:

import pika

mq_connection = pika.BlockingConnection(pika.ConnectionParameters('x.x.x.x'))
mq_channel = mq_connection.channel()
mq_channel.basic_qos(prefetch_count=1)


def callback(ch, method, properties, body):
    print body
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)

mq_channel.basic_consume(callback, queue='queue1', consumer_tag="ctag1.0")
mq_channel.basic_consume(callback, queue='queue2', consumer_tag="ctag2.0")
mq_channel.start_consuming()

最佳答案

一种可能的解决方案是使用非阻塞连接并使用消息。

import pika


def callback(channel, method, properties, body):
    print(body)
    channel.basic_ack(delivery_tag=method.delivery_tag)


def on_open(connection):
    connection.channel(on_channel_open)


def on_channel_open(channel):
    channel.basic_consume(callback, queue='queue1')
    channel.basic_consume(callback, queue='queue2')


parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.SelectConnection(parameters=parameters,
                                   on_open_callback=on_open)

try:
    connection.ioloop.start()
except KeyboardInterrupt:
    connection.close()

这将连接到多个队列,并将相应地使用消息。

10-05 23:11