我目前正在尝试使用 python 多处理。我使用的库是 multiprocess (不是 multiprocessing )。

我有以下代码,它创建了许多计算作业,并通过 map 操作运行它:

pool = multiprocess.Pool(4)
all_responses = pool.map_async(wrapper_singlerun, range(10000))
pool.join()
pool.close()

但是,每当我运行这段代码时,都会出现以下错误:
    pool.join()
  File "/Users/davidal/miniconda3/lib/python3.6/site-packages/multiprocess/pool.py", line 509, in join
    assert self._state in (CLOSE, TERMINATE)
AssertionError

你知道为什么会发生这个错误吗?我之前使用过 pool.map_async,但我认为我需要一个 pool rendez-vous 命令。否则,我的 PC 会创建类似 forkbomb 的东西,它创建了太多线程(至少,我认为它是这样做的......)

任何想法表示赞赏!

最佳答案

问题是您在 join 之前调用 close

multiprocess 似乎是 missing its documentation ,但是,据我所知,它基本上是 stdlib multiprocessing 的一个分支,它预先为 dill 修补了 pickle ,所以 multiprocessing 文档在这里应该是相关的。 (另外,在评论中,你说你可以用 multiprocessing 重现这个问题。)

所以, Pool.join 说:


close 方法用于关闭队列的发送端,因此无法添加新任务。 join 方法用于等待队列中的所有内容被处理。在关闭队列之前等待队列耗尽是行不通的。

但是您是在 close 之后而不是之前调用 jointhe first thing join doesassert ,您已经调用了 closeterminate ,而您还没有调用,因此断言失败。

因此,您可能只想切换这两个调用的顺序。

或者,也许您对 join 的用途感到困惑,并认为您需要先调用它,然后才能使用 all_responses.get().wait() 。如果是这样——你不需要这样做; get 将阻塞,直到结果可用,之后您不需要 join 。这实际上更常见,尤其是 map 和 friend (尽管文档中的示例是通过 with Pool(…) as pool: 而不是手动调用池中的任何内容)。

10-07 19:25
查看更多