我有一个 celery 任务,如:

from celery.task import task
from django.conf import settings
from base.tasks import BaseTask

@task(name="throw_exception", base=BaseTask)
def print_value(*args, **kwargs):
    print('BROKER_URL:', settings.BROKER_URL)

我在我的 virtualenv 中运行一个 Celery worker ,例如:
celery worker -A myproject -l info

worker 显示:
Connected to amqp://guest:**@127.0.0.1:5672/myapp

当我从 Django shell 启动我的任务时:
>>> from django.conf import settings
>>> settings.BROKER_URL
'amqp://guest:**@127.0.0.1:5672/myapp'
>>> from myapp.tasks import print_value
>>> print_value.delay()

我从未在我的 worker 日志中看到执行的任务。

但是,如果我改为将我的工作人员更改为使用带有默认“/”虚拟主机的 BROKER_URL,那么它会立即执行所有挂起的任务,这意味着即使设置了正确的 BROKER_URL,我对 print_value.delay() 的所有调用都将其发送到错误的虚拟主机。我究竟做错了什么?

编辑:问题似乎是 Celery 没有一致的 @task 装饰器,并且通过使用错误的装饰器,您会断开任务与代理设置的连接。所以基本上,我的所有任务都配置为使用默认代理,而不是我的设置中定义的代理。旧文档说使用 from celery.task import task 但新文档...并没有真正指定,似乎暗示您应该使用 app 文件中定义的 celery.py 实例,如 @app.task 。问题在于我的所有任务都在单独的 tasks.py 文件中,它们无法访问 app 实例。如果我将一个任务复制到我的 celery.py 并使用 @app.task 装饰器,那么它会使用正确的 vhost 并按预期工作,但很明显,这不是一个实用的修复,因为我必须将数十个函数复制到这个文件中。我该如何正确解决这个问题?

最佳答案

现在使用 Django + (Celery + RabbitMQ) 对我自己有同样的问题。我的解决办法是,CELERY_BROKER_URL=amqp://<user>:<password>@localhost:5672/<vhost>这是来自 RabbitMQ.com > Client Documentation > RabbitMQ URI Specification 的详细确认

对于它的值(value)...
... Celery + RabbitMQ 有很多事情要做。我正在用 rabbitmqctl list_vhosts 看 RabbitMQ——但我没有看到我的虚拟主机。跆拳道?最后,我意识到我在本地开发服务器上配置 supervisord 太早了。从 CLI 启动 Celery 给出了一堆反馈,supervisord 把这些反馈放在了一个不直接在我眼皮底下的地方,比如:[2021-02-19 18:26:52,803: WARNING/MainProcess] (0, 0): (403) ACCESS_REFUSED - Login was refused using authentication mechanism AMQPLAIN. For details see the broker logfile.AMQP 立刻让我想起了连接字符串。繁荣。有你的 vhost

关于python - 如何在特定虚拟主机上处理 Celery 任务?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47049240/

10-16 08:47