我为事件流创建了自定义触发器和处理功能。

DataStream<DynamoDBRow> dynamoDBRows =
    sensorEvents
        .keyBy("id")
        .window(GlobalWindows.create())
        .trigger(new MyCustomTrigger())
        .allowedLateness(Time.minutes(1)) # Note
        .process(new MyCustomWindowProcessFunction());


我的触发器基于事件参数。接收到事件结束信号后,将MyCustomWindowProcessFunction()应用于窗口元素。

@Slf4j
public class MyCustomTrigger extends Trigger<SensorEvent, GlobalWindow> {

  @Override
  public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

    if (element.isEventEnd() == true) {
      return TriggerResult.FIRE_AND_PURGE;
    }

    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}


可能只有很少的传感器数据,即使在触发之后也可能会出现。因此,我添加了.allowedLateness(Time.minutes(1)),以确保在处理时不会错过这些事件。

就我而言,allowedLateness无法正常工作。

浏览完文件后,我发现了这个

java - 全局窗口自定义触发器上的allowedLateness-LMLPHP

如何在GlobalWindow中包含allowedLateness?

注意:我也尝试设置环境时间特征

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);


更新:2020年2月20日

目前正在思考以下方法。 (目前无法正常工作)

@Slf4j
public class JourneyTrigger extends Trigger<SensorEvent, GlobalWindow> {

  private final long allowedLatenessMillis;

  public JourneyTrigger(Time allowedLateness) {
    this.allowedLatenessMillis = allowedLateness.toMilliseconds();
  }

  @Override
  public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

    if (element.isEventEnd() == true) {
      log.info("Timer started with allowedLatenessMillis " + allowedLatenessMillis);
      ctx.registerEventTimeTimer(System.currentTimeMillis() + allowedLatenessMillis);
    }

    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    log.info("onEvenTime called at "+System.currentTimeMillis() );
    return TriggerResult.FIRE_AND_PURGE;
  }


  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}

最佳答案

老实说,我看不出在这里使用GlobalWindow的原因。您可以只使用与您的KeyedProcessFunction相同目的的Trigger,基本上,它将把从事件开始到事件结束的所有元素收集到ListState中,然后当您收到isEventEnd()==true,您只需安排EventTime计时器,该计时器将在一分钟后触发并发出在ListState内部收集的结果。

07-25 22:33
查看更多