Problem description
I'm writing an application that needs to run a series of tasks in parallel, and then run a single task on the results of all of them:
# `celery` is the application's Celery instance, defined elsewhere
@celery.task
def power(value, expo):
    return value ** expo

@celery.task
def amass(values):
    print(values)
It's a very contrived and oversimplified example, but hopefully the point comes across. Basically, I have many items that need to run through power, but I only want to run amass once, on the results from all of those tasks. All of this should happen asynchronously, and I don't need anything back from the amass task.
Does anyone know how to set this up in Celery so that everything is executed asynchronously and a single callback with a list of the results is called once all the tasks have finished?
I've set up this example to run with a chord, as Alexander Afanasiev recommended:
from time import sleep
import random

from celery import chord

tasks = []
for i in range(10):
    # build a signature for each task; nothing is sent to the queue yet
    tasks.append(power.s(i, 2))
    sleep(random.randint(10, 1000) / 1000.0)  # sleep for 10-1000ms
callback = amass.s()
r = chord(tasks)(callback)
Unfortunately, in the above example, all of the tasks in tasks are started only when the chord is called. Is there a way for each task to start separately, so that I can then add a callback to the group to run when everything has finished?
Recommended answer
Here's a solution that worked for my purposes:
tasks.py:
from time import sleep
import random

@celery.task
def power(value, expo):
    sleep(random.randint(10, 1000) / 1000.0)  # sleep for 10-1000ms
    return value ** expo

@celery.task
def amass(results, tasks):
    completed_tasks = []
    for task in tasks:
        if task.ready():
            completed_tasks.append(task)
            results.append(task.get())
    # remove completed tasks
    tasks = list(set(tasks) - set(completed_tasks))
    if len(tasks) > 0:
        # resend the task to execute at least 1 second from now;
        # countdown is an execution option, so use apply_async rather than delay
        amass.apply_async((results, tasks), countdown=1)
    else:
        # all tasks are done
        print(results)
Usage:
tasks = []
for i in range(10):
    # delay() sends each task to the queue immediately
    tasks.append(power.delay(i, 2))
amass.delay([], tasks)
What this should do is start all of the tasks asynchronously, as soon as possible. Once they've all been posted to the queue, the amass task is posted to the queue as well. The amass task then keeps reposting itself until all of the other tasks have completed.
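The poll-and-repost idea can also be tried without a broker. The following is only a rough analogy built on the standard library, not Celery code: a thread pool stands in for the workers, threading.Timer stands in for countdown, and the names run_all, done_event and poll_interval are made up for this illustration.

```python
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def power(value, expo):
    # Simulate work that takes 10-100 ms.
    time.sleep(random.randint(10, 100) / 1000.0)
    return value ** expo


def amass(results, pending, done_event, poll_interval=0.05):
    # Collect whatever has finished and keep the rest for the next pass.
    still_pending = []
    for future in pending:
        if future.done():
            results.append(future.result())
        else:
            still_pending.append(future)
    if still_pending:
        # Stand-in for reposting the task with a countdown.
        timer = threading.Timer(
            poll_interval, amass,
            args=(results, still_pending, done_event, poll_interval),
        )
        timer.start()
    else:
        # Everything has finished; signal the caller.
        done_event.set()


def run_all(n=10):
    results = []
    done_event = threading.Event()
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Each job starts as soon as it is submitted.
        pending = [pool.submit(power, i, 2) for i in range(n)]
        amass(results, pending, done_event)
        done_event.wait(timeout=10)
    # Results arrive in completion order, so sort them for a stable view.
    return sorted(results)


if __name__ == "__main__":
    print(run_all())
```

As in the answer above, the collector never blocks on an unfinished job; it only records results that are already available and reschedules itself for the remainder.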