Question
It may happen that data entering a Flink job triggers an exception, either due to a bug in the code or a lack of validation. My goal is to provide a consistent way of handling exceptions that our team can use within Flink jobs without causing any downtime in production.
Restart strategies do not seem to be applicable here, because:
- a simple restart will not fix the issue, and we would end up in a restart loop (see the configuration sketch after this list)
- we cannot simply skip the event
- they are fine for OOMEs or some temporary problems
- we cannot add a custom one
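For context, this is roughly how a restart strategy is configured (a sketch; the fixed-delay strategy and the numbers are arbitrary choices):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Retry at most 3 times, waiting 10 seconds between attempts.
// This helps with OOMEs and other transient failures, but a
// deterministic poison record is re-read after every restart and
// fails again, which is exactly the restart loop described above.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));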
A try/catch block in the "keyBy" function does not fully help, because:
- there is no way to skip the event in "keyBy" after the exception is handled (see the sketch below)
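A minimal sketch of that limitation, assuming a hypothetical Event type with a getUserId() accessor: a KeySelector must return exactly one key per record, so even after catching the exception there is nothing valid left to return:

import org.apache.flink.api.java.functions.KeySelector;

KeySelector<Event, String> keySelector = new KeySelector<Event, String>() {
    @Override
    public String getKey(Event event) {
        try {
            return event.getUserId(); // may throw on a malformed event
        } catch (RuntimeException e) {
            // We must still return a key: the event cannot be skipped,
            // only redirected into an artificial key group.
            return "INVALID_KEY";
        }
    }
};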
Example code:
env.addSource(kafkaConsumer)
    .keyBy(keySelector) // must return one result for one entry
    .flatMap(mapFunction) // we can skip some entries here in case of errors
    .addSink(new PrintSinkFunction<>());
env.execute("Flink Application");
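To make the comment on flatMap concrete, here is one way mapFunction could swallow bad events (a sketch; Event, Result, and transform(...) are hypothetical placeholders):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

FlatMapFunction<Event, Result> mapFunction = new FlatMapFunction<Event, Result>() {
    @Override
    public void flatMap(Event event, Collector<Result> out) {
        try {
            out.collect(transform(event)); // transform(...) stands in for real logic
        } catch (Exception e) {
            // Log the failure and emit nothing: the event is skipped
            // and the job keeps running.
        }
    }
};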
I'd like to be able to skip processing of the event that caused a problem in "keyBy" and in similar methods that are supposed to return exactly one result.
Answer
Besides the suggestion of @phanhuy152 (which seems totally legit to me), why not filter before keyBy?
env.addSource(kafkaConsumer)
    .filter(invalidKeys) // drop events that would break the keySelector
    .keyBy(keySelector) // must return one result for one entry
    .flatMap(mapFunction) // we can skip some entries here in case of errors
    .addSink(new PrintSinkFunction<>());
env.execute("Flink Application");
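A sketch of what that invalidKeys guard might look like (the Event type and the extractKey(...) helper are illustrative, standing in for whatever logic keySelector itself uses):

import org.apache.flink.api.common.functions.FilterFunction;

FilterFunction<Event> invalidKeys = new FilterFunction<Event>() {
    @Override
    public boolean filter(Event event) {
        try {
            extractKey(event); // same logic the downstream KeySelector applies
            return true;       // keep the event
        } catch (RuntimeException e) {
            return false;      // drop events that would break keyBy
        }
    }
};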