我有一个简单的问题。
可以说我正在读取一个实木复合地板文件,该文件每行生成一个avro GenericRecord对象,如下所示。

{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j1"}
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j2"}
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j3"}
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j4"}

{"name":"paul", "surename":"carl", "age":28, "user_pk":"paul:carl:28", "unique_attribute":"p1"}
{"name":"paul", "surename":"carl", "age":28, "user_pk":"paul:carl:28", "unique_attribute":"p2"}
{"name":"paul", "surename":"carl", "age":28, "user_pk":"paul:carl:28", "unique_attribute":"p3"}



这个文件是故意放平的,我想取消放平它们。


我们知道输入是有序的,我想处理它们直到下一个会话密钥,然后传递给管道中的下一个apply,以保持最小的内存需求,
因此,中介阶段应返回KV<String, Iterable<GenericRecord>>甚至更好的组合KV<String, GenericRecord>


<"john:doe:40", {"name":"john", "surename":"doe", "age":40, ["unique_attribute":"j1", ...]}>
<"paul:carl:28", {"name":"paul", "surename":"carl", "age":28, "user_pk":, ["unique_attribute":"p1", ...]}



这是我到目前为止所得到的;

        pipeline.apply("FilePattern", FileIO.match().filepattern(PARQUET_FILE_PATTERN))
                .apply("FileReadMatches", FileIO.readMatches())
                .apply("ParquetReadFiles", ParquetIO.readFiles(schema))
                .apply("SetKeyValuePK", WithKeys.of(input -> AvroSupport.of(input).extractString("user_pk").get())).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(schema)))
                .apply(Window.into(Sessions.withGapDuration(Duration.standardSeconds(5L)))).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(schema)))
                .apply("SetGroupByPK", GroupByKey.create()).setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(AvroCoder.of(schema))))
...
...



我不知道是否有更好的方法,但是现在我使用了Sessions.withGapDuration窗口化策略。
我希望每隔5秒就会得到一个分组元素KV<String, Iterable<GenericRecord>> element,但是GroupByKey之后我什么也没得到,我什至不确定GroupByKey是否实际上在做什么,但是我知道内存在增加快速,因此它必须等待所有物品。

所以问题是,您将如何设置一个窗口功能以使我能够groupbykey。
我还尝试过Combine.byKey,因为它应该是GroupByKey + Windowing Function,但无法实现?

最佳答案

我设法使groupby工作,但是不确定我是否完全理解。
我不得不加两个想法。
Beam中的第一个(任何一个)IO操作不会添加时间戳。

.apply("WithTimestamp", WithTimestamps.of(input -> Instant.now()))


第二,我添加了Triger,因此GroupByKey实际上会被触发。不知道为什么它没有首先触发。我敢肯定有人对此有解释。

.apply("SessionWindow", Window.<KV<String, GenericRecord>>into(Sessions.withGapDuration(Duration.standardSeconds(5L))).triggering(
                        AfterWatermark.pastEndOfWindow()
                                .withLateFirings(AfterProcessingTime
                                        .pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
                        .withAllowedLateness(Duration.ZERO)
                        .discardingFiredPanes())



这不是完美的,即使窗口只是GroupByKey,我仍然必须等待几分钟才能看到5s被触发,但最终它会被触发,这是进度。

编辑:
好的,看起来好像不需要时间戳,我假设是因为窗口是基于会话的,而不是基于时间的。
我也将设置更改为流

        options.as(StreamingOptions.class)
                .setStreaming(true);


我希望这对遇到类似问题的人有所帮助。

10-06 05:46