Question
I guess I'm missing something (still a Dask noob), but I'm trying the batching suggestion for avoiding too many Dask tasks from here:
https://docs.dask.org/en/latest/delayed-best-practices.html
and can't make it work. This is what I tried:
import dask

def f(x):
    return x*x

def batch(seq):
    sub_results = []
    for x in seq:
        sub_results.append(f(x))
    return sub_results

batches = []
for i in range(0, 1000000000, 1000000):
    result_batch = dask.delayed(batch, range(i, i + 1000000))
    batches.append(result_batch)
batches now contains Delayed objects:
batches[:3]
[Delayed(range(0, 1000000)),
Delayed(range(1000000, 2000000)),
Delayed(range(2000000, 3000000))]
But when I compute them, I get batch function pointers back (I think?):
results = dask.compute(*batches)
results[:3]
(<function __main__.batch(seq)>,
<function __main__.batch(seq)>,
<function __main__.batch(seq)>)
I have two questions:
- Is this really how this should be run? It seems to contradict the first line of the Best Practices page, which says not to call it like delayed(f(x)), because that would run immediately rather than lazily.
- How do I get the results of the batched run above?
Recommended answer
It looks like your code is missing a pair of parentheses. Not sure if this was a typo.
Per the example from the docs, I think you wanted
result_batch = dask.delayed(batch)(range(i, i + 1000000))
where I replaced batch, ran... with batch)(ran..., since it is the call to the batch() function that should be delayed.
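For reference, here is a minimal sketch of the corrected batching loop from start to finish. The sizes are shrunk so it finishes quickly, and the names TOTAL, BATCH_SIZE and flat are illustrative assumptions that do not appear in the question.

```python
import dask


def f(x):
    return x * x


def batch(seq):
    # Process a whole chunk of inputs inside a single task.
    return [f(x) for x in seq]


# Assumed, much smaller sizes than in the question: 10 batches of 1,000 items.
TOTAL = 10_000
BATCH_SIZE = 1_000

batches = [
    dask.delayed(batch)(range(i, i + BATCH_SIZE))  # lazy: nothing runs yet
    for i in range(0, TOTAL, BATCH_SIZE)
]

# dask.compute returns a tuple with one list per batch.
results = dask.compute(*batches)

# Flatten the per-batch lists if a single sequence is wanted.
flat = [value for sub in results for value in sub]
```

Each element of results is the list returned by one call to batch, so the graph holds one task per batch instead of one per item.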
Answers
- With the typo fixed, your code works fine for me, and the computation is now delayed. Regarding what's written at the start of the docs: it matters what is being wrapped with dask.delayed. With dask.delayed(batch(range(i, i + 1000000))), the call to batch(...) is not delayed, so it runs immediately. The function has already executed by the time dask.delayed receives its output, so only the finished result gets wrapped, which is not the desired workflow. However, dask.delayed(batch)(range(i, i + 1000000)) delays the call to the function (since, here, dask.delayed wraps the function itself). I believe this is what the docs are trying to say at the start of the best practices section.
- Again, with the typo fixed, your code runs as expected for me and prints the lengthy output to the screen.
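The difference between these forms can be checked with a small experiment. The sketch below is only an illustration: the calls list and the "illustrative-name" string are assumptions added for the demo, and batch is simplified. The third form mirrors the typo from the question, where the extra positional argument lands in the name parameter of dask.delayed, so the function object itself becomes the value.

```python
import dask

calls = []  # records every time batch actually executes


def batch(seq):
    calls.append(len(seq))
    return [x * x for x in seq]


# Form 1: batch runs right here; delayed only wraps the finished list.
eager = dask.delayed(batch(range(3)))
calls_after_eager = len(calls)  # 1

# Form 2: delayed wraps the function; the call is recorded, not executed.
lazy = dask.delayed(batch)(range(3))
calls_after_lazy = len(calls)  # still 1

lazy_result = lazy.compute()  # batch executes only now
calls_after_compute = len(calls)  # 2

# Form 3 (the typo): the second positional argument is taken as `name`,
# so batch is never called and computing hands the function back.
typo = dask.delayed(batch, "illustrative-name")
typo_result = typo.compute()
print(typo_result is batch)  # True
```

Only the second form defers the work until compute is called, which is what the batching advice relies on.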