我有一个大数据问题。特定的问题不是很重要,但我已经通过轻松解决了。现在我有两个问题。
from dask import distributed
import numpy as np
local_cluster = distributed.LocalCluster(n_workers=20, processes=True, memory_limit=0)
dask_client = distributed.Client(local_cluster)
hat_matrix = np.random.rand(1000,25000)
weight_matrix = np.random.rand(1000)
Y = np.random.rand(1000, 25000)
[scatter_hat] = dask_client.scatter([hat_matrix], broadcast=True)
[scatter_weight] = dask_client.scatter([weight_matrix], broadcast=True)
futures = [dask_client.submit(apply_function, i, scatter_hat, Y[i, :], scatter_weight)
for i in range(Y.shape[0])]
results = dask_client.gather(futures)
我可以拆分
Y
(这很好,因为我没有足够的内存来一次真正加载所有内存),但是我所有的工作人员都需要hat_matrix
。分散hat_matrix
然后按行发送Y
效果很好。除了hat_matrix
和Y
都...大,可以。我准备了足够的内存来处理它。但是我找不到任何办法来减少短暂的内存高峰(在反序列化期间发生),因此,如果我设置了内存限制,保姆会杀死我的所有工作人员。然后是我所有的新工人。等等等等。所以我有三个问题:有没有一种方法来设置内存限制,以允许在串行数据进入和解压缩时出现峰值?如果我有64 GB的内存来驱动20个进程,那么我想将每个进程的内存限制设置为2.8GB。当我分散2GB的数据时,每个进程进行反序列化的峰值都会达到〜4GB,而保姆会杀死所有数据。
有没有一种方法可以使散射错开以最大程度地减少瞬态存储峰值?
是否有一种方便的方法可以通过磁盘而不是通过TCP分散数据,还是必须自定义写入?
(推论:是否有一种方便的方法可以从我所有工作程序中的序列化文件中加载内存映射的dask数组?)
最佳答案
有没有一种方法来设置内存限制,以允许在串行数据进入和解压缩时出现峰值?
通常,反序列化运行任意代码,因此dask无法真正控制发生的事情。在实践中,尽管与现代硬件上可用的典型内存相比,您的矩阵实际上并没有那么大,但是令您惊讶的是,您遇到了一个问题。 Dask对NumPy数组非常谨慎。我不希望它使用比数组大小更多的内存。
有没有一种方法可以使散射错开以最大程度地减少瞬态存储峰值?
当前,散射是通过广播树进行的。您的客户将派遣给一些工作人员,然后再派遣给更多工作人员,依此类推。默认情况下,这里的分支因子只有两个,因此如果在这里看到巨大的爆炸,我会感到惊讶。
是否有一种方便的方法可以通过磁盘而不是通过TCP分散数据,还是必须自定义写入? (推论:是否有一种方便的方法可以从我所有工作程序中的序列化文件中加载内存映射的dask数组?)
也许您可以使用某种内存映射的NumPy数组而不是内存中的NumPy数组?
关于python - 优化分散,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/55034654/