Problem description
Hi, I'm currently dabbling in Spring Kafka and have succeeded in adding a single KafkaListenerContainerFactory to my listener. Now I'd like to add multiple KafkaListenerContainerFactory beans (one for a topic that will carry JSON messages, another for strings). See the code below:
@EnableKafka
@Configuration
public class KafkaConsumersConfig {

    private final KafkaConfiguration kafkaConfiguration;

    @Autowired
    public KafkaConsumersConfig(KafkaConfiguration kafkaConfiguration) {
        this.kafkaConfiguration = kafkaConfiguration;
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(jsonConsumerFactory());
        factory.setConcurrency(3);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Record> jsonConsumerFactory() {
        JsonDeserializer<Record> jsonDeserializer = new JsonDeserializer<>(Record.class);
        return new DefaultKafkaConsumerFactory<>(jsonConsumerConfigs(), new StringDeserializer(), jsonDeserializer);
    }

    @Bean
    public Map<String, Object> jsonConsumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBrokerAddress());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getJsonGroupId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
        return propsMap;
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaFileListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(fileConsumerFactory());
        factory.setConcurrency(3);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> fileConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(fileConsumerConfigs());
    }

    @Bean
    public Map<String, Object> fileConsumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBrokerAddress());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getFileGroupId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
        return propsMap;
    }
}
Running this gives me the following error:
Description:
Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
- Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans 'jsonConsumerFactory', 'fileConsumerFactory'
Action:
Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.
What am I doing wrong?
Recommended answer
It looks like you do not intend to rely on Spring Boot's Kafka auto-configuration.
Spring Boot provides the following in KafkaAutoConfiguration:
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
Since you define jsonConsumerFactory and fileConsumerFactory, the @ConditionalOnMissingBean condition is not met, so your beans replace the one provided by the auto-configuration.
On the other hand, none of your factories can be applied in KafkaAnnotationDrivenConfiguration:
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
That is because neither of your ConsumerFactory beans is of type ConsumerFactory<Object, Object>.
So:
- Exclude KafkaAutoConfiguration from the Spring Boot auto-configuration by adding the following to the application properties file: spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
- or rename one of your KafkaListenerContainerFactory beans to kafkaListenerContainerFactory so that it overrides the one defined by Boot
- or declare one of the ConsumerFactory beans as type ConsumerFactory<Object, Object>.
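As an illustration of the second option (renaming a factory bean to kafkaListenerContainerFactory), here is a minimal sketch. It is not taken from the original question: the class names RenamedFactoryConfig and FileListener, the topic "file-topic", the group id "file-group" and the broker address "localhost:9092" are all hypothetical placeholders, and the sketch assumes spring-kafka and Spring Boot are on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.stereotype.Component;

// Hypothetical configuration class, for illustration only.
@EnableKafka
@Configuration
class RenamedFactoryConfig {

    // The bean name "kafkaListenerContainerFactory" matches the name checked by
    // @ConditionalOnMissingBean in KafkaAnnotationDrivenConfiguration, so the
    // Boot-provided factory (and its ConsumerFactory<Object, Object> parameter)
    // is no longer required.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(fileConsumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> fileConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Placeholder values; replace with your own broker address and group id.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "file-group");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
    }
}

// Hypothetical listener that selects a container factory by bean name.
@Component
class FileListener {

    @KafkaListener(topics = "file-topic", containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(String payload) {
        System.out.println("Received: " + payload);
    }
}
```

A second factory, such as the JSON one from the question, can keep its own bean name; a listener would then refer to it through the containerFactory attribute of @KafkaListener in the same way.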