我有一个kafka消费者Java应用程序。
我正在从该应用程序中读取两个单独的kafka主题。
对于主题1,有一个名称为kafkaListenerContainerFactory的KafkaListenerContainerFactory,下面是代码片段。该消息是avro格式。 Pojo1是pojo类
使用avro模式构建。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Pojo1> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Pojo1> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
为了使用来自topic1的消息,我们有以下方法@kafkaListener (topics="topic1") public boolean readTopic1(ConsumerRecord<String,Pojo1> record){ // logic }
现在,该应用程序也从主题2中读取消息,该消息也具有avro消息,而Pojo2是使用相应avro模式构建的pojo类。要阅读topic2,我们有以下方法
@kafkaListener (topics="topic2") public boolean readTopic2(ConsumerRecord<String,Pojo2> record){ // logic }
现在,我不了解主题2消耗的消息如何按预期工作。kafkaListenerContainerFactory已配置Pojo1,因此如何从topic2读取Pojo2的消息。
据我所知,仅当缺少名称为“kafkaListenerContainerFactory”的bean时,kafka才会创建默认容器工厂,但是在我的应用程序中
我们已经为topic1建立了一个kafkaListenerContainerFactory。
因此,应该为Pojo2创建另一个KafkaListenerContainerFactory,并且在使用
@kafkaListener (topics="topic2", containerFactory="factoryForTopic2")
时应提供参考有人可以帮助我了解KafkaListenerContainerFactory如何针对具有不同消息模式的不同主题进行工作。
最佳答案
这称为类型擦除;泛型类型仅在编译时适用。在运行时,只要您的反序列化器创建正确的类型,它将起作用。
使用ConcurrentKafkaListenerContainerFactory<String, Object>
可能会更干净,以避免任何混乱;尤其是在使用可以创建不同类型的avro或json解串器时。
就是说,您的bean称为importDecisionMessageKafkaListenerContainerFactory
。为了使侦听器使用非标准工厂,必须在侦听器的containerFactory
属性中指定其名称;否则使用kafkaListenerContainerFactory
。