我目前正在尝试使用 python 多处理。我使用的库是 multiprocess
(不是 multiprocessing
)。
我有以下代码,它创建了许多计算作业,并通过 map 操作运行它:
pool = multiprocess.Pool(4)
all_responses = pool.map_async(wrapper_singlerun, range(10000))
pool.join()
pool.close()
但是,每当我运行这段代码时,都会出现以下错误:
pool.join()
File "/Users/davidal/miniconda3/lib/python3.6/site-packages/multiprocess/pool.py", line 509, in join
assert self._state in (CLOSE, TERMINATE)
AssertionError
你知道为什么会发生这个错误吗?我之前使用过
pool.map_async
,但我认为我需要一个 pool rendez-vous
命令。否则,我的 PC 会创建类似 forkbomb 的东西,它创建了太多线程(至少,我认为它是这样做的......)任何想法表示赞赏!
最佳答案
问题是您在 join
之前调用 close
。
multiprocess
似乎是 missing its documentation ,但是,据我所知,它基本上是 stdlib multiprocessing
的一个分支,它预先为 dill
修补了 pickle
,所以 multiprocessing
文档在这里应该是相关的。 (另外,在评论中,你说你可以用 multiprocessing
重现这个问题。)
所以, Pool.join
说:
close
方法用于关闭队列的发送端,因此无法添加新任务。 join
方法用于等待队列中的所有内容被处理。在关闭队列之前等待队列耗尽是行不通的。
但是您是在 close
之后而不是之前调用 join
。 the first thing join
does 是 assert
,您已经调用了 close
或 terminate
,而您还没有调用,因此断言失败。
因此,您可能只想切换这两个调用的顺序。
或者,也许您对 join
的用途感到困惑,并认为您需要先调用它,然后才能使用 all_responses.get()
或 .wait()
。如果是这样——你不需要这样做; get
将阻塞,直到结果可用,之后您不需要 join
。这实际上更常见,尤其是 map
和 friend (尽管文档中的示例是通过 with Pool(…) as pool:
而不是手动调用池中的任何内容)。