Problem description

According to the Apache Beam 2.0.0 SDK documentation, GroupIntoBatches works only with KV collections.

My dataset contains only values, and there's no need to introduce keys. However, to make use of GroupIntoBatches I had to implement "fake" keys, using an empty string as the key:

static class FakeKVFn extends DoFn<String, KV<String, String>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Wrap every line with the same empty-string key so it can be passed to GroupIntoBatches.
    c.output(KV.of("", c.element()));
  }
}

So the overall pipeline looks like the following:

public static void main(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  long batchSize = 100L;

  p.apply("ReadLines", TextIO.read().from("./input.txt"))
      .apply("FakeKV", ParDo.of(new FakeKVFn()))
      .apply(GroupIntoBatches.<String, String>ofSize(batchSize))
      .setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())))
      .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
        @ProcessElement
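        public void processElement(ProcessContext c) {
          // callWebService is the asker's own helper (not shown); it receives one batch of up to batchSize lines.
          c.output(callWebService(c.element().getValue()));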
        }
      }))
      .apply("WriteResults", TextIO.write().to("./output/"));

  p.run().waitUntilFinish();
}

Is there any way to group into batches without introducing "fake" keys?

Recommended answer

It is required to provide KV inputs to GroupIntoBatches because the transform is implemented using state and timers, which are per key-and-window.
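To make the per-key scoping concrete, here is a deliberately simplified, single-process model of the bookkeeping a batching transform has to do. It is not Beam's implementation: the class PerKeyBatcher and its methods are hypothetical, the buffer map stands in for Beam's per-key state, and flush() stands in for the timer that fires at the end of a window. The point it illustrates is that a buffer can only be maintained once you know which key an element belongs to.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Hypothetical, simplified model of per-key batching (NOT Beam's GroupIntoBatches code).
 * Each key owns its own buffer, mirroring how Beam state is scoped per key (and window).
 */
class PerKeyBatcher<K, V> {
    private final int batchSize;
    // One buffer per key: the stand-in for per-key state.
    private final Map<K, List<V>> buffers = new LinkedHashMap<>();
    // Receives (key, batch) whenever a batch is emitted.
    private final BiConsumer<K, List<V>> emit;

    PerKeyBatcher(int batchSize, BiConsumer<K, List<V>> emit) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.emit = emit;
    }

    /** Appends one element to its key's buffer and emits a copy once the buffer is full. */
    void add(K key, V value) {
        List<V> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            emit.accept(key, new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    /** Emits every partially filled buffer; plays the role of the end-of-window timer. */
    void flush() {
        for (Map.Entry<K, List<V>> entry : buffers.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                emit.accept(entry.getKey(), new ArrayList<>(entry.getValue()));
                entry.getValue().clear();
            }
        }
    }
}
```

With a single key, every element funnels through the same buffer, which is why a runner has to process that key's work one element at a time.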

For each key+window pair, state and timers necessarily execute serially (or observably so). You have to manually express the available parallelism by providing keys (and windows, though no runner that I know of parallelizes over windows today). The two most common approaches are:

  1. Use some natural key, such as a user ID.
  2. Choose a fixed number of shards and assign each element to one of them at random. This can be harder to tune: you need enough shards to get enough parallelism, but each shard must also receive enough data for GroupIntoBatches to be useful.
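A minimal sketch of the second approach, written in plain Java so it runs without Beam on the classpath. RandomShards, shardKey and batchByShard are hypothetical names, and the grouping step only mimics what GroupIntoBatches would do per key. In an actual pipeline you would, as an assumption, call shardKey from a DoFn that outputs KV.of(shardKey, element) in place of FakeKVFn, and adjust the types accordingly (for example GroupIntoBatches.<Integer, String>ofSize(batchSize) with a matching Integer key coder).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

/** Hypothetical helper illustrating "fixed number of shards, random key" in plain Java. */
class RandomShards {
    /** Picks a shard key uniformly at random from [0, numShards). */
    static int shardKey(Random random, int numShards) {
        return random.nextInt(numShards); // throws IllegalArgumentException if numShards <= 0
    }

    /** Assigns each element a random shard key, then cuts each shard into batches of at most batchSize. */
    static Map<Integer, List<List<String>>> batchByShard(
            List<String> elements, int numShards, int batchSize, Random random) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        // Step 1: key every element by a random shard (replaces the single "" key).
        Map<Integer, List<String>> shards = new TreeMap<>();
        for (String element : elements) {
            shards.computeIfAbsent(shardKey(random, numShards), k -> new ArrayList<>()).add(element);
        }
        // Step 2: batch independently within each shard, as a per-key transform would.
        Map<Integer, List<List<String>>> batches = new TreeMap<>();
        for (Map.Entry<Integer, List<String>> shard : shards.entrySet()) {
            List<String> values = shard.getValue();
            List<List<String>> shardBatches = new ArrayList<>();
            for (int i = 0; i < values.size(); i += batchSize) {
                int end = Math.min(i + batchSize, values.size());
                shardBatches.add(new ArrayList<>(values.subList(i, end)));
            }
            batches.put(shard.getKey(), shardBatches);
        }
        return batches;
    }
}
```

Passing numShards = 1 reproduces the single "" key from the question: every element lands under one key. The tuning trade-off from the list shows up directly: with 1,000 lines, 4 shards and a batch size of 100, each shard receives roughly 250 lines and emits mostly full batches, whereas 1,000 shards would leave most shards with about one line, so almost every batch would be a tiny partial one.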

Adding the same dummy key to every element, as your snippet does, means the transform cannot execute in parallel at all: every element has the same key, so all of its state and timer work is processed serially. This is similar to the discussion in "Stateful indexing causes ParDo to be run single-threaded on Dataflow Runner".

09-25 17:30