Problem description
I am using Flink CEP to detect patterns in events coming from Kafka. For simplicity, the events are all of a single type. I am trying to detect a change in the value of a field in the continuous event stream. The code looks like the following:
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

streamEnv.addSource(new FlinkKafkaConsumer[..]())
  .filter(...)
  .map(...)
  .assignTimestampsAndWatermarks(
    WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
  )
  .keyBy(...)(TypeInformation.of(classOf[...]))

val pattern: Pattern[Event, _] =
  Pattern.begin[Event]("start", AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
    // "middle": zero or more events whose field value equals that of "start"
    .next("middle")
    .oneOrMore()
    .optional()
    .where(new IterativeCondition[Event] {
      override def filter(event: Event, ctx: IterativeCondition.Context[Event]): Boolean = {
        val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
        startTrafficEvent.getFieldValue().equals(event.getFieldValue())
      }
    })
    // "end": the first event whose field value differs from that of "start"
    .next("end").times(1)
    .where(new IterativeCondition[Event] {
      override def filter(event: Event, ctx: IterativeCondition.Context[Event]): Boolean = {
        val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
        !startTrafficEvent.getFieldValue().equals(event.getFieldValue())
      }
    })
    .within(Time.seconds(30)) // the whole match must complete within 30 seconds
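The snippet above only defines the keyed stream and the pattern; it does not show how they are wired together or what the sink is. A minimal sketch of the usual wiring, assuming the Scala CEP API, that the keyBy(...) result above is held in a val named keyedStream, and with print() as a stand-in for the real sink:

import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.scala._ // implicit TypeInformation for the select result

// Hypothetical wiring, not part of the original job: `keyedStream` is assumed to
// hold the result of the keyBy(...) call above.
val patternStream = CEP.pattern(keyedStream, pattern)

val alerts: DataStream[String] = patternStream.select { matched =>
  // "start" and "end" each hold exactly one event because of times(1)
  val start = matched("start").head
  val end = matched("end").head
  s"field changed from ${start.getFieldValue()} to ${end.getFieldValue()}"
}

alerts.print() // stand-in for the job's real sink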
The Kafka topic has 104 partitions, and events are distributed evenly across the partitions. When I submitted the job, parallelism was set to 104.
From the Web UI, there were 2 tasks: the first one was Source -> filter -> map -> timestamp/watermark; the second one was CepOperator -> sink. Each task had a parallelism of 104.
The workload across subtasks was uneven, which should come from the keyBy. Watermarks differed among subtasks, but they became stuck at one value, with no change for a long time. From the logs, I can see that CEP kept evaluating events, and matched results were being pushed to the downstream sink.
The event rate was 10k/s. Backpressure on the first task stayed high, while the second one was ok.
Please help explain what is happening in CEP and how to fix this issue. Thanks.
Recommended answer
Having given your question more careful consideration, I'm revising my answer.
It sounds like CEP is continuing to produce matches and they are being pushed to the sink, but the CEP + sink task is producing high backpressure. What would help is to identify the cause of the backpressure.
If events are available to read from all partitions, and yet the watermarks are only barely advancing, it sounds like the backpressure is severe enough to prevent events from being ingested at all.
I suspect that either
- a combinatorial explosion of work in the CEP engine, and/or
- enough matches that the sink can't keep up
is the likely cause.
A few ideas for getting more insight:
(1) Try using a profiler to determine if the CepOperator is the bottleneck, and perhaps identify what it is doing.
(2) Disable operator chaining between the CepOperator and the sink in order to isolate CEP -- simply as a debugging step. This will give you better visibility (via the metrics and backpressure monitoring) as to what CEP and the sink are each doing.
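For example, assuming the CEP output is held in a stream called alerts as in the earlier sketch, chaining can be broken on the sink itself, or globally for the whole job:

// Debugging step only: keep the sink out of the CepOperator's chain, so the CEP
// operator and the sink each show up as separate tasks with their own metrics
// and backpressure status in the Web UI.
alerts
  .print()           // stand-in for the job's real sink
  .disableChaining()

// Alternatively, disable chaining for the entire job while debugging:
// streamEnv.disableOperatorChaining()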
(3) Test this in a smaller setup, and expand the CEP logging.
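A rough sketch of (3), assuming the pipeline can be pointed at a small test topic; the parallelism value here is arbitrary, and expanding CEP logging would typically mean raising the log level of the org.apache.flink.cep package to DEBUG in the log4j configuration:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Local environment with a deliberately small parallelism, so per-subtask behaviour
// (watermarks, partial matches) is easier to follow in the logs.
val localEnv = StreamExecutionEnvironment.createLocalEnvironment(4)

// Build the same source -> filter -> map -> watermarks -> keyBy -> CEP pipeline on
// `localEnv`, ideally against a test topic with only a few partitions.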