I am comparing the computation speed of Dask and NumPy for different data sizes. I understand that Dask can execute computations in parallel and splits the data into chunks, so the data can be larger than RAM. When I run the Dask code below, I get a memory error for a square array of size 42000 (the traceback is shown at the bottom).

import dask.array as da   # note: dask.array, not the top-level dask package
import time
size = 42000
# chunk sizes must be integers, so use floor division
y = da.random.random(size=(size, size), chunks=(size // 8, size // 8))
start = time.time()
y = y.dot(y * 2)    # arbitrary dot-product calculation
y.compute()
end = time.time()
print(str(end - start) + " seconds")

However, when I run similar code with NumPy, I do not get any error.
import numpy as np
import time
size = 42000
x = np.random.random(size=(size, size))
start = time.time()
x = x.dot(x * 2)    # arbitrary dot-product calculation
end = time.time()
print(str(end - start) + " seconds")

So I don't understand why Dask raises a memory error when NumPy does not, especially since Dask is supposed to be able to partition the data. Is there any explanation or solution for this?

Edit: I only have this problem with the dot product; the other operations I have tested run without any issue.
MemoryError                               Traceback (most recent call last)
<ipython-input-3-a3af599b673a> in <module>()
      3 start = time.time()
      4 y = y.dot(y*2)
----> 5 y.compute()
      6 end = time.time()
      7 print(str(end-start) + " seconds")

~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\base.py in compute(self, **kwargs)
    152         dask.base.compute
    153         """
--> 154         (result,) = compute(self, traverse=False, **kwargs)
    155         return result
    156

~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\base.py in compute(*args, **kwargs)
    405     keys = [x.__dask_keys__() for x in collections]
    406     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 407     results = get(dsk, keys, **kwargs)
    408     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    409

~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\threaded.py in get(dsk, result, cache, num_workers, **kwargs)
     73     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     74                         cache=cache, get_id=_thread_get_id,
---> 75                         pack_exception=pack_exception, **kwargs)
     76
     77     # Cleanup pools associated to dead threads

~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    519                         _execute_task(task, data)  # Re-execute locally
    520                     else:
--> 521                         raise_exception(exc, tb)
    522                 res, worker_id = loads(res_info)
    523                 state['cache'][key] = res

~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\compatibility.py in reraise(exc, tb)
     65         if exc.__traceback__ is not tb:
     66             raise exc.with_traceback(tb)
---> 67         raise exc
     68
     69 else:

~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    288     try:
    289         task, data = loads(task_info)
--> 290         result = _execute_task(task, data)
    291         id = get_id()
    292         result = dumps((result, id))

~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\local.py in _execute_task(arg, cache, dsk)
    269         func, args = arg[0], arg[1:]
    270         args2 = [_execute_task(a, cache) for a in args]
--> 271         return func(*args2)
    272     elif not ishashable(arg):
    273         return arg

~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\compatibility.py in apply(func, args, kwargs)
     46     def apply(func, args, kwargs=None):
     47         if kwargs:
---> 48             return func(*args, **kwargs)
     49         else:
     50             return func(*args)

~\AppData\Local\Continuum\anaconda3\lib\site-packages\numpy\core\fromnumeric.py in sum(a, axis, dtype, out, keepdims)
   1880             return sum(axis=axis, dtype=dtype, out=out, **kwargs)
   1881     return _methods._sum(a, axis=axis, dtype=dtype,
-> 1882                          out=out, **kwargs)
   1883
   1884

~\AppData\Local\Continuum\anaconda3\lib\site-packages\numpy\core\_methods.py in _sum(a, axis, dtype, out, keepdims)
     30
     31 def _sum(a, axis=None, dtype=None, out=None, keepdims=False):
---> 32     return umr_sum(a, axis, dtype, out, keepdims)
     33
     34 def _prod(a, axis=None, dtype=None, out=None, keepdims=False):

MemoryError:

Accepted answer

In the final stage, when Dask stitches the pieces of the result back together, it can need roughly 2x the memory of the output to do so.
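For scale: a single 42000 x 42000 array of float64 values already occupies about 13 GiB, and the computation holds several such arrays at once (the input, the temporary y * 2, and the stitched-together output), so the peak easily exceeds typical RAM. A quick back-of-the-envelope check:

```python
size = 42000
bytes_per_float64 = 8
array_gib = size * size * bytes_per_float64 / 2**30
print(f"{array_gib:.1f} GiB per array")  # about 13.1 GiB
```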

In general, if your computation fits in memory, you probably shouldn't use Dask: plain NumPy backed by a modern BLAS implementation (OpenBLAS, MKL, etc.) will likely perform better.
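If the computation genuinely doesn't fit, one way to keep peak memory bounded is to avoid materializing the full dot-product result at all: end the graph with a reduction, or stream the result to disk (e.g. with da.to_zarr). A minimal sketch of the reduction pattern, run here at a smaller size so it finishes quickly (the same code applies at size = 42000):

```python
import dask.array as da

size = 2000  # smaller than the question's 42000, for speed
y = da.random.random(size=(size, size), chunks=(size // 8, size // 8))

# Ending the graph with a reduction means only chunk-sized
# intermediates and a single scalar ever live in RAM at once,
# instead of the full size x size output matrix.
total = y.dot(y * 2).sum().compute()
print(total)
```

For uniform [0, 1) entries the expected total is about size^2 * size * 0.5 (here roughly 4e9), which makes the result easy to sanity-check.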

Regarding "python - Why does a Dask array throw a memory error on a dot-product calculation when NumPy does not?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/51332325/
