我现在有大量文档要处理,并且正在使用Python RQ来并行化任务。

我希望完成一系列工作,因为每个文档都执行不同的操作。例如:A-> B-> C表示将文档传递给A函数,完成A后,继续进行B和最后一个C

但是,Python RQ似乎不能很好地支持管道内容。

这是一个简单但有点脏的操作。简而言之,管道中的每个函数都以嵌套的方式调用其下一个函数。

例如,对于管道A-> B-> C

在顶层,一些代码是这样写的:
q.enqueue(A, the_doc)
其中q是Queue实例,在A函数中有类似以下的代码:
q.enqueue(B, the_doc)
B中,是这样的:
q.enqueue(C, the_doc)
还有其他方法比这更优雅吗?例如 ONE 函数中的一些代码:
q.enqueue(A, the_doc)q.enqueue(B, the_doc, after = A)q.enqueue(C, the_doc, after= B)
depends_on参数是最接近我的要求的参数,但是运行如下命令:
A_job = q.enqueue(A, the_doc)q.enqueue(B, depends_on=A_job )
将无法正常工作。由于在执行q.enqueue(B, depends_on=A_job )之后立即执行A_job = q.enqueue(A, the_doc)。到B入队时,A的结果可能尚未准备好,因为需要花费一些时间来处理。

PS:

如果Python RQ不能真正做到这一点,那么我可以使用Python中的其他工具来达到相同的目的:

  • 循环并行化
  • 管道处理支持
  • 最佳答案



    我不确定当您最初发布问题时这是否真的成立,但是无论如何现在都不是真的。实际上,depends_on功能正是针对您所描述的工作流程而制作的。

    确实,这两个功能是立即连续执行的。

    A_job = q.enqueue(A, the_doc)
    B_job = q.enqueue(B, depends_on=A_job )
    

    但是在B完成之前,工作人员将不会执行A。在成功执行A_job之前,请先执行B.status == 'deferred'。一旦A.status == 'finished',然后B将开始运行。

    这意味着BC可以访问并对其依赖项的结果进行操作,如下所示:

    import time
    from rq import Queue, get_current_job
    from redis import StrictRedis
    
    conn = StrictRedis()
    q = Queue('high', connection=conn)
    
    def A():
        time.sleep(100)
        return 'result A'
    
    def B():
        time.sleep(100)
        current_job = get_current_job(conn)
        a_job_id = current_job.dependencies[0].id
        a_job_result = q.fetch_job(a_job_id).result
        assert a_job_result == 'result A'
        return a_job_result + ' result B'
    
    
    def C():
        time.sleep(100)
        current_job = get_current_job(conn)
        b_job_id = current_job.dependencies[0].id
        b_job_result = q.fetch_job(b_job_id).result
        assert b_job_result == 'result A result B'
        return b_job_result + ' result C'
    

    工作人员最终将打印'result A result B result C'

    另外,如果队列中有很多作业,并且B在执行之前可能要等待一会儿,您可能想大幅增加result_ttl或使用result_ttl=-1使其不确定。否则,在为result_ttl设置了许多秒后,将清除A的结果,在这种情况下B将不再能够访问它并返回所需的结果。

    但是,设置result_ttl=-1具有重要的内存含义。这意味着您的工作结果将永远不会被自动清除,并且内存将成比例地增长,直到您从redis中手动删除这些结果为止。

    10-04 10:58