Problem description
I'd like to create a different WindowFn that assigns windows to my input elements based on another field instead of each entry's timestamp. I know the predefined WindowFns in the Google Dataflow SDK use the timestamp as the criterion for assigning windows.
More specifically, I'd like to create a kind of SlidingWindows, but instead of using the timestamp as the window assignment criterion, I'd like to use another field.
How could I create my customized WindowFn? What points should I consider when creating my own WindowFn?
Thanks.
Recommended answer
To create a new WindowFn, you just need to extend WindowFn (or one of its subclasses) and override its abstract methods.
In your case you don't need window merging, so you can extend NonMergingWindowFn, and your code could look something like this:
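import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.NonMergingWindowFn;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
import java.util.Collection;

public class MyWindowFn<ElementT> extends NonMergingWindowFn<ElementT, IntervalWindow> {
  @Override
  public Collection<IntervalWindow> assignWindows(AssignContext c) {
    // Derive the windows from a field of the element, not from c.timestamp().
    return setOfWindowsElementShouldBeIn(c.element());
  }
  @Override
  public boolean isCompatible(WindowFn<?, ?> other) {
    return other instanceof MyWindowFn;
  }
  @Override
  public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
  }
  @Override
  public IntervalWindow getSideInputWindow(BoundedWindow window) {
    // You may not need this if you won't ever use PCollections windowed with
    // this WindowFn as side inputs; in that case, just throw. Otherwise, map
    // the main input window to one of the windows generated by this WindowFn.
    throw new UnsupportedOperationException("Not used as a side input");
  }
  private Collection<IntervalWindow> setOfWindowsElementShouldBeIn(ElementT element) {
    // Placeholder: build the IntervalWindows from your chosen field here.
    throw new UnsupportedOperationException("Implement window assignment");
  }
}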
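The answer leaves `setOfWindowsElementShouldBeIn` open. For the sliding-window case in the question, one option is to apply sliding-window arithmetic to a numeric field instead of a timestamp: every window has length `size`, a new one starts every `period`, and an element belongs to each window whose half-open range contains its field value. The following is a minimal plain-Java sketch under stated assumptions: the field is a `long`, windows are aligned to multiples of `period` with no offset, and `FieldSlidingWindows`, `Interval`, `size` and `period` are hypothetical names. It has no Dataflow dependency, so it only shows the window math. Inside `MyWindowFn` you would still have to turn each `[start, end)` into an `IntervalWindow`, which means treating the field value as if it were a millisecond instant (also an assumption).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: sliding-window assignment keyed on a long field. */
public final class FieldSlidingWindows {
  /** A half-open interval [start, end) over the field's value space. */
  public static final class Interval {
    public final long start;
    public final long end;

    Interval(long start, long end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")";
    }
  }

  private final long size;   // length of each window
  private final long period; // distance between consecutive window starts

  public FieldSlidingWindows(long size, long period) {
    if (size <= 0 || period <= 0) {
      throw new IllegalArgumentException("size and period must be positive");
    }
    this.size = size;
    this.period = period;
  }

  /** Returns every window [start, start + size) containing value, latest start first. */
  public List<Interval> assign(long value) {
    List<Interval> windows = new ArrayList<>();
    // Latest period-aligned start that is <= value; floorMod keeps this
    // correct for negative values as well.
    long lastStart = value - Math.floorMod(value, period);
    // Walk back one period at a time while the window still covers value.
    for (long start = lastStart; start > value - size; start -= period) {
      windows.add(new Interval(start, start + size));
    }
    return windows;
  }

  public static void main(String[] args) {
    FieldSlidingWindows fn = new FieldSlidingWindows(10, 5);
    System.out.println(fn.assign(12)); // [[10, 20), [5, 15)]
  }
}
```

With `size = 10` and `period = 5`, each value falls into two overlapping windows. When `size` is not a multiple of `period`, some values land in fewer windows, just as with timestamp-based sliding windows.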