我有一个主题可以从中接收不同类型的json。但是,似乎在消费者尝试读取消息时出现异常。我尝试添加其他bean名称,但是没有用。似乎它试图从该主题读取并试图转换为从该主题读取的所有类型。有没有一种方法可以指定只为特定的输入类型启用特定的工厂。还有其他方法可以解决此问题。

错误


  造成原因:
  org.springframework.messaging.converter.MessageConversionException:
  无法转换
  [com.lte.assessment.assesssments.AssessmentAttemptRequest]至
  [com.lte.assessmentanalytics.data.SiteLevelAnalyticsRequest]用于
  通用消息
  [payload=com.lte.assessment.assesssments.AssessmentAttemptRequest@68eb637f,
  标头= {kafka_offset = 22,
  kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@252d8ffb,
  kafka_timestampType = CREATE_TIME,kafka_receivedMessageKey =空,
  kafka_receivedPartitionId = 0,kafka_receivedTopic = ltetopic,
  kafka_receivedTimestamp = 1546117529267}


设定档

@EnableKafka
@Configuration
public class KafkaConfig {
    static Map<String, Object> config = new HashMap();

    static {
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    }


    @Bean
    public ConsumerFactory<String, AssessmentAttemptRequest> assessmentAttemptDetailsEntityConsumerFactory() {
        JsonDeserializer<AssessmentAttemptRequest> deserializer = new JsonDeserializer<>();
        deserializer.addTrustedPackages("com.lte.assessment.assessments");
        return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
    }

    @Bean(name="aaKafkaListenerFactory")
    public ConcurrentKafkaListenerContainerFactory aaKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, AssessmentAttemptRequest> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(assessmentAttemptDetailsEntityConsumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, AssessmentQuestionAnalyticsEntity> assessmentQuestionAnalyticssEntityConsumerFactory() {
        JsonDeserializer<AssessmentQuestionAnalyticsEntity> deserializer = new JsonDeserializer<>();
        deserializer.addTrustedPackages("com.lte.assessment.assessments");
        return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
    }

    @Bean(name="aqKafkaListenerFactory")
    public ConcurrentKafkaListenerContainerFactory aqKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, AssessmentQuestionAnalyticsEntity> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(assessmentQuestionAnalyticssEntityConsumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, SiteLevelAnalyticsEntity> siteLevelAnalyticsEntityConsumerFactory() {
        JsonDeserializer<SiteLevelAnalyticsEntity> deserializer = new JsonDeserializer<>();
        deserializer.addTrustedPackages("com.lte.assessment.assessments");
        return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
    }

    @Bean("slaKafkaListenerFactory")
    public ConcurrentKafkaListenerContainerFactory slaKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SiteLevelAnalyticsEntity> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(siteLevelAnalyticsEntityConsumerFactory());
        return factory;
    }
}


服务

@Service
public class TopicObserver implements
        ConsumerSeekAware.ConsumerSeekCallback,ConsumerSeekAware{

    @Autowired
    private AssessmentAttemptService assessmentAttemptService;

    @Autowired
    private AssessmentQuestionService assessmentQuestionService;

    @Autowired
    private SiteLevelAnalyticsService siteLevelAnalyticsService;

    private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

    @KafkaListener(topics = "ltetopic", groupId = "group_id", containerFactory = "aaKafkaListenerFactory")
    public void consumeAttemptDetails(AssessmentAttemptRequest request) {
        assessmentAttemptService.storeAttempDetails(request);
    }

    @KafkaListener(topics = "ltetopic", groupId = "group_id", containerFactory = "aqKafkaListenerFactory")
    public void setAssessmentQeustionAnalytics(AssessmentQuestionRequest request) {
        assessmentQuestionService.storeQuestionDetails(request);
    }

    @KafkaListener(topics = "ltetopic", groupId = "group_id", containerFactory = "slaKafkaListenerFactory")
    public void siteLevelAnalytics(SiteLevelAnalyticsRequest request) {
        siteLevelAnalyticsService.storeSiteLevelDetailsDetails(request);
    }
}

最佳答案

@Bean
public ConsumerFactory<String, SiteLevelAnalyticsEntity> siteLevelAnalyticsEntityConsumerFactory() {
    JsonDeserializer<SiteLevelAnalyticsEntity> deserializer = new JsonDeserializer<>();
    deserializer.addTrustedPackages("com.lte.assessment.assessments");
    return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
}


在消费工厂中,您分配SiteLevelAnalyticsEntityJsonDeserializer是评估包。
请定义-deserializer.addTrustedPackages("com.lte.assessment.SiteLevelAnalyticsEntity");

关于java - 如何从kafka spring boot中的一个主题读取多种类型的json,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53973375/

10-12 06:22