Problem description
We're generating a sequential index in a ParDo using Beam's Java SDK 2.0.0. Just like the simple stateful index example in Beam's introduction to stateful processing, we use a ValueState<Integer> cell, and our only operation on it is to retrieve the value and increment it when we need the next index:
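```java
// index is the ValueState<Integer> cell; firstNonNull is assumed to be Guava's MoreObjects.firstNonNull
Integer statefulIndex = firstNonNull(index.read(), 0);
index.write(statefulIndex + 1);
```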
When running with Google's Dataflow runner, we noticed on the Dataflow monitoring interface that the wall time for that ParDo was accumulating in sync with elapsed time. We were able to confirm that the ParDo executes single-threaded by SSH'ing into the worker node, running top, and pressing 1 to view the CPU usage per core. With the stateful processing cell commented out and the code otherwise unchanged, the same ParDo uses all cores of our n1-standard-32 worker node.
Even if the Dataflow runner is able to parallelize stateful indexing based on each key and window pair (we currently have one window and one key), the lack of parallelism causes such a significant decrease in performance that we are unable to use it. Is this the expected behavior of the Dataflow runner?
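To make the key-and-window point concrete, here is a minimal plain-Java sketch. It is not Beam code and not how Dataflow is implemented. A hypothetical PerKeyIndex class keeps one counter per key, standing in for a ValueState<Integer> scoped to that key. Elements that share a key pass through their counter strictly in order. Different keys share no state, so their groups can be handed to a parallel stream. With a single key, as in the question, there is only one group and therefore only one thread doing the work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class PerKeyIndex {
    // Hypothetical stand-in for a Beam KV<String, String> element.
    public static final class Element {
        final String key;
        final String value;

        public Element(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Labels each value "index:value" using a separate sequential counter per key.
    public static Map<String, List<String>> index(List<Element> input) {
        // Group by key; a sequential stream keeps encounter order within each group.
        Map<String, List<Element>> byKey = input.stream()
                .collect(Collectors.groupingBy(e -> e.key, TreeMap::new, Collectors.toList()));

        Map<String, List<String>> out = new ConcurrentHashMap<>();
        // Keys share no state, so each key's group may run on a different thread.
        byKey.entrySet().parallelStream().forEach(entry -> {
            int cell = 0; // per-key "state cell": read, then written, for every element
            List<String> labeled = new ArrayList<>();
            for (Element e : entry.getValue()) {
                labeled.add(cell + ":" + e.value);
                cell = cell + 1;
            }
            out.put(entry.getKey(), labeled);
        });
        return out;
    }

    public static void main(String[] args) {
        List<Element> input = List.of(
                new Element("a", "x"), new Element("b", "y"), new Element("a", "z"));
        // TreeMap only to print keys in a stable order
        System.out.println(new TreeMap<>(index(input))); // prints {a=[0:x, 1:z], b=[0:y]}
    }
}
```

Within a key, the counter still imposes a strict order. The only extra parallelism comes from having more keys.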
Naively, I expected that behind the scenes, Beam's stateful indexing would operate similarly to Java's AtomicInteger. Are there constraints that prevent parallel processing with a ValueState<Integer> cell, or is this functionality just not yet built into the runner?
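For comparison, the AtomicInteger behavior the question has in mind can be sketched as follows. This uses a hypothetical AtomicIndex helper written in plain Java, not Beam. Several threads do share the counter, and every index comes out unique. That works only because getAndIncrement performs each read-and-increment as one indivisible step on a single memory location, so the increments are still applied one after another.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIndex {
    // Hands out indices from one shared AtomicInteger across several threads.
    public static List<Integer> assign(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger(); // starts at 0
        List<Integer> issued = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    // One atomic read-modify-write: no two callers get the same value,
                    // but every call is ordered relative to every other call.
                    issued.add(counter.getAndIncrement());
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        Collections.sort(issued); // arrival order varies between runs; the set of values does not
        return issued;
    }

    public static void main(String[] args) {
        List<Integer> issued = assign(4, 1000);
        // prints 4000 indices, last = 3999
        System.out.println(issued.size() + " indices, last = " + issued.get(issued.size() - 1));
    }
}
```

Within one JVM this serialization is cheap. A distributed runner has no equivalent single memory location shared by all of its workers.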
Recommended answer
This is not only the expected behavior of the Dataflow runner, but a logical necessity in any context. It doesn't matter whether you are using state in Beam or an AtomicInteger in a single-process Java program: if operation "A" writes a value and operation "B" reads that value, then "B" must be executed after "A". The common term for this relationship is "happens-before".
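Here is a minimal plain-Java sketch of that rule. It uses a hypothetical HappensBefore class with an ordinary (non-volatile) field as the "state cell". Operation A runs on its own thread and performs the read-then-write of the index. Operation B runs only after Thread.join returns. The Java Memory Model guarantees that everything a thread does happens-before another thread returns from join on it, so B is guaranteed to observe A's write and take the next index.

```java
public class HappensBefore {
    // Ordinary, non-volatile field standing in for the state cell.
    private static int cell;

    // Read the current index, then write the incremented value back.
    private static int nextIndex() {
        int current = cell;
        cell = current + 1;
        return current;
    }

    // Returns {index taken by A, index taken by B}.
    public static int[] runAThenB() {
        cell = 0;
        int[] taken = new int[2];
        Thread a = new Thread(() -> taken[0] = nextIndex()); // operation "A"
        a.start();
        try {
            a.join(); // A's actions happen-before join() returns here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        taken[1] = nextIndex(); // operation "B" is guaranteed to see A's write
        return taken;
    }

    public static void main(String[] args) {
        int[] taken = runAThenB();
        System.out.println("A took " + taken[0] + ", B took " + taken[1]); // prints A took 0, B took 1
    }
}
```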
This form of stateful computation is the opposite of parallel computation. By definition, a read that observes a write has a causal relationship. By definition, two operations that are in parallel do not have a causal relationship.
Now, perhaps you are expecting parallel threads that access the state cell concurrently, as in the standard multi-threaded programming pattern of shared state guarded by concurrency control. In this example, if those threads actually ran in parallel, you would get duplicate indices: two threads could both read the same value before either of them writes the incremented one. Taking a step back, Beam targets massive "embarrassingly parallel" computations that are transparently distributed across a large cluster of machines. Fine-grained concurrency control, aside from being extremely difficult to get right, does not readily translate to massive distributed computations.
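The duplicate-index failure can be shown without real threads by writing out, by hand, one interleaving that uncoordinated parallel workers could produce. In the hypothetical LostUpdate class below, two workers, A and B, each perform the question's read-then-write step. Both reads happen before either write, which is exactly the schedule concurrency control exists to rule out. Scripting the interleaving keeps the result reproducible, unlike a real race.

```java
public class LostUpdate {
    // Returns {index emitted by A, index emitted by B, final cell value}.
    public static int[] interleave() {
        int[] cell = {0};        // shared state cell, initially 0

        int indexA = cell[0];    // worker A reads 0
        int indexB = cell[0];    // worker B reads 0 before A has written
        cell[0] = indexA + 1;    // A writes 1
        cell[0] = indexB + 1;    // B also writes 1, overwriting A's update

        return new int[] {indexA, indexB, cell[0]};
    }

    public static void main(String[] args) {
        int[] r = interleave();
        // prints A=0 B=0 cell=1: a duplicate index and a lost increment
        System.out.println("A=" + r[0] + " B=" + r[1] + " cell=" + r[2]);
    }
}
```

Forcing every read-then-write on the cell to run one at a time removes the duplicate, but only by giving up parallelism for that cell.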