我现在有大量文档要处理,并且正在使用Python RQ来并行化任务。
我希望完成一系列工作,因为每个文档都执行不同的操作。例如:A
-> B
-> C
表示将文档传递给A
函数,完成A
后,继续进行B
和最后一个C
。
但是,Python RQ似乎不能很好地支持管道内容。
这是一个简单但有点脏的操作。简而言之,管道中的每个函数都以嵌套的方式调用其下一个函数。
例如,对于管道A
-> B
-> C
。
在顶层,一些代码是这样写的:q.enqueue(A, the_doc)
其中q是Queue
实例,在A
函数中有类似以下的代码:q.enqueue(B, the_doc)
在B
中,是这样的:q.enqueue(C, the_doc)
还有其他方法比这更优雅吗?例如 ONE 函数中的一些代码:q.enqueue(A, the_doc)q.enqueue(B, the_doc, after = A)q.enqueue(C, the_doc, after= B)
depends_on参数是最接近我的要求的参数,但是运行如下命令:A_job = q.enqueue(A, the_doc)q.enqueue(B, depends_on=A_job )
将无法正常工作。由于在执行q.enqueue(B, depends_on=A_job )
之后立即执行A_job = q.enqueue(A, the_doc)
。到B入队时,A的结果可能尚未准备好,因为需要花费一些时间来处理。
PS:
如果Python RQ不能真正做到这一点,那么我可以使用Python中的其他工具来达到相同的目的:
最佳答案
我不确定当您最初发布问题时这是否真的成立,但是无论如何现在都不是真的。实际上,depends_on
功能正是针对您所描述的工作流程而制作的。
确实,这两个功能是立即连续执行的。
A_job = q.enqueue(A, the_doc)
B_job = q.enqueue(B, depends_on=A_job )
但是在
B
完成之前,工作人员将不会执行A
。在成功执行A_job
之前,请先执行B.status == 'deferred'
。一旦A.status == 'finished'
,然后B
将开始运行。这意味着
B
和C
可以访问并对其依赖项的结果进行操作,如下所示:import time
from rq import Queue, get_current_job
from redis import StrictRedis
conn = StrictRedis()
q = Queue('high', connection=conn)
def A():
time.sleep(100)
return 'result A'
def B():
time.sleep(100)
current_job = get_current_job(conn)
a_job_id = current_job.dependencies[0].id
a_job_result = q.fetch_job(a_job_id).result
assert a_job_result == 'result A'
return a_job_result + ' result B'
def C():
time.sleep(100)
current_job = get_current_job(conn)
b_job_id = current_job.dependencies[0].id
b_job_result = q.fetch_job(b_job_id).result
assert b_job_result == 'result A result B'
return b_job_result + ' result C'
工作人员最终将打印
'result A result B result C'
。另外,如果队列中有很多作业,并且
B
在执行之前可能要等待一会儿,您可能想大幅增加result_ttl
或使用result_ttl=-1
使其不确定。否则,在为result_ttl
设置了许多秒后,将清除A的结果,在这种情况下B
将不再能够访问它并返回所需的结果。但是,设置
result_ttl=-1
具有重要的内存含义。这意味着您的工作结果将永远不会被自动清除,并且内存将成比例地增长,直到您从redis中手动删除这些结果为止。