Question
Right now I have some code that does roughly the following:
def generator():
    while True:
        value = do_some_lengthy_IO()
        yield value

def model():
    for datapoint in generator():
        do_some_lengthy_computation(datapoint)
Right now, the I/O and the computation happen in serial. Ideally, they should run concurrently (with the generator having the next value ready), since they share nothing but the value being passed. I started looking into this and got very confused by the multiprocessing, threading, and async stuff, and could not get a minimal working example going. Also, since some of these seem to be recent features, I am using Python 3.6.
Answer
I ended up figuring it out. The simplest way is to use the multiprocessing package and a pipe to communicate with the child process. I wrote a wrapper that can take any generator:
import time
import multiprocessing

def bg(gen):
    def _bg_gen(gen, conn):
        # Serve values until told to stop (conn.recv() returning False)
        while conn.recv():
            try:
                conn.send(next(gen))
            except StopIteration:
                conn.send(StopIteration)
                return

    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=_bg_gen, args=(gen, child_conn))
    p.start()

    parent_conn.send(True)  # the extra request up front keeps one value prefetched
    while True:
        parent_conn.send(True)
        x = parent_conn.recv()
        if x is StopIteration:
            return
        else:
            yield x

def generator(n):
    for i in range(n):
        time.sleep(1)  # stands in for the lengthy I/O
        yield i

# This takes 2 s/iteration: I/O and computation run in serial
for i in generator(100):
    time.sleep(1)  # stands in for the lengthy computation

# This takes 1 s/iteration: the generator runs in the background
for i in bg(generator(100)):
    time.sleep(1)
The only missing thing right now is that for infinite generators the child process is never killed, but that can easily be added by doing a parent_conn.send(False).