This article covers how to handle a Kafka Spring Deserializer returnType static method that is never called.

Problem description

This is the error I get:

org.apache.kafka.common.errors.SerializationException: Error deserializing
key/value for partition distance-0 at offset 0. If needed, please seek past
the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers
and no default type provided
    at org.springframework.util.Assert.state(Assert.java:73)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:370)

And application-dev.yml:

spring:
  json:
    use:
      type:
        headers: false
    value:
      default:
        type: Object
        method: com.mycompany.mypackage.KafkaConfig.returnType

I also tried:

consumer:
  bootstrap-servers: 10.10.5.189:9092
  value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
  enable-auto-commit: false
  auto-offset-reset: earliest
  properties:
    spring:
      json:
        trusted:
          packages: '*'
        value:
          default:
            method: com.mycompany.mypackage.KafkaConfig.returnTypee

There were no warnings, but I put a println in the static method and it never fired. What gives?

@KafkaListener(topics = "distance", groupId = "${kafka.myinfo.id}")
public void handle(CustDeletedEvent custDeletedEvent) {
    // Log the payload that the JsonDeserializer produced
    log.debug("received jsonNode: " + custDeletedEvent);
}

KafkaConfig.java

  // NEVER CALLED!!!
  public static JavaType returnType(byte[] data, Headers headers) {
    System.out.println("return type called data.length=" + data.length);
    JavaType custDeletedEvent =
        TypeFactory.defaultInstance().constructType(CustDeletedEvent.class);
    return custDeletedEvent;
  }

Latest configuration:

kafka:
  bootstrap-servers: 10.10.5.189:9092
  producer:
    bootstrap-servers: 10.10.5.189:9092
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  consumer:
    bootstrap-servers: 10.10.5.189:9092
    value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    enable-auto-commit: false
    auto-offset-reset: earliest
    properties:
      spring:
        json:
          trusted:
            packages: '*'
          value:
            method: mypackage.config.KafkaConfig.returnType

Spring Kafka is configured here:

<dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
</dependency>

Here is the exact YAML:

spring:
  profiles:
    active: dev
  kafka:
    bootstrap-servers: 10.10.5.189:9092
    producer:
      bootstrap-servers: 10.10.5.189:9092
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      bootstrap-servers: 10.10.5.189:9092
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      enable-auto-commit: false
      auto-offset-reset: earliest
      properties:
        spring:
          json:
            trusted:
              packages: '*'
            value:
              method: com.service.cust.impl.returnType

Recommended answer

It needs to be under spring.kafka.consumer.properties.spring.json....
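
As a rough sketch of what that placement might look like: Spring Boot hands everything under spring.kafka.consumer.properties to the consumer as flattened, dotted keys, so the nested keys below become spring.json.trusted.packages and spring.json.value.type.method. The key name spring.json.value.type.method is an assumption here. It is taken from the spring-kafka JsonDeserializer.VALUE_TYPE_METHOD constant (available since spring-kafka 2.5), not from the answer text, which is truncated. The class and method names are the ones used in the question.

```yaml
spring:
  kafka:
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: '*'
            value:
              type:
                # Assumption: flattens to spring.json.value.type.method,
                # pointing at the public static method from the question
                method: com.mycompany.mypackage.KafkaConfig.returnType
```

As far as I know, neither spring.json.value.default.method nor spring.json.value.method (the keys used in the attempts in the question) is read by JsonDeserializer. That would be consistent with the static method never firing and the "no default type provided" error persisting.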


08-05 09:52