问题描述
我正在尝试启动celery worker,因此它仅侦听单个队列.这不是问题,我可以这样:
I'm trying to start celery worker so it only listens to single queue. This is not a problem, I can do this that way:
python -m celery worker -A my_module -Q my_queue -c 1
但是现在我也希望这个 my_queue
队列成为广播队列,所以我在celeryconfig中做到这一点:
But now I also want this my_queue
queue to be a broadcast queue, so I do this in my celeryconfig:
from kombu.common import Broadcast
CELERY_QUEUES = (Broadcast('my_queue'),)
但是,一旦执行此操作,我就无法再开始工作了,我从Rabbitmq收到错误消息:
But as soon as I do this I cannot start my worker anymore, I get error from rabbitmq:
amqp.exceptions.PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'myvhost': received 'fanout' but current is 'direct'
如果我开始没有 -Q
的工作程序(但如上所述将 Broadcast
保留在 celeryconfig.py
中),并且我列出了Rabbitmq队列,我可以看到广播队列已创建并命名为:
If I start worker without -Q
(but leaving Broadcast
in celeryconfig.py
as described above) and I list rabbitmq queues I can see broadcast queue is created and named like this:
bcast.43fecba7-786a-461c-a322-620039b29b8b
并且类似地,如果我在worker中定义此队列(如上所述,使用 -Q
)或在 celeryconfig.py
中将其定义为简单的 Queue
这个:
And similarly if I define this queue within worker (using -Q
as mentioned above) or as simple Queue
in celeryconfig.py
like this:
from kombu import Queue
CELERY_QUEUES = (Queue('my_queue'),)
我可以在Rabbitmq中看到这样的队列:
I can see this queue in rabbitmq like this:
my_queue
在定义队列时,我放入 Broadcast
调用中的内容并不重要-这似乎是内部芹菜名称,没有传递给Rabbitmq.
It apperas it does not matter what I put into Broadcast
call when defining the queue - this seems to be internal celery name, not passed to rabbitmq.
所以我猜测何时启动工作程序,然后创建 my_queue
,一旦完成,就无法进行 Broadcast
.
So I'm guessing when worker is starting then my_queue
is created and once that's done it cannot be made Broadcast
.
我可以拥有一个侦听任何队列(不仅可以监听 my_queue
的队列)的工人,而该队列可以通过删除 -Q
参数来开始.但是能够有一个只监听特定队列的进程将是一件很不错的事,因为我在那儿丢的任务很快,而且我想尽可能降低延迟.
I can have a worker that listens to any queue (not only to my_queue
) which I would start by removing the -Q
argument. But it would be nice to be able to have a single process that only listens to that particular queue since my tasks I throw in there are fast and I'd like to bring latency down as much as possible.
-编辑1-花了一些时间解决此问题,并且上述 bcast
队列似乎并不一致.重置Rabbitmq并在没有 -Q
选项 bcast
队列的情况下运行celery之后,没有出现...
--- Edit 1 ---Spent some time with this problem and it seems bcast
queue mentioned above does not appear consistently. After reseting rabbitmq and running celery without -Q
option bcast
queue did not appear...
推荐答案
在使用代理发送消息时,客户端和工作进程必须同意相同的配置值.如果必须更改配置,则需要清除现有消息并重新启动所有消息,以使它们保持同步.
When using a broker for sending messages, client and workers must agree on same configuration values. If you have to change config, you need to purge existing messages and restart everything so that they are in sync.
启动广播队列时,您可以设置交换类型并配置队列.
When starting a broadcast queue you can set exchange type and configure the queue.
from kombu.common import Broadcast
from kombu import Exchange
exchange = Exchange('custom_exchange', type='fanout')
CELERY_QUEUES = (
Broadcast(name='bcast', exchange=exchange),
)
现在您可以以...开始工作
Now you can start worker with
celery worker -l info -A tasks -Q bcast
这篇关于启动celery worker并将其启用广播队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!