本文介绍了未触发闪烁CEP事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经在连接到本地Kafka Broker的Flink中实现了CEP模式,该模式可以像预期的那样工作。但是当我连接到基于群集的云Kafka设置时,Flink CEP没有触发。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //saves checkpoint
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

我正在使用AscendingTimestampExtractor,

consumer.assignTimestampsAndWatermarks(
    new AscendingTimestampExtractor<ObjectNode>() {
      @Override
      public long extractAscendingTimestamp(ObjectNode objectNode) {
        long timestamp;
        Instant instant = Instant.parse(objectNode.get("value").get("timestamp").asText());
        timestamp = instant.toEpochMilli();
        return timestamp;
      }
    });

而且我还收到警告消息,

我还尝试使用AssignerWithPeriodicWatermark和AssignerWithPunctuatedWatermark,它们都不起作用

我附加了未分配水印的Flink控制台屏幕截图。Updated flink console screenshot

有人能帮忙吗?

推荐答案

CEP必须首先根据水印对输入流进行排序。所以问题可能与水印有关,但您没有向我们展示足够的内容来调试原因。一个常见问题是有一个idle source,它可能会阻止水印前进。

但还有其他可能的原因。要调试这种情况,我建议您查看一些指标,要么在Flink Web UI中,要么在指标系统中(如果您连接了一个指标系统)。首先,通过查看管道不同阶段的numRecordsInnumRecordsOutnumRecordsInPerSecondnumRecordsOutPerSecond来检查记录是否在流动。

如果有事件,请查看作业的所有不同任务中的currentOutputWatermark,以查看事件时间是否提前。

更新:

您可能正在对Kafka消费者调用assignTimestampsAndWatermarks,这将导致每个分区的水印。在这种情况下,如果您有一个空闲分区,该分区将不会产生任何水印,这将阻碍整个水印。相反,尝试在源生成的数据流上调用assignTimestampsAndWatermarks,看看这样是否可以解决问题。(当然,如果没有每个分区的水印,您将无法使用AscendingTimestampExtractor,因为流将不按顺序排列。)

这篇关于未触发闪烁CEP事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

06-18 08:10