我正在使用一个具有kafka集成的spring boot应用程序,并且我想实现一个端点来停止和启动kafka从发布消息。
该消息由另一个端点以异步方式触发。
豆KafkaTemplate<String, String>
或ProducerFactory<String, String> producerFactory()
没有任何停止和暂停操作。
我的目标是能够模拟连接失败,并确保将这些消息存储在我拥有的后备机制中。
有任何想法吗?
最佳答案
KafkaTemplate
没有这些回调,因为它是一个被动组件,只有在我们调用它的情况下,它才能做这些事情。
为了模拟连接失败,我建议您实现一个自定义的ProducerFactory
来生成带有模拟或覆盖的KafkaProducer
的Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
。在那里,您可以在ProducerFactory
中实现适当的生命周期回调,并对提到的send()
实现中的状态做出反应。org.apache.kafka.clients.producer.MockProducer
可能有一些内容可供您重用或借用。例如,查看其close()
或fenceProducer()
。