Problem Description
According to the Apache Beam 2.0.0 SDK documentation, GroupIntoBatches works only with KV collections.

My dataset contains only values, and there is no need to introduce keys. However, to use GroupIntoBatches, I had to implement "fake" keys with an empty string as the key:
static class FakeKVFn extends DoFn<String, KV<String, String>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(KV.of("", c.element()));
  }
}
So the overall pipeline looks like the following:
public static void main(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  long batchSize = 100L;
  p.apply("ReadLines", TextIO.read().from("./input.txt"))
      .apply("FakeKV", ParDo.of(new FakeKVFn()))
      .apply(GroupIntoBatches.<String, String>ofSize(batchSize))
      .setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())))
      .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(callWebService(c.element().getValue()));
        }
      }))
      .apply("WriteResults", TextIO.write().to("./output/"));

  p.run().waitUntilFinish();
}
Is there any way to group into batches without introducing "fake" keys?
Recommended Answer
It is required to provide KV inputs to GroupIntoBatches because the transform is implemented using state and timers, which are per key-and-window.

For each key+window pair, state and timers necessarily execute serially (or observably so). You have to manually express the available parallelism by providing keys (and windows, though no runner that I know of parallelizes over windows today). The two most common approaches are:
- Use some natural key, like a user ID.
- Choose some fixed number of shards and key randomly. This can be harder to tune: you have to have enough shards to get enough parallelism, but each shard needs to include enough data that GroupIntoBatches is actually useful.
Adding one dummy key to all elements, as in your snippet, will cause the transform to not execute in parallel at all. This is similar to the discussion at "Stateful indexing causes ParDo to be run single-threaded on Dataflow Runner".
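The random-sharding approach (the second option above) can be sketched in plain Java as follows. This is a minimal illustration using only the standard library, simulating Beam's KV pair with Map.Entry; in a real pipeline you would emit KV.of(shard, c.element()) from a DoFn before applying GroupIntoBatches. The shard count of 10 is an assumed tuning parameter, not a recommendation.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

public class ShardKeys {
    // Assumed tuning knob: more shards give more parallelism, but each
    // shard must still receive enough elements for batching to pay off.
    public static final int NUM_SHARDS = 10;

    // Pairs a value with a random shard key in [0, NUM_SHARDS).
    // In Beam this logic would live inside a DoFn's @ProcessElement,
    // emitting KV.of(shard, value) instead of a Map.Entry.
    public static Map.Entry<Integer, String> withShardKey(String value) {
        int shard = ThreadLocalRandom.current().nextInt(NUM_SHARDS);
        return new AbstractMap.SimpleEntry<>(shard, value);
    }

    public static void main(String[] args) {
        Map.Entry<Integer, String> keyed = withShardKey("some line");
        System.out.println(keyed.getKey() + " -> " + keyed.getValue());
    }
}
```

Because the keys are spread across a fixed set of shards rather than a single empty string, the runner can process up to NUM_SHARDS key+window pairs in parallel.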