我有

  • 设置CELERY_CREATE_MISSING_QUEUES = True
  • 未定义CELERY_QUEUES
  • 定义的CELERY_DEFAULT_QUEUE = 'default'(类型直接)
  • 一个自定义路由器类,可动态创建路由,如下所示
    这张票(https://github.com/celery/celery/issues/150)。

  • 我看到在自定义路由器返回的路由中创建了新队列,我认为这是因为CELERY_CREATE_MISSING_QUEUES

    现在在我运行的工作程序节点中,我没有传递-Q参数,它仅从似乎与文档一致的“默认”队列中使用-



    有什么方法可以让我的工作节点从所有队列(包括动态创建的队列)中使用?

    谢谢,

    最佳答案

    需要告知工作人员这些自动或动态创建的队列,因此您需要一种方法来获取这些队列名称并存储它们(也许在创建时),或者如果您使用RabbitMQ作为代理,则可以从rabbitmqctl list_queues获取它们,并且示例添加一个信号处理程序,以将这些动态队列添加到工作程序中以供使用。

    例如,使用 celeryd_after_setup 信号:

    from celery.signals import celeryd_after_setup
    
    @celeryd_after_setup.connect
    def add_dynamic_queue(sender, instance, **kwargs):
        # get the dynamic queue, maybe stored somewhere
        queue = 'dynamic_queue'
        instance.app.amqp.queues.select_add(queue)
    

    如果始终创建新的动态队列,则还可以使用以下命令命令工作程序在运行时从这些队列开始使用:
    #command all workers to consume from the 'dynamic_queue' queue
    app.control.add_consumer('dynamic_queue', reply=True)
    
    # command specific workers
    app.control.add_consumer('dynamic_queue', reply=True, destination=[w1@example])
    

    参见Adding Consumers

    希望对您有所帮助,当我获得更多有关此信息的信息时,我将对其进行编辑。

    10-07 13:30
    查看更多