我正在使用一个具有kafka集成的spring boot应用程序,并且我想实现一个端点来停止和启动kafka从发布消息。
该消息由另一个端点以异步方式触发。

KafkaTemplate<String, String>ProducerFactory<String, String> producerFactory()没有任何停止和暂停操作。

我的目标是能够模拟连接失败,并确保将这些消息存储在我拥有的后备机制中。

有任何想法吗?

最佳答案

KafkaTemplate没有这些回调,因为它是一个被动组件,只有在我们调用它的情况下,它才能做这些事情。

为了模拟连接失败,我建议您实现一个自定义的ProducerFactory来生成带有模拟或覆盖的KafkaProducerFuture<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);。在那里,您可以在ProducerFactory中实现适当的生命周期回调,并对提到的send()实现中的状态做出反应。

org.apache.kafka.clients.producer.MockProducer可能有一些内容可供您重用或借用。例如,查看其close()fenceProducer()

09-27 23:05