我正在使用 Dask 运行任务池,按照 as_completed 方法完成的顺序检索结果,并可能在每次返回时向池提交新任务:

# Initial set of jobs
futures = [client.submit(job.run_simulation) for job in jobs]
pool = as_completed(futures, with_results=True)

while True:
    # Wait for a job to finish
    f, result = next(pool)

    # Exit condition
    if result == 'STOP':
        break

    # Do processing and maybe submit more jobs
    more_jobs = process_result(f, result)
    more_futures = [client.submit(job.run_simulation) for job in more_jobs]
    pool.update(more_futures)

这是我的问题:我提交的函数 job.run_simulation 有时会挂起很长时间,我想让这个函数超时 - 如果运行时间超过某个时间限制,则终止任务并继续前进。

理想情况下,我想做类似 client.submit(job.run_simulation, timeout=10) 的事情,如果任务运行时间超过超时时间,则让 next(pool) 返回 None

Dask 有什么办法可以帮助我暂停这样的工作吗?

到目前为止我尝试过的

我的第一直觉是在 job.run_simulation 函数本身中独立于 Dask 处理超时。我已经看到了关于通用 Python 超时的两种类型的建议(例如 here )。

1) 使用两个线程,一个用于函数本身,另一个用于定时器。我的印象是这实际上不起作用,因为您无法杀死线程。即使计时器用完,两个线程也必须在任务完成之前完成。

2) 使用两个独立的进程(使用 multiprocessing 模块),一个用于函数,一个用于定时器。这会起作用,但由于我已经在 Dask 生成的守护进程子进程中,我不允许创建新的子进程。

第三种可能性是将代码块移动到我使用 subprocess.run 运行的单独脚本中,并使用内置超时的 subprocess.run。我可以做到这一点,但感觉就像是最坏的回退场景,因为它需要大量繁琐的数据传入和传出子流程。

所以感觉我必须在Dask这个级别完成超时。我的一个想法是在将任务提交给 Dask 的同时创建一个计时器作为子进程。然后如果计时器用完,使用 Client.cancel() 停止任务。这个计划的问题是 Dask 可能会在开始任务之前等待工作人员释放,我不希望在任务实际运行之前运行计时器。

最佳答案

您对问题的评估在我看来是正确的,您所经历的解决方案与我会考虑的相同。一些注意事项:

  • Client.cancel 无法阻止已启动的函数的运行。这些函数在线程池中运行,因此您会遇到“无法停止线程”的限制。 Dask Worker 只是 Python 进程,具有相同的能力和局限性。
  • 你说你不能在守护进程中使用进程。对此的一种解决方案是通过以下方式之一更改您使用流程的方式:
  • 如果你在一台机器上使用 dask.distributed 那么就不要使用进程
    client = Client(processes=False)
    
  • 不要使用 Dask 的默认 nanny 进程,那么你的 dask worker 将是一个能够使用多处理的正常进程
  • 将 dask 的 multiprocessing-context 配置设置为 "spawn" 而不是 fork 或 forkserver

  • 解决这个问题的干净方法是在你的函数 job.run_simulation 内部解决它。理想情况下,您可以将此超时逻辑推到该代码中并使其干净利落。

    关于python - 如何使提交给 Dask 的作业超时?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49925130/

    10-16 12:22