我需要在特定的 celeryd 实例上运行一些任务。所以我配置了队列:

celery 配置文件:

CELERY_QUEUES = {
    'celery': {
        'exchange': 'celery',
        'binding_key': 'celery',
    },
    'import': {
        'exchange': 'import',
        'binding_key': 'import.products',
    },
}

CELERY_ROUTES = {
    'celery_tasks.import_tasks.test': {
        'queue': 'import',
        'routing_key': 'import.products',
    },
}

import_tasks.py:
@task
def test():
    print 'test'

@task(exchange='import', routing_key='import.products')
def test2
    print 'test2'

然后我开始 celeryd:
celeryd -c 2 -l INFO -Q import

并尝试执行这些任务。 'test' 执行但 'test2' 不执行。但我不想在 CELERY_ROUTES 中指定每个导入任务。如何在任务定义中指定哪个队列应该执行任务?

最佳答案

哦,忘了说我用send_task函数来执行任务了。并且此功能不导入任务。它只是将任务的名称发送到队列。

所以而不是这个:

from celery.execute import send_task

result = send_task(args.task, task_args, task_kwargs)

我写:
from celery import current_app as celery_app, registry as celery_registry

celery_imports = celery_app.conf.get('CELERY_IMPORTS')
if celery_imports:
    for module in celery_imports:
        __import__(module)

task = celery_registry.tasks.get(args.task)
if task:
    result = task.apply_async(task_args, task_kwargs)

关于python - celery 和路由,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/7659700/

10-12 19:38