我有
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES
CELERY_DEFAULT_QUEUE = 'default'
(类型直接)这张票(https://github.com/celery/celery/issues/150)。
我看到在自定义路由器返回的路由中创建了新队列,我认为这是因为
CELERY_CREATE_MISSING_QUEUES
。现在在我运行的工作程序节点中,我没有传递
-Q
参数,它仅从似乎与文档一致的“默认”队列中使用-有什么方法可以让我的工作节点从所有队列(包括动态创建的队列)中使用?
谢谢,
最佳答案
需要告知工作人员这些自动或动态创建的队列,因此您需要一种方法来获取这些队列名称并存储它们(也许在创建时),或者如果您使用RabbitMQ作为代理,则可以从rabbitmqctl list_queues
获取它们,并且示例添加一个信号处理程序,以将这些动态队列添加到工作程序中以供使用。
例如,使用 celeryd_after_setup
信号:
from celery.signals import celeryd_after_setup
@celeryd_after_setup.connect
def add_dynamic_queue(sender, instance, **kwargs):
# get the dynamic queue, maybe stored somewhere
queue = 'dynamic_queue'
instance.app.amqp.queues.select_add(queue)
如果始终创建新的动态队列,则还可以使用以下命令命令工作程序在运行时从这些队列开始使用:
#command all workers to consume from the 'dynamic_queue' queue
app.control.add_consumer('dynamic_queue', reply=True)
# command specific workers
app.control.add_consumer('dynamic_queue', reply=True, destination=[w1@example])
参见Adding Consumers。
希望对您有所帮助,当我获得更多有关此信息的信息时,我将对其进行编辑。