Suppose I have a simple, expensive function that stores some results to a file:

import time

def costly_function(filename):
    time.sleep(10)
    with open(filename, 'w') as f:
        f.write("I am done!")


Now let's say I want to schedule a number of these tasks in dask, which would then take these requests asynchronously and run the functions one by one. I'm currently setting up a dask Client object...

import dask.distributed

cluster = dask.distributed.LocalCluster(n_workers=1, processes=False)  # my attempt at sequential job processing
client = dask.distributed.Client(cluster)


...and then scheduling the jobs interactively (from within IPython):

>>> client.submit(costly_function, "result1.txt")
>>> client.submit(costly_function, "result2.txt")
>>> client.submit(costly_function, "result3.txt")


The problem I'm running into is that these tasks do not run sequentially but in parallel (presumably because the single worker still runs multiple threads), which in my particular case causes concurrency issues.

So my question is: what is the correct way to set up a work queue like the one I described above?
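For reference, the behavior I'm after is what a plain single-threaded work queue from the standard library gives (a hypothetical sketch using `queue` and `threading`, not dask; the file-writing body is replaced by a list append so the ordering is visible):

```python
import queue
import threading

def worker(q, results):
    # A single worker thread drains the queue one job at a time,
    # guaranteeing strictly sequential execution.
    while True:
        filename = q.get()
        if filename is None:  # sentinel: stop the worker
            break
        results.append(filename)  # stand-in for costly_function(filename)
        q.task_done()

q = queue.Queue()
results = []
t = threading.Thread(target=worker, args=(q, results))
t.start()
for name in ("result1.txt", "result2.txt", "result3.txt"):
    q.put(name)
q.put(None)
t.join()
print(results)  # names appear in submission order
```

I'd like dask to manage something equivalent to this for me.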

Best answer

OK, I think I may have a solution (better ones are welcome, though!). It requires a slight modification to the costly function from before:

import time

def costly_function(filename, prev_job=None):
    # prev_job is never used in the body; passing in the previous future
    # makes dask wait for it to finish before running this task.
    time.sleep(10)
    with open(filename, 'w') as f:
        f.write("I am done!")

cluster = dask.distributed.LocalCluster(n_workers=1, processes=False)  # my attempt at sequential job processing
client = dask.distributed.Client(cluster)


Then, in an interactive context, you would write the following:

>>> future = client.submit(costly_function, "result1.txt")
>>> future = client.submit(costly_function, "result2.txt", prev_job=future)
>>> future = client.submit(costly_function, "result3.txt", prev_job=future)

09-04 23:53