我正在使用 Dask 运行任务池,按照 as_completed
方法完成的顺序检索结果,并可能在每次返回时向池提交新任务:
# Initial set of jobs
futures = [client.submit(job.run_simulation) for job in jobs]
pool = as_completed(futures, with_results=True)
while True:
# Wait for a job to finish
f, result = next(pool)
# Exit condition
if result == 'STOP':
break
# Do processing and maybe submit more jobs
more_jobs = process_result(f, result)
more_futures = [client.submit(job.run_simulation) for job in more_jobs]
pool.update(more_futures)
这是我的问题:我提交的函数
job.run_simulation
有时会挂起很长时间,我想让这个函数超时 - 如果运行时间超过某个时间限制,则终止任务并继续前进。理想情况下,我想做类似
client.submit(job.run_simulation, timeout=10)
的事情,如果任务运行时间超过超时时间,则让 next(pool)
返回 None
。Dask 有什么办法可以帮助我暂停这样的工作吗?
到目前为止我尝试过的
我的第一直觉是在
job.run_simulation
函数本身中独立于 Dask 处理超时。我已经看到了关于通用 Python 超时的两种类型的建议(例如 here )。1) 使用两个线程,一个用于函数本身,另一个用于定时器。我的印象是这实际上不起作用,因为您无法杀死线程。即使计时器用完,两个线程也必须在任务完成之前完成。
2) 使用两个独立的进程(使用
multiprocessing
模块),一个用于函数,一个用于定时器。这会起作用,但由于我已经在 Dask 生成的守护进程子进程中,我不允许创建新的子进程。第三种可能性是将代码块移动到我使用
subprocess.run
运行的单独脚本中,并使用内置超时的 subprocess.run
。我可以做到这一点,但感觉就像是最坏的回退场景,因为它需要大量繁琐的数据传入和传出子流程。所以感觉我必须在Dask这个级别完成超时。我的一个想法是在将任务提交给 Dask 的同时创建一个计时器作为子进程。然后如果计时器用完,使用
Client.cancel()
停止任务。这个计划的问题是 Dask 可能会在开始任务之前等待工作人员释放,我不希望在任务实际运行之前运行计时器。 最佳答案
您对问题的评估在我看来是正确的,您所经历的解决方案与我会考虑的相同。一些注意事项:
Client.cancel
无法阻止已启动的函数的运行。这些函数在线程池中运行,因此您会遇到“无法停止线程”的限制。 Dask Worker 只是 Python 进程,具有相同的能力和局限性。 client = Client(processes=False)
multiprocessing-context
配置设置为 "spawn"
而不是 fork 或 forkserver 解决这个问题的干净方法是在你的函数
job.run_simulation
内部解决它。理想情况下,您可以将此超时逻辑推到该代码中并使其干净利落。关于python - 如何使提交给 Dask 的作业超时?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49925130/