kafkaListenerContainerFactory

kafkaListenerContainerFactory

在spring-kafka文档中,我们可以读取默认容器工厂,假定它具有Bean名称为kafkaListenerContainerFactory的情况,除非已通过配置提供了明确的默认值。

我想问一下是否可以更改配置以使用我的自定义容器工厂bean(例如customKafkaListenerContainerFactory)而不是kafkaListenerContainerFactory吗?

代码示例->如果我们键入

@KafkaListener(id = "cat", topics = "myTopic")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

那么默认的containerFactory bean是customKafkaListenerContainerFactory而不是kafkaListenerContainerFactory
更准确地说是->如果我不提供任何containerFactory属性,则使用customKafkaListenerContainerFactory而不是kafkaListenerContainerFactory

最佳答案

是的,您可以通过在@KafkaListener批注中使用containerFactory属性来设置自定义的kafka容器工厂bean

KafkaListenerContainerFactory的bean名称,用于创建负责服务于此端点的消息侦听器容器。

@KafkaListener(id = "cat", topics = "myTopic", containerFactory="customKafkaListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
   ...
  ack.acknowledge();
}

或者,您可以覆盖Config类中的默认kafkaListenerContainerFactory。就像@Gary Russell所说的那样,只是使用相同的bean名称,它将替换Boots,而Boots的条件是该bean存在
@Configuration
@EnableKafka
public class Config {

   @Bean
   @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
   ConcurrentKafkaListenerContainerFactory<Integer, String>
                    kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory,consumerFactory());
    // set custom properties
    return factory;
   }

   @Bean
   public ConsumerFactory<Integer, String> consumerFactory() {
      return new DefaultKafkaConsumerFactory<>(consumerConfigs());
   }

   @Bean
   public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
    ...
    return props;
  }
}

07-24 13:41