kafkaListenerContainerFactory

kafkaListenerContainerFactory

我有一个kafka消费者Java应用程序。
我正在从该应用程序中读取两个单独的kafka主题。
对于主题1,有一个名称为kafkaListenerContainerFactory的KafkaListenerContainerFactory,下面是代码片段。该消息是avro格式。 Pojo1是pojo类
使用avro模式构建。

 @Bean
public ConcurrentKafkaListenerContainerFactory<String, Pojo1> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Pojo1> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
}
为了使用来自topic1的消息,我们有以下方法@kafkaListener (topics="topic1") public boolean readTopic1(ConsumerRecord<String,Pojo1> record){ // logic }现在,该应用程序也从主题2中读取消息,该消息也具有avro消息,而Pojo2是使用相应avro模式构建的pojo类。
要阅读topic2,我们有以下方法@kafkaListener (topics="topic2") public boolean readTopic2(ConsumerRecord<String,Pojo2> record){ // logic }现在,我不了解主题2消耗的消息如何按预期工作。
kafkaListenerContainerFactory已配置Pojo1,因此如何从topic2读取Pojo2的消息。
据我所知,仅当缺少名称为“kafkaListenerContainerFactory”的bean时,kafka才会创建默认容器工厂,但是在我的应用程序中
我们已经为topic1建立了一个kafkaListenerContainerFactory。
因此,应该为Pojo2创建另一个KafkaListenerContainerFactory,并且在使用@kafkaListener (topics="topic2", containerFactory="factoryForTopic2")时应提供参考
有人可以帮助我了解KafkaListenerContainerFactory如何针对具有不同消息模式的不同主题进行工作。

最佳答案

这称为类型擦除;泛型类型仅在编译时适用。在运行时,只要您的反序列化器创建正确的类型,它将起作用。
使用ConcurrentKafkaListenerContainerFactory<String, Object>可能会更干净,以避免任何混乱;尤其是在使用可以创建不同类型的avro或json解串器时。
就是说,您的bean称为importDecisionMessageKafkaListenerContainerFactory。为了使侦听器使用非标准工厂,必须在侦听器的containerFactory属性中指定其名称;否则使用kafkaListenerContainerFactory

10-01 17:15