对于未提供MVCE,我事先表示歉意-不幸的是,由于这个问题的性质,它不能很好地适用于最少的示例。尽管如此,我认为如果没有MVCE,它仍然会很负责任。

我有一个任务列表,客户端可以从中选择要在Flask中创建的任务子集。我创建如下任务:

current_app.logger.info("Creating list of chained tasks..")
chains = [functools.reduce(
    lambda x, y: x | y.s(foo, bar), remaining_tasks, first_task.s(foo, bar)
) for foo in foos]


所有任务都具有相似的功能签名,类似于

@celery.task
def my_task(baz, foo, bar):
    # ...
    return baz


我尝试通过以下方式执行该组:

current_app.logger.info("Created a group of chained tasks..")
g = group(*chains)
res = g.apply_async(args=(baz,), queue="default")


我发现调用apply_async时会引发两个异常:

Traceback (most recent call last):
  File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/utils/functional.py", line 209, in __getitem__
    return self.__consumed[index]
IndexError: list index out of range




File "/Users/erip/Code/whatever.py", line 101, in blahblah
    res = g.apply_async(args=(baz,), queue="default")
  File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/canvas.py", line 977, in apply_async
    app = self.app
  File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/canvas.py", line 1144, in app
    app = self.tasks[0].app
  File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/utils/functional.py", line 213, in __getitem__
    self.__consumed.append(next(self.__it))
TypeError: 'Signature' object is not an iterator


docs表示链的构造是有效的,所以我不明白为什么异步应用程序会引起问题。

我的目标是创建一组异步应用的len(foos)链。我发现这种行为仅在len(foos) == 1时发生。

有人遇到过这个问题吗?

最佳答案

我遇到了类似的问题,芹菜文档具有以下注意事项:


  如果仅传递一个参数,并且该参数是可迭代的
     然后将其用作任务列表:
     允许我们将group与生成器表达式一起使用。


看一下Group类的构造函数。如果我们仅传递一个签名来初始化组对象,则此签名将被视为生成器。

def __init__(self, *tasks, **options):
    if len(tasks) == 1:
        tasks = tasks[0]
        if isinstance(tasks, group):
            tasks = tasks.tasks
        if not isinstance(tasks, _regen):
            tasks = regen(tasks)
    Signature.__init__(
        self, 'celery.group', (), {'tasks': tasks}, **options
    )
    self.subtask_type = 'group'


就您而言,您可以通过以下方式简单地执行一组任务:

current_app.logger.info("Created a group of chained tasks..")
if len(chains) == 1:
    g = group(chains)
else:
    g = group(*chains)
res = g.apply_async(args=(baz,), queue="default")

关于python - 为什么一组这样的链条构成在Celery中引起异常?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/41371933/

10-12 23:09