在spring-kafka文档中,我们可以读取默认容器工厂,假定它具有Bean名称为kafkaListenerContainerFactory
的情况,除非已通过配置提供了明确的默认值。
我想问一下是否可以更改配置以使用我的自定义容器工厂bean(例如customKafkaListenerContainerFactory
)而不是kafkaListenerContainerFactory
吗?
代码示例->如果我们键入
@KafkaListener(id = "cat", topics = "myTopic")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
那么默认的containerFactory bean是
customKafkaListenerContainerFactory
而不是kafkaListenerContainerFactory
更准确地说是->如果我不提供任何containerFactory属性,则使用
customKafkaListenerContainerFactory
而不是kafkaListenerContainerFactory
最佳答案
是的,您可以通过在@KafkaListener
批注中使用containerFactory属性来设置自定义的kafka容器工厂bean
KafkaListenerContainerFactory的bean名称,用于创建负责服务于此端点的消息侦听器容器。
@KafkaListener(id = "cat", topics = "myTopic", containerFactory="customKafkaListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
或者,您可以覆盖Config类中的默认kafkaListenerContainerFactory。就像@Gary Russell所说的那样,只是使用相同的bean名称,它将替换Boots,而Boots的条件是该bean存在
@Configuration
@EnableKafka
public class Config {
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<Integer, String>
kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory,consumerFactory());
// set custom properties
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}