Problem description
I'm using a sliding time window of size X and period Y. To mark the output of each window, I'd like to get the timestamp of the current window of the PCollection.
```java
PCollection<T> windowedInput = input
    .apply(Window.<T>into(
        SlidingWindows.of(Duration.standardMinutes(10))
            .every(Duration.standardMinutes(1))));

// Extract key from each input and run a function per group.
//
// Q: ExtractKey() depends on the window triggered time.
//    How can I pass the timestamp of windowedInput to ExtractKey()?
PCollection<KV<K, Iterable<T>>> groupedInputs = windowedInput
    .apply(ParDo.of(new ExtractKey()))
    .apply(GroupByKey.<K, T>create());

// Run Story clustering and write outputs.
//
// Q: Also, I'd like to add a window timestamp suffix to the output.
//    How can I pass (or get) the timestamp to SomeDoFn()?
PCollection<String> results = groupedInputs.apply(ParDo.of(new SomeDoFn()));
```
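To make "the current window" concrete: with SlidingWindows.of(10 minutes).every(1 minute), each element belongs to size / period = 10 overlapping windows, so there is no single window per element. The sketch below reproduces that assignment arithmetic in plain Java so it runs without Beam. It is modelled on, not copied from, Beam's SlidingWindows; it assumes a zero window offset and non-negative epoch-millisecond timestamps, and the Window class is a hypothetical stand-in for Beam's IntervalWindow.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch, no Beam dependency: which sliding windows of size X and
// period Y contain one element timestamp. Assumes zero window offset and
// non-negative epoch-millisecond timestamps.
public class SlidingWindowSketch {

    // Hypothetical stand-in for IntervalWindow: half-open [start, end) in ms.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Latest timestamp still inside the window (end is exclusive).
        long maxTimestamp() {
            return end - 1;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    static List<Window> assignWindows(long timestampMillis, long sizeMillis, long periodMillis) {
        List<Window> windows = new ArrayList<>();
        // Latest window start at or before the timestamp: round down to a period boundary.
        long lastStart = timestampMillis - (timestampMillis % periodMillis);
        // Step back one period at a time while [start, start + size) still covers the timestamp.
        for (long start = lastStart; start > timestampMillis - sizeMillis; start -= periodMillis) {
            windows.add(new Window(start, start + sizeMillis));
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // An element at 12m30s, with 10-minute windows starting every minute.
        List<Window> windows = assignWindows(12 * minute + 30 * 1_000L, 10 * minute, minute);
        System.out.println(windows.size() + " windows");
        for (Window w : windows) {
            System.out.println(w + " maxTimestamp=" + w.maxTimestamp());
        }
    }
}
```

For an element at 12m30s this prints 10 windows, from [720000, 1320000) down to [180000, 780000). A DoFn running after the Window.into step is invoked once per (element, window) pair, which is why the per-invocation window parameter is the natural place to read the timestamp.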
Recommended answer
A DoFn is allowed to access the window of the current element via an optional BoundedWindow parameter on the @ProcessElement method:
```java
class SomeDoFn extends DoFn<KV<K, Iterable<T>>, String> {
  @ProcessElement
  public void process(ProcessContext c, BoundedWindow window) {
    // ...
  }
}
```
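Inside process, the BoundedWindow exposes maxTimestamp(), the latest timestamp the window covers; for the IntervalWindow instances that SlidingWindows produces, this is one millisecond before end(). The same extra parameter can be declared on ExtractKey's @ProcessElement method as well. The plain-Java sketch below covers only the string-building step for the output suffix the question asks about; withWindowSuffix and the "@" separator are hypothetical choices, and in a real SomeDoFn the millisecond value would come from window.maxTimestamp().getMillis().

```java
import java.time.Instant;

// Plain-Java sketch of tagging an output line with its window's max timestamp.
// withWindowSuffix and the "@" separator are hypothetical choices.
public class WindowSuffixSketch {

    // In a Beam DoFn, maxTimestampMillis would come from window.maxTimestamp().getMillis().
    static String withWindowSuffix(String output, long maxTimestampMillis) {
        // Instant.toString() renders ISO-8601 in UTC, e.g. 1970-01-01T00:21:59.999Z.
        return output + "@" + Instant.ofEpochMilli(maxTimestampMillis);
    }

    public static void main(String[] args) {
        // Max timestamp of the window [12m, 22m): end (1,320,000 ms) minus 1 ms.
        System.out.println(withWindowSuffix("clusters", 1_319_999L));
    }
}
```

Running main prints clusters@1970-01-01T00:21:59.999Z. Using the max timestamp rather than the start gives each overlapping sliding window a distinct suffix, since no two of them share an end.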