Problem description
I have a large NumPy array on my local machine that I want to parallelize with Dask.array on a cluster:
import numpy as np
x = np.random.random((1000, 1000, 1000))
However, when I use dask.array, I find that my scheduler starts taking up a lot of RAM. Why is this? Shouldn't this data go to the workers?
import dask.array as da
x = da.from_array(x, chunks=(100, 100, 100))
from dask.distributed import Client
client = Client(...)
x = x.persist()
Recommended answer
Whenever you persist or compute a Dask collection, that data goes to the scheduler and from there to the workers. If you want to bypass storing data on the scheduler, you will have to learn how to move data with scatter.
You have three options:
- Don't load data on your client machine
- Scatter, then chunk
- Chunk, then scatter
Don't load data on your client machine
The best approach is to include loading the data as part of your computation rather than doing it locally.
Before:
x = load_array_from_file(fn) # load into a local numpy array
x = da.from_array(x, chunks=(100, 100, 100)) # split with dask
x = x.persist()
After:
import dask

x = dask.delayed(load_array_from_file)(fn)
x = da.from_delayed(x, shape=(1000, 1000, 1000), dtype=float)
x = x.rechunk((100, 100, 100))
x = x.persist()
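For concreteness, load_array_from_file above is a stand-in for whatever loader you actually use; a minimal hypothetical version, assuming the array was saved to disk as a .npy file, could be:
import numpy as np

def load_array_from_file(fn):
    # hypothetical loader: read a previously saved NumPy array from disk
    return np.load(fn)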
More information about creating dask arrays is here: http://dask.pydata.org/en/latest/array-creation.html
Scatter, then chunk
You can scatter your numpy array directly to a worker:
future = client.scatter(x)
x = da.from_delayed(future, shape=x.shape, dtype=x.dtype)
x = x.rechunk((100, 100, 100))
x = x.persist()
This moves your data directly to a worker and chunks it from there. This is nice because it bypasses the scheduler. However, you are now at risk of data loss if your workers start to fail; this only matters if you are in a massively parallel system. It is also somewhat inefficient because all of your data sits on one worker rather than being spread out. You might call client.rebalance, or read on.
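A minimal sketch of that rebalance call, continuing from the snippet above (client.rebalance() with no arguments spreads all in-memory data across the workers):
client.rebalance()    # spread the persisted chunks more evenly across workers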
Chunk, then scatter
You can chunk your data locally with a local scheduler, then scatter it out to the cluster:
x = da.from_array(x, chunks=(100, 100, 100))
x = x.persist(get=dask.threaded.get) # chunk locally
futures = client.scatter(dict(x.dask)) # scatter chunks
x.dask = futures # re-attach scattered futures as task graph
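Note that newer Dask releases removed the get= keyword in favor of scheduler=, so on those versions the local persist step would look roughly like this (a sketch; the scatter/re-attach trick above touches Dask internals, so check that it still applies to your version):
x = x.persist(scheduler="threads")   # chunk locally with the threaded scheduler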
Stay local
Alternatively, you can continue to use dask locally, either with the threaded scheduler or with the distributed scheduler using only the local process.
client = Client(processes=False)
This will stop needless copying of data between your local process, the scheduler, and the workers; they are all now in your same local process.
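Put together, a minimal local-only sketch (assuming the array genuinely fits in local memory) might look like:
import numpy as np
import dask.array as da
from dask.distributed import Client

client = Client(processes=False)              # scheduler and workers share this process
x = np.random.random((1000, 1000, 1000))      # roughly 8 GB; only do this if it fits in RAM
x = da.from_array(x, chunks=(100, 100, 100))
x = x.persist()                               # no cross-process copies needed
print(x.mean().compute())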
See also: How to efficiently submit tasks with large arguments in Dask distributed? for a task-based version of this answer.