本文介绍了Kafka Avro Deserializer无法将Kafka消息反序列化为特定的Avro记录的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试将kafka中的Avro消息反序列化为从Avro Schema生成的POJO.我正在使用KafkaAvroDeserializer进行此转换.

I am trying to deserialize Avro messgaes that are in kafka to POJO generated from Avro Schema. I am using KafkaAvroDeserializer for this conversion.

我能够看到从kafka返回的ConsumerRecord<String, Data>记录中的GenericRecord.但是,当我尝试将此记录分配给生成的POJO类对象时,对于带有ClassCastException的POJO字段的date类型,此操作失败.当我检查Avro有效载荷时,该日期字段将以integer的形式出现.

I am able to see the GenericRecord in the ConsumerRecord<String, Data> record returned from kafka. But when I try to assign this record to generated POJO class object, It is failing for date type of POJO field with ClassCastException. When I checked the avro payload this date field is coming in as integer.

设置: Avro -1.9.1融合-5.4 commercehub gradle插件 0.20.0

Set up:Avro - 1.9.1Confluent - 5.4commercehub gradle plugin 0.20.0

在尝试反序列化Avro消息时,出现的错误为-

While trying to deserialze the Avro message, I am getting the error as -

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 66
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.time.LocalDate
    at com.sample.Data.put(Data.java:229) ~[main/:na]
    at org.apache.avro.generic.GenericData.setField(GenericData.java:795) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) ~[avro-1.9.1.jar:1.9.1]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:287) ~[kafka-avro-serializer-5.4.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:102) ~[kafka-avro-serializer-5.4.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:81) ~[kafka-avro-serializer-5.4.0.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-5.4.0.jar:na]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.5.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1091) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1047) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:972) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_241]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_241]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_241]

由于ClassCastException

{
    "name": "BIRTH_DT",
    "type": [
        "null",
        {
            "type": "int",
            "logicalType": "date"
        }
    ],
    "default": null
}

生成的POJO中的代码段

Code Snippets from Generated POJO

  @Deprecated public java.time.LocalDate BIRTH_DT;

  // Used by DatumReader.  Applications should not call.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
    switch (field$) {
    .
    .
    case 8: BIRTH_DT = (java.time.LocalDate)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  public java.time.LocalDate getBIRTHDT() {
    return BIRTH_DT;
  }

  public void setBIRTHDT(java.time.LocalDate value) {
      this.BIRTH_DT = value;
  }

Kafka消费者方法

Kafka Consumer Method

    @KafkaListener(topics = "${spring.kafka.consumer.properties.topic}",
                     groupId = "${spring.kafka.consumer.group-id}")
    // Data is a POJO generated by Avro tools
    public void consume(ConsumerRecord<String, Data> record,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                        @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) throws IOException {

        logger.info(String.format("#### -> Consumed message -> partiion: %s, offset: %s", partition, offset));
        Data row = record.value();
        ack.acknowledge();
    }

build.gradle

build.gradle

buildscript {
    repositories {
        jcenter {
            url "https://nexus.abc.com:8443/content/repositories/jcenter/"
        }
    }
    dependencies {
        classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.20.0"
    }
}

plugins {
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
    id 'idea'
    id 'eclipse'
}

repositories {
    maven { url nexusPublicRepoURL }
    maven { url "https://nexus.abc.com:8443/content/repositories/confluence.io-maven/" }
    jcenter()
    maven { url "https://nexus.abc.com:8443/content/repositories/jcenter/" }
}

group = 'com.abc.cscm'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '8'
targetCompatibility = '8'

ext {
    springCloudVersion = 'Hoxton.SR6'
    confluentVersion = '5.4.0'
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'

    implementation "io.confluent:kafka-avro-serializer:${confluentVersion}"

    implementation 'org.apache.avro:avro:1.9.1'

    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

springBoot {
    buildInfo()
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

test {
    useJUnitPlatform()
}

wrapper {
    distributionUrl = "https://nexus.abc.com:8443/service/local/repositories/thirdparty/content/org/gradle/gradle/6.5/gradle-6.5.zip"
}

apply plugin: "com.commercehub.gradle.plugin.avro"
apply plugin: 'idea'

./gradlew依赖项--configuration compileClasspath(输出)

./gradlew dependencies --configuration compileClasspath (Output)

> Task :dependencies

------------------------------------------------------------
Root project
------------------------------------------------------------

compileClasspath - Compile classpath for source set 'main'.
                        ** omiting spring deps
+--- io.confluent:kafka-avro-serializer:5.4.0
|    +--- org.apache.avro:avro:1.9.1
|    |    +--- com.fasterxml.jackson.core:jackson-core:2.9.9 -> 2.11.0
|    |    +--- com.fasterxml.jackson.core:jackson-databind:2.9.9.3 -> 2.11.0 (*)
|    |    +--- org.apache.commons:commons-compress:1.19
|    |    \--- org.slf4j:slf4j-api:1.7.25 -> 1.7.30
|    +--- io.confluent:kafka-schema-registry-client:5.4.0
|    |    +--- org.apache.kafka:kafka-clients:5.4.0-ccs -> 2.5.0 (*)
|    |    +--- io.confluent:common-config:5.4.0
|    |    |    +--- io.confluent:common-utils:5.4.0
|    |    |    |    \--- org.slf4j:slf4j-api:1.7.26 -> 1.7.30
|    |    |    \--- org.slf4j:slf4j-api:1.7.26 -> 1.7.30
|    |    +--- org.apache.avro:avro:1.9.1 (*)
|    |    +--- com.fasterxml.jackson.core:jackson-databind:2.9.10.1 -> 2.11.0 (*)
|    |    +--- io.swagger:swagger-annotations:1.5.22
|    |    +--- io.swagger:swagger-core:1.5.3
|    |    |    +--- org.apache.commons:commons-lang3:3.2.1 -> 3.10
|    |    |    +--- org.slf4j:slf4j-api:1.6.3 -> 1.7.30
|    |    |    +--- com.fasterxml.jackson.core:jackson-annotations:2.4.5 -> 2.11.0
|    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.4.5 -> 2.11.0 (*)
|    |    |    +--- com.fasterxml.jackson.datatype:jackson-datatype-joda:2.4.5 -> 2.11.0
|    |    |    |    +--- com.fasterxml.jackson.core:jackson-core:2.11.0
|    |    |    |    \--- joda-time:joda-time:2.9.9
|    |    |    +--- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.4.5 -> 2.11.0
|    |    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.11.0 (*)
|    |    |    |    +--- org.yaml:snakeyaml:1.26
|    |    |    |    \--- com.fasterxml.jackson.core:jackson-core:2.11.0
|    |    |    +--- io.swagger:swagger-models:1.5.3
|    |    |    |    +--- com.fasterxml.jackson.core:jackson-annotations:2.4.5 -> 2.11.0
|    |    |    |    +--- org.slf4j:slf4j-api:1.6.3 -> 1.7.30
|    |    |    |    \--- io.swagger:swagger-annotations:1.5.3 -> 1.5.22
|    |    |    \--- com.google.guava:guava:18.0 -> 29.0-android
|    |    |         +--- com.google.guava:failureaccess:1.0.1
|    |    |         +--- com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava
|    |    |         +--- com.google.code.findbugs:jsr305:3.0.2
|    |    |         +--- org.checkerframework:checker-compat-qual:2.5.5
|    |    |         +--- com.google.errorprone:error_prone_annotations:2.3.4
|    |    |         \--- com.google.j2objc:j2objc-annotations:1.3
|    |    \--- io.confluent:common-utils:5.4.0 (*)
|    +--- io.confluent:common-config:5.4.0 (*)
|    \--- io.confluent:common-utils:5.4.0 (*)
\--- org.apache.avro:avro:1.9.1 (*)

./gradlew buildEnvironment(输出)

./gradlew buildEnvironment (Output)

classpath
+--- com.commercehub.gradle.plugin:gradle-avro-plugin:0.20.0
|    \--- org.apache.avro:avro-compiler:1.9.2    <<<<<<<<<<<<<<<<<<<<<<<<<<
|         +--- org.apache.avro:avro:1.9.2
|         |    +--- com.fasterxml.jackson.core:jackson-core:2.10.2 -> 2.11.0
|         |    +--- com.fasterxml.jackson.core:jackson-databind:2.10.2 -> 2.11.0
|         |    |    +--- com.fasterxml.jackson.core:jackson-annotations:2.11.0
|         |    |    \--- com.fasterxml.jackson.core:jackson-core:2.11.0
|         |    +--- org.apache.commons:commons-compress:1.19
|         |    \--- org.slf4j:slf4j-api:1.7.25 -> 1.7.30
|         +--- org.apache.commons:commons-lang3:3.9 -> 3.10
|         +--- org.apache.velocity:velocity-engine-core:2.2
|         |    +--- org.apache.commons:commons-lang3:3.9 -> 3.10
|         |    \--- org.slf4j:slf4j-api:1.7.30
|         +--- com.fasterxml.jackson.core:jackson-databind:2.10.2 -> 2.11.0 (*)
|         +--- joda-time:joda-time:2.10.1
|         \--- org.slf4j:slf4j-api:1.7.25 -> 1.7.30

我不确定我应该编辑生成的POJO类还是缺少某些内容.

I am not sure if I should be editing my generated POJO class or I am missing something.

我能够通过更改以下问题中提到的模式将avro消息转换为POJO.但是我认为这很棘手,问题尚未解决.

I was able to convert avro message into POJO by changing the schema as mentioned in below question. But I think it is hacky and problem is not yet solved.

问题- Avro不是能够反序列化字段中具有逻辑类型的联合

推荐答案

能否请您说明使用哪个版本的avro-maven-plugin生成POJO?从avro版本1.9.0开始,不赞成使用Joda-Time,而推荐使用Java8 JSR310,并且将Java8设置为默认值.参见 Apache Avro 1.9.0发行说明.

Could you please clarify what version of avro-maven-plugin are you using to generate the POJO ? As of avro version 1.9.0, Joda-Time has been deprecated in favor of Java8 JSR310 and Java8 was set as default. See Apache Avro 1.9.0 release notes.

当我从头开始生成POJO时,得到的是java.time.LocalDate BIRTH_DT而不是org.joda.time.LocalDate BIRTH_DT.

When I generate a POJO from scratch, I get java.time.LocalDate BIRTH_DT and not org.joda.time.LocalDate BIRTH_DT.

   @Deprecated public java.time.LocalDate BIRTH_DT;

因此,在您的情况下,我认为类路径中的avro版本或过时的pojo极有可能不匹配.我建议通过mvn dependency:tree -Dincludes=org.apache.avro:avro调用验证avro版本并重新生成POJO.

So, in your case I think there is most likely an avro versions mismatch inside the classpath or a stale pojo. I would recommend verifying the avro version via mvn dependency:tree -Dincludes=org.apache.avro:avro call and regenerating POJO.

这篇关于Kafka Avro Deserializer无法将Kafka消息反序列化为特定的Avro记录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-06 17:15
查看更多