我正在尝试使用库reactor-kafka开发Spring Boot应用程序,以对从Kafka主题读取的某些消息使用react。

我有一个构建类KafkaReceiver的配置类。

@Configuration
public class MyConfiguration {

    @Bean
    public KafkaReceiver<String, String> kafkaReceiver() {
        Map<String, Object> props = new HashMap<>();
        // Options initialisation...
        final ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions.<String, string>create(props)
                               .subscription(Collections.singleton(consumer.getTopic()));
        return KafkaReceiver.create(receiverOptions);
    }
}

好吧...现在呢?使用不太活跃的spring-kafka库,我可以用@KafkaListener注释一个方法,Spring Boot会为我创建一个从Kafka主题监听的线程。

我应该在哪里放置KafkaReceiver呢?在所有示例中,我发现直接使用main方法,但这不是Boot方法。

我正在使用Spring Boot 2.1.3和Reactor-Kafka 1.1.0

提前致谢。

最佳答案

由于您拥有了KafkaReceiver bean,因此您现在可以执行以下操作:

@Bean
public ApplicationRunner runner(KafkaReceiver<String, String> kafkaReceiver) {
        return args -> {
                kafkaReceiver.receive()
                          ...
                          .sunbscribe();
        };
}

准备好ApplicationRunner时,将踢该ApplicationContext bean。有关更多信息,请参见其JavaDocs。

10-06 07:28