Question
According to the release notes for Dataflow 2.x, IntraBundleParallelization has been removed. Is there a way to control or increase the parallelism of DoFns on Dataflow 2.1.0?
I was getting better performance when I used IntraBundleParallelization on version 1.9.0 of Dataflow.
Answer
It was removed because its implementation kept a handle on the ProcessContext of a ProcessElement call after that call had completed, which is unsafe and not guaranteed to work.
However, I agree that it was a useful abstraction, and it is unfortunate that we don't have a replacement yet.
As a workaround, you can try the following:
- In your DoFn's @Setup, create an Executor with the needed number of threads.
- In your DoFn's @StartBundle, create an ExecutorCompletionService wrapping the executor.
- In @ProcessElement, submit a Future to it representing the result of processing the element.
- In @ProcessElement, also poll() the CompletionService for completed futures and output their results.
- In @FinishBundle, wait for all remaining futures to complete, output their results, and discard the CompletionService. (CompletionService itself has no shutdown method; shut down the executor created in @Setup from @Teardown.)
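The steps above can be sketched in plain Java. To keep it runnable with only the JDK, the hypothetical class ParallelDoFnSketch below is not a real Beam DoFn. Its setup/startBundle/processElement/finishBundle/teardown methods stand in for the methods you would annotate with @Setup, @StartBundle, @ProcessElement, @FinishBundle and @Teardown, and a java.util.function.Consumer stands in for outputting through the context. It assumes the per-element work function is thread-safe. Because CompletionService has no shutdown method, the sketch shuts down the executor in its teardown step.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, Beam-free stand-in for a DoFn. Each method mirrors the Beam
// lifecycle annotation named in its comment. Results are only ever handed to
// `output` on the thread that calls processElement/finishBundle.
class ParallelDoFnSketch<InT, OutT> {
    private final int numThreads;
    private final Function<InT, OutT> work; // per-element work; assumed thread-safe
    private ExecutorService executor;
    private CompletionService<OutT> completionService;
    private int pending; // submitted but not yet emitted; touched only by the caller thread

    ParallelDoFnSketch(int numThreads, Function<InT, OutT> work) {
        this.numThreads = numThreads;
        this.work = work;
    }

    // @Setup: create the thread pool once per DoFn instance.
    void setup() {
        executor = Executors.newFixedThreadPool(numThreads);
    }

    // @StartBundle: wrap the executor in a fresh completion service.
    void startBundle() {
        completionService = new ExecutorCompletionService<>(executor);
        pending = 0;
    }

    // @ProcessElement: submit the element, then emit anything already finished.
    void processElement(InT element, Consumer<OutT> output)
            throws InterruptedException, ExecutionException {
        completionService.submit(() -> work.apply(element)); // captures the element value only
        pending++;
        Future<OutT> done;
        while ((done = completionService.poll()) != null) { // non-blocking
            pending--;
            output.accept(done.get());
        }
    }

    // @FinishBundle: block until every remaining future completes, emitting each.
    void finishBundle(Consumer<OutT> output) throws InterruptedException, ExecutionException {
        while (pending > 0) {
            Future<OutT> done = completionService.take(); // blocking
            pending--;
            output.accept(done.get());
        }
        completionService = null; // nothing to shut down here; the executor is reused
    }

    // @Teardown: CompletionService has no shutdown(), so shut down the executor.
    void teardown() {
        executor.shutdown();
    }
}
```

Results come out in completion order rather than input order, which is acceptable because a PCollection has no ordering anyway. In a real DoFn, the emit in the processElement step would go through the ProcessContext on the calling thread, and the emit in the finishBundle step through the FinishBundleContext.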
Remember not to use the ProcessContext in your futures. ProcessContext can only be used from the current thread and from within the current ProcessElement call.
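Because the futures must not touch the ProcessContext, whatever they need from it has to be copied into plain values on the calling thread before the task is submitted. This also matters for the final drain: in the Beam 2.x Java SDK, emitting from @FinishBundle goes through FinishBundleContext.output(output, timestamp, window), so each pending result has to remember its element's timestamp and window. Below is a minimal JDK-only sketch; the helper DetachedWork and its Stamped holder are hypothetical, and Stamped uses a long and a String as stand-ins for Beam's Instant and BoundedWindow.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.function.Function;

// Hypothetical helper: everything the background task needs is read on the
// calling thread and captured by value, so the task never sees a context object.
final class DetachedWork {
    // Plain-value result carrying the metadata needed to emit it later.
    static final class Stamped<T> {
        final T value;
        final long timestampMillis; // stand-in for the element's Instant timestamp
        final String window;        // stand-in for the element's BoundedWindow

        Stamped(T value, long timestampMillis, String window) {
            this.value = value;
            this.timestampMillis = timestampMillis;
            this.window = window;
        }
    }

    private DetachedWork() {}

    // Call from the processing thread with values already extracted from the context.
    static <InT, OutT> void submit(CompletionService<Stamped<OutT>> service,
                                   InT element, long timestampMillis, String window,
                                   Function<InT, OutT> work) {
        Callable<Stamped<OutT>> task =
                () -> new Stamped<>(work.apply(element), timestampMillis, window);
        service.submit(task);
    }
}
```

When draining in @FinishBundle, each Stamped result would then be emitted with its own timestamp and window instead of anything read from a ProcessContext.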