This article covers how to resolve the Apache Kafka and Avro error: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer.

Problem Description

Whenever I try to read a message from the Kafka queue, I get the following exception:

[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer
        at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:79)
        at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:87)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)

Kafka Producer Code:

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroSpecificProducer {
    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, Customer> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                // On a failed send the metadata may be null, so bail out here.
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        Customer customer1 = new Customer(1002, "Jimmy");
        ProducerRecord<String, Customer> record1 = new ProducerRecord<>("CustomerSpecificCountry",
                "Customer One 11 ", customer1
        );

        asyncSend(record1);

        // Give the async send a moment to complete before the JVM exits.
        Thread.sleep(1000);
    }
}
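For context, Customer is an Avro-generated SpecificRecord class. Judging from the Customer(1002, "Jimmy") call above, its schema is presumably something like the following sketch; the field names id and name are assumptions, not taken from the question:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Hypothetical sketch of the Customer schema implied by the constructor call above;
// the field names "id" and "name" are assumed, not given in the question.
Schema customerSchema = SchemaBuilder.record("Customer")
        .namespace("com.harmeetsingh13.java")
        .fields()
        .name("id").type().intType().noDefault()
        .name("name").type().stringType().noDefault()
        .endRecord();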

Kafka Consumer Code:

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import io.confluent.kafka.serializers.KafkaAvroDecoder;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.utils.VerifiableProperties;

import static kafka.consumer.Consumer.createJavaConsumerConnector;

public class AvroSpecificDeserializer {

    private static Properties kafkaProps = new Properties();

    static {
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
        kafkaProps.put("zookeeper.connect", "localhost:2181");
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
    }

    public static void infiniteConsumer() throws IOException {
        VerifiableProperties properties = new VerifiableProperties(kafkaProps);
        KafkaAvroDecoder keyDecoder = new KafkaAvroDecoder(properties);
        KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(properties);

        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put("NewTopic", 1);

        ConsumerConnector consumer = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps));
        Map<String, List<KafkaStream<Object, Object>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

        KafkaStream stream = consumerMap.get("NewTopic").get(0);
        ConsumerIterator it = stream.iterator();

        System.out.println("???????????????????????????????????????????????? ");
        while (it.hasNext()) {
            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ");
            MessageAndMetadata messageAndMetadata = it.next();
            String key = (String) messageAndMetadata.key();
            GenericRecord record = (GenericRecord) messageAndMetadata.message();
            Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, record);
            System.out.println("Key: " + key);
            System.out.println("Value: " + customer);
        }
    }

    public static void main(String[] args) throws IOException {
        infiniteConsumer();
    }
}
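For what it's worth, the cast fails because without specific.avro.reader=true the KafkaAvroDecoder hands back GenericData.Record instances. A minimal sketch of what the consumer actually receives, as a fragment of the loop above (field names follow the assumed schema sketch earlier):

// Without specific.avro.reader=true, the decoded message is a GenericData.Record,
// so a direct cast to the generated class throws the ClassCastException above:
// Customer customer = (Customer) messageAndMetadata.message();   // fails at runtime

// Reading it as a GenericRecord works, since that is the actual runtime type:
GenericRecord generic = (GenericRecord) messageAndMetadata.message();
Object id = generic.get("id");     // field names assumed from the schema sketch above
Object name = generic.get("name");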

I followed these examples:

  1. https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-producer/src/main/java/io/confluent/examples/producer/AvroClicksProducer.java
  2. https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-consumer/src/main/java/io/confluent/examples/consumer/AvroClicksSessionizer.java


Recommended Answer

This is the final code that works, after discussing with @harmeen:

// Additional imports needed for this version: kafka.serializer.StringDecoder
// and io.confluent.kafka.serializers.KafkaAvroDeserializerConfig.

static {
    kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
    kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
    kafkaProps.put("zookeeper.connect", "localhost:2181");
    kafkaProps.put("schema.registry.url", "http://localhost:8081");
    // Tell the Avro deserializer to produce SpecificRecord instances (e.g. Customer)
    // instead of GenericData.Record.
    kafkaProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
}

public static void infiniteConsumer() throws IOException {
    VerifiableProperties properties = new VerifiableProperties(kafkaProps);
    // Keys are plain strings now, so use StringDecoder rather than KafkaAvroDecoder.
    StringDecoder keyDecoder = new StringDecoder(properties);
    KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(properties);

    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put("BrandNewTopics", 1);

    ConsumerConnector consumer = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps));
    Map<String, List<KafkaStream<String, Object>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

    KafkaStream stream = consumerMap.get("BrandNewTopics").get(0);
    ConsumerIterator it = stream.iterator();

    while (it.hasNext()) {
        MessageAndMetadata messageAndMetadata = it.next();
        String key = (String) messageAndMetadata.key();
        GenericRecord record = (GenericRecord) messageAndMetadata.message();
        Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, record);
        System.out.println("Key: " + key);
        System.out.println("Value: " + customer);
    }
}

Things that changed:

  • Added the SPECIFIC_AVRO_READER_CONFIG property, set to true.
  • Used smallest for auto.offset.reset, so consumption starts from the beginning of the topic.
  • Used StringSerializer and StringDeserializer for the keys.
  • Changed both the producer and the consumer to reflect the previous change (a producer-side sketch follows this list).
  • Adjusted the namespace of the Customer class that represents the Avro record.
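A minimal sketch of the matching producer-side configuration, assumed from the change list above rather than quoted from the answer:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

// Hypothetical producer properties mirroring the key-serializer change above:
// keys go through StringSerializer, values stay on KafkaAvroSerializer.
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
producerProps.put("schema.registry.url", "http://localhost:8081");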
