我想以一种不同的方式创建一个不同的WindowFn,以便根据另一个字段而不是根据输入条目的时间戳将Windows分配给我的任何输入元素。我知道Google DataFlow SDK中的预定义WindowFn使用时间戳作为分配窗口的条件。

更具体地说,我想创建一种SlidingWindows,而不是将时间戳记作为窗口分配条件,我想考虑另一个字段作为该条件。

如何创建自定义的WindowFn?创建自己的WindowFn时应考虑哪些要点?

谢谢。

最佳答案

要创建一个新的WindowFn,您只需要从WindowFn或子类继承并重写各种抽象方法即可。

在您的情况下,您不需要窗口合并,因此可以从NonMergingWindowFn继承,并且您的代码可能类似于

public class MyWindowFn extends NonMergingWindowFn<ElementT, IntervalWindow> {
  public Collection<W> assignWindows(AssignContext c) {
    return setOfWindowsElementShouldBeIn(c.element());
  }

  public boolean isCompatible(WindowFn other) {
    return other instanceof MyWindowFn;
  }

  public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
  }

  public W getSideInputWindow(final BoundedWindow window) {
    // You may not need this if you won't ever be using PCollections windowed
    // with this as side inputs.  If that's the case, just throw.
    // Otherwise you'll need to figure out how to map the main input windows
    // into the windows generated by this WindowFn.
  }
}

07-24 21:45