我正在尝试围绕这个异步任务处理设置。到目前为止,我一直在考虑使用 Celery,但还没有锁定任何东西。唯一的要求是我可以使用 Redis 作为代理并在多个节点上分发任务。

       ->  Task2  ->  Task3
Task1  ->  Task2  ->  Task3    [then]    Task4
       ->  Task2  ->  Task3

说明 :
  • Task1 生成项目列表
  • Task2 从 Task1 接收一项作为参数
  • Task2 和 Task3 是链式的,每条链都是并行执行的
  • Task4 在所有 Task2-Task3 链都完成时执行(不需要从 Task3 传递任何数据)

  • 那么问题是,我怎样才能用 celery 做到这一点?

    最佳答案

    可以使用和弦和链函数来完成,请看示例。它应该适合您的需求。

    from celery import Celery, chord, chain
    
    backend = 'redis://redis:6379/'
    app = Celery(result_backend=backend, backend=backend)
    
    
    @app.task
    def task1():
        argument = 123
        return chord([
            chain(task2.s(argument), task3.s()),
            chain(task2.s(argument), task3.s()),
            chain(task2.s(argument), task3.s()),
        ])(task4.s())
    
    
    @app.task
    def task2(argument):
        pass
    
    
    @app.task
    def task3(result_task2):
        pass
    
    
    @app.task
    def task4(result):
        pass
    
    
    task1.apply_async()
    

    10-07 15:14