我正在尝试使用库reactor-kafka
开发Spring Boot应用程序,以对从Kafka主题读取的某些消息使用react。
我有一个构建类KafkaReceiver
的配置类。
@Configuration
public class MyConfiguration {
@Bean
public KafkaReceiver<String, String> kafkaReceiver() {
Map<String, Object> props = new HashMap<>();
// Options initialisation...
final ReceiverOptions<String, String> receiverOptions =
ReceiverOptions.<String, string>create(props)
.subscription(Collections.singleton(consumer.getTopic()));
return KafkaReceiver.create(receiverOptions);
}
}
好吧...现在呢?使用不太活跃的
spring-kafka
库,我可以用@KafkaListener
注释一个方法,Spring Boot会为我创建一个从Kafka主题监听的线程。我应该在哪里放置
KafkaReceiver
呢?在所有示例中,我发现直接使用main
方法,但这不是Boot方法。我正在使用Spring Boot 2.1.3和Reactor-Kafka 1.1.0
提前致谢。
最佳答案
由于您拥有了KafkaReceiver
bean,因此您现在可以执行以下操作:
@Bean
public ApplicationRunner runner(KafkaReceiver<String, String> kafkaReceiver) {
return args -> {
kafkaReceiver.receive()
...
.sunbscribe();
};
}
准备好
ApplicationRunner
时,将踢该ApplicationContext
bean。有关更多信息,请参见其JavaDocs。