我需要在特定的 celeryd 实例上运行一些任务。所以我配置了队列:
celery 配置文件:
CELERY_QUEUES = {
'celery': {
'exchange': 'celery',
'binding_key': 'celery',
},
'import': {
'exchange': 'import',
'binding_key': 'import.products',
},
}
CELERY_ROUTES = {
'celery_tasks.import_tasks.test': {
'queue': 'import',
'routing_key': 'import.products',
},
}
import_tasks.py:
@task
def test():
print 'test'
@task(exchange='import', routing_key='import.products')
def test2
print 'test2'
然后我开始 celeryd:
celeryd -c 2 -l INFO -Q import
并尝试执行这些任务。 'test' 执行但 'test2' 不执行。但我不想在 CELERY_ROUTES 中指定每个导入任务。如何在任务定义中指定哪个队列应该执行任务?
最佳答案
哦,忘了说我用send_task函数来执行任务了。并且此功能不导入任务。它只是将任务的名称发送到队列。
所以而不是这个:
from celery.execute import send_task
result = send_task(args.task, task_args, task_kwargs)
我写:
from celery import current_app as celery_app, registry as celery_registry
celery_imports = celery_app.conf.get('CELERY_IMPORTS')
if celery_imports:
for module in celery_imports:
__import__(module)
task = celery_registry.tasks.get(args.task)
if task:
result = task.apply_async(task_args, task_kwargs)
关于python - celery 和路由,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/7659700/