我尝试用这种方式使用Python的多处理包:

featureClass = [[1000,k,1] for k in drange(start,end,step)] #list of arguments
for f in featureClass:
  pool .apply_async(worker, args=f,callback=collectMyResult)
pool.close()
pool.join

在池的进程中,我希望避免等待那些需要超过60秒才能返回结果的进程。有可能吗?

最佳答案

以下是一种不需要更改worker函数即可完成此操作的方法。其思想是将工作线程包装在另一个函数中,该函数将在后台线程中调用worker,然后等待结果timeout秒。如果超时过期,则会引发异常,异常会突然终止线程worker

import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

def worker(x, y, z):
    pass # Do whatever here

def collectMyResult(result):
    print("Got result {}".format(result))

def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)
    res = p.apply_async(func, args=args)
    try:
        out = res.get(timeout)  # Wait timeout seconds for func to complete.
        return out
    except multiprocessing.TimeoutError:
        print("Aborting due to timeout")
        p.terminate()
        raise

if __name__ == "__main__":
    pool = multiprocessing.Pool()
    featureClass = [[1000,k,1] for k in drange(start,end,step)] #list of arguments
    for f in featureClass:
      abortable_func = partial(abortable_worker, worker, timeout=3)
      pool.apply_async(abortable_func, args=f,callback=collectMyResult)
    pool.close()
    pool.join()

超时将引发的任何函数。注意,这意味着当发生超时时,回调不会执行。如果这是不可接受的,只需更改multiprocessing.TimeoutErrorexcept块以返回某些内容,而不是调用abortable_worker

关于python - 如何在超时后中止multiprocessing.Pool中的任务?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/29494001/

10-12 22:10
查看更多