问题描述
我已经在连接到本地Kafka Broker的Flink中实现了CEP模式,该模式可以像预期的那样工作。但是当我连接到基于群集的云Kafka设置时,Flink CEP没有触发。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//saves checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
我正在使用AscendingTimestampExtractor,
consumer.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<ObjectNode>() {
@Override
public long extractAscendingTimestamp(ObjectNode objectNode) {
long timestamp;
Instant instant = Instant.parse(objectNode.get("value").get("timestamp").asText());
timestamp = instant.toEpochMilli();
return timestamp;
}
});
而且我还收到警告消息,
我还尝试使用AssignerWithPeriodicWatermark和AssignerWithPunctuatedWatermark,它们都不起作用
我附加了未分配水印的Flink控制台屏幕截图。Updated flink console screenshot
有人能帮忙吗?
推荐答案
CEP必须首先根据水印对输入流进行排序。所以问题可能与水印有关,但您没有向我们展示足够的内容来调试原因。一个常见问题是有一个idle source,它可能会阻止水印前进。
但还有其他可能的原因。要调试这种情况,我建议您查看一些指标,要么在Flink Web UI中,要么在指标系统中(如果您连接了一个指标系统)。首先,通过查看管道不同阶段的numRecordsIn
、numRecordsOut
或numRecordsInPerSecond
和numRecordsOutPerSecond
来检查记录是否在流动。
如果有事件,请查看作业的所有不同任务中的currentOutputWatermark
,以查看事件时间是否提前。
更新:
您可能正在对Kafka消费者调用assignTimestampsAndWatermarks
,这将导致每个分区的水印。在这种情况下,如果您有一个空闲分区,该分区将不会产生任何水印,这将阻碍整个水印。相反,尝试在源生成的数据流上调用assignTimestampsAndWatermarks
,看看这样是否可以解决问题。(当然,如果没有每个分区的水印,您将无法使用AscendingTimestampExtractor,因为流将不按顺序排列。)
这篇关于未触发闪烁CEP事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!