Suppose I have a simple, expensive function that writes some results to a file:
import time

def costly_function(filename):
    time.sleep(10)
    with open(filename, 'w') as f:
        f.write("I am done!")
Now let's say I want to schedule a number of these tasks in dask, have it handle the requests asynchronously, and run the functions one by one. I am currently setting up a dask client object...
import dask.distributed

cluster = dask.distributed.LocalCluster(n_workers=1, processes=False)  # my attempt at sequential job processing
client = dask.distributed.Client(cluster)
...and then scheduling the jobs interactively (from within IPython):
>>> client.submit(costly_function, "result1.txt")
>>> client.submit(costly_function, "result2.txt")
>>> client.submit(costly_function, "result3.txt")
The problem I am running into is that these tasks do not run consecutively but in parallel, which in my particular case causes concurrency problems.
So my question is: what is the correct way to set up a job queue like the one I described above?
Best Answer
Okay, I think I may have a solution (better ones are welcome!). It requires a slight modification of the expensive function from before:
def costly_function(filename, prev_job=None):
    # prev_job is never used inside the function; passing the previous
    # future as an argument makes dask treat it as a dependency, so this
    # task will not start until the previous one has finished.
    time.sleep(10)
    with open(filename, 'w') as f:
        f.write("I am done!")
cluster = dask.distributed.LocalCluster(n_workers=1, processes=False) # my attempt at sequential job processing
client = dask.distributed.Client(cluster)
Then, in an interactive context, you would write the following:
>>> future = client.submit(costly_function, "result1.txt")
>>> future = client.submit(costly_function, "result2.txt", prev_job=future)
>>> future = client.submit(costly_function, "result3.txt", prev_job=future)
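Putting the pieces together, here is a minimal end-to-end sketch of the chaining pattern above. It drops the `time.sleep(10)` to keep the run fast and writes into a temporary directory instead of the working directory; both are changes for illustration only, not part of the original answer. Calling `.result()` on the last future blocks until the whole chain has run.

```python
import os
import tempfile
import dask.distributed

def costly_function(filename, prev_job=None):
    # prev_job is ignored here; it exists only so that dask records a
    # dependency on the previous future and runs the tasks in order.
    with open(filename, 'w') as f:
        f.write("I am done!")
    return filename

cluster = dask.distributed.LocalCluster(n_workers=1, processes=False)
client = dask.distributed.Client(cluster)

tmp = tempfile.mkdtemp()
future = None
for name in ("result1.txt", "result2.txt", "result3.txt"):
    # Each submit passes the previous future as prev_job, chaining the tasks.
    future = client.submit(costly_function, os.path.join(tmp, name),
                           prev_job=future)

# Waiting on the last future waits on the entire chain.
last = future.result()

client.close()
cluster.close()
```

Note that this serializes the tasks through data dependencies rather than through worker configuration, so it keeps working even if the cluster is later given more workers or threads.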