This article describes how to recover state consistency in Flink when using Kafka as the EventStore. It should be a useful reference for anyone facing the same problem.

Problem Description

I am implementing a microservice as an event-sourcing aggregate, which in turn is implemented as a Flink FlatMapFunction. In the basic setup, the aggregate reads events and commands from two Kafka topics. It then writes new events to that first topic and processing results to a third topic. Kafka therefore acts as the event store. Hopefully this drawing helps:

  RPC Request                              RPC Result
  |                                                 |
  ~~~~> Commands-|              |---> Results ~~~~~~|
                 |-->Aggregate--|
  ~> Input evs. -|              |---> output evs. ~~~
  |                                                 |
  ~~~~~<~~~~~~~~~~~<~~feedback loop~~~~~<~~~~~~~~<~~~
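
For orientation, the wiring might look roughly like the following in Flink's DataStream API. The topic names, connector configuration, and the placeholder aggregate below are my assumptions for illustration, not code from the original post:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

public class AggregateTopology {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative

        // Two input topics: events (with the feedback loop) and commands.
        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props));
        DataStream<String> commands = env.addSource(
                new FlinkKafkaConsumer<>("commands", new SimpleStringSchema(), props));

        // Placeholder aggregate: rebuilds state from events, answers commands.
        DataStream<String> output = events.connect(commands)
                .flatMap(new CoFlatMapFunction<String, String, String>() {
                    @Override
                    public void flatMap1(String event, Collector<String> out) {
                        // apply the event to the aggregate state (sketch)
                    }

                    @Override
                    public void flatMap2(String command, Collector<String> out) {
                        out.collect("event-for:" + command); // new event + result
                    }
                });

        // New events feed back into the events topic; results go to a third topic.
        output.addSink(new FlinkKafkaProducer<>("events", new SimpleStringSchema(), props));
        output.addSink(new FlinkKafkaProducer<>("results", new SimpleStringSchema(), props));

        env.execute("event-sourcing-aggregate");
    }
}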

Because Kafka is not checkpointed, commands could potentially be replayed twice, and it seems that output events could also be written to the topic twice.

How could the state be recovered in those cases of repeated messages? Is it possible for the aggregate to know when its input streams are up-to-date, so it can start processing commands?

I have thought of several solutions:

  1. If Flink implemented a rollback of unconfirmed events, a Sink could be built that gets the current offset from the event source; when restarted, this sink would remove the events newer than that offset from the Kafka topic. This way, the KafkaSource and KafkaSink would be generated from the same builder and then exposed to the topology. This solution has a serious problem: other services could read the newer events in the topic, causing inconsistency.

  2. If removing events as in 1 is not possible, a stateful source could read events from the recorded offset, try to match the repeated events in the aggregate, and drop them. This option does not seem robust: there can be situations where such patching is not deterministic; it would have to be rethought for each aggregate and topology; and it would not guarantee recovery (e.g. in the case of consecutive restarts). Therefore this is a bad solution. A sketch of the underlying deduplication idea follows.
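
For what it is worth, deduplication of this kind is commonly sketched with keyed state that remembers which keys have already been seen. The following is only an illustration of the idea (keying by a unique event ID is my assumption), not the original poster's code:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Apply after keyBy(eventId): forwards each event ID only once,
// dropping duplicates replayed from Kafka after a restart.
public class DropReplayedDuplicates extends RichFlatMapFunction<String, String> {
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void flatMap(String event, Collector<String> out) throws Exception {
        if (seen.value() == null) { // first occurrence of this key
            seen.update(true);
            out.collect(event);
        } // duplicates are silently dropped
    }
}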

  3. A different approach is to create a special KafkaSource with two special watermarks. The first, KafkaSourceStartedWatermark, is always sent at source startup to notify dependent operators; when this watermark is sent, the source internally records the current Kafka offset. The second, KafkaSourceUpToDateWatermark, is sent by the source when that offset is reached. These watermarks would travel along the topology transparently. Operators would handle these watermarks by implementing a special WatermarkNotifiable interface. The aggregate would then be able to buffer or drop RPC commands until it is up-to-date in every input source.

interface WatermarkNotifiable {
    void started(String watermarkId);  // KafkaSourceStartedWatermark
    void upToDate(String watermarkId); // KafkaSourceUpToDateWatermark
}
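
Built on that interface, the command gate inside the aggregate might look roughly like the sketch below. The whole mechanism is hypothetical: Flink provides no such notification out of the box, and the names and helper logic here are illustrative only.

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch: buffers RPC commands until every input source has
// signalled upToDate(), then releases the buffer and lets commands through.
public class CommandGate implements WatermarkNotifiable {
    private final int expectedSources;
    private int upToDateSources = 0;
    private final Queue<String> buffered = new ArrayDeque<>();

    public CommandGate(int expectedSources) {
        this.expectedSources = expectedSources;
    }

    @Override
    public void started(String watermarkId) {
        // KafkaSourceStartedWatermark: the source has recorded its target offset.
    }

    @Override
    public void upToDate(String watermarkId) {
        upToDateSources++; // KafkaSourceUpToDateWatermark: one more source caught up
    }

    public void onCommand(String command, Consumer<String> emit) {
        if (upToDateSources < expectedSources) {
            buffered.add(command); // still recovering: hold the command back
            return;
        }
        while (!buffered.isEmpty()) {
            emit.accept(buffered.poll()); // flush commands held during recovery
        }
        emit.accept(command);
    }
}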

  4. If implementing the infrastructure in 3 is not possible, the KafkaSource could instead take a constructor parameter specifying a special watermark event that travels to the operators, but this would require every operator to depend on these watermarks and re-emit them.

  5. Yet another approach is to not process commands older than some criterion. For example, commands could carry an entry timestamp. If time is used this way, time synchronization becomes critical. A minimal sketch follows.
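
As an illustration of that cutoff, here is a minimal sketch assuming commands carry their entry timestamp in epoch milliseconds; the field name and the threshold are assumptions, not taken from the original post:

import org.apache.flink.api.common.functions.FilterFunction;

// Drops commands older than a fixed maximum age. Relies on the entry
// timestamp being assigned on ingestion and on clocks being synchronized.
public class FreshCommandsOnly implements FilterFunction<FreshCommandsOnly.Command> {

    /** Hypothetical command envelope carrying its entry timestamp. */
    public static class Command {
        public long entryTimestampMs;
        public String payload;
    }

    private static final long MAX_AGE_MS = 30_000; // illustrative cutoff

    @Override
    public boolean filter(Command c) {
        return System.currentTimeMillis() - c.entryTimestampMs <= MAX_AGE_MS;
    }
}

In a topology this would be applied as commands.filter(new FreshCommandsOnly()) in front of the aggregate.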

Related StackOverflow questions

  1. Using Kafka as a (CQRS) Eventstore. Good idea?
  2. Kafka - Know if Consumer is up to date
  3. Kafka & Flink duplicate messages on restart

Recommended Answer

Create a new Conmuter operator type. It is like a Source, made of several sources representing the event and command topics. It starts in a "recovering" state: in this state, it reads from the event topics up to their latest offsets, while storing or dropping the commands. Once up to date, it considers itself recovered and "opens" the way to commands. It could be implemented separately as a source plus an operator.
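
A rough sketch of that two-phase behavior as a single CoFlatMapFunction might look like the following; the end-of-replay marker is my stand-in for detecting "up to date", which is precisely the hard part, and all names are illustrative:

import java.util.ArrayDeque;
import java.util.Queue;

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// Sketch of the "Conmuter": replays events first while holding commands
// back, then opens the command path once replay is considered finished.
public class Conmuter implements CoFlatMapFunction<String, String, String> {
    private boolean recovering = true;
    private final Queue<String> pendingCommands = new ArrayDeque<>();

    @Override
    public void flatMap1(String event, Collector<String> out) {
        if ("END_OF_REPLAY".equals(event)) { // hypothetical marker event
            recovering = false;
            while (!pendingCommands.isEmpty()) {
                out.collect(pendingCommands.poll()); // release held commands
            }
        } else {
            out.collect(event); // forward events so downstream state is rebuilt
        }
    }

    @Override
    public void flatMap2(String command, Collector<String> out) {
        if (recovering) {
            pendingCommands.add(command); // store (or drop) while recovering
        } else {
            out.collect(command);
        }
    }
}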

FlinkKafkaProducerXX is not enough to do this, but it would be the base on which to implement it.

This concludes the article on recovering state consistency in Flink when using Kafka as the EventStore. Hopefully the recommended answer is helpful.
