本文介绍了Apache Kafka和Avro:org.apache.avro.generic.GenericData $ Record无法强制转换为com.harmeetsingh13.java.Customer的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
每当我尝试从kafka队列中读取消息时,我都会遇到以下异常:
Whenever I am trying to read the message from kafka queue, I am getting following exception :
[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:79)
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Kafka制片人代码:
Kafka Producer Code:
public class AvroSpecificProducer {
private static Properties kafkaProps = new Properties();
private static KafkaProducer<String, Customer> kafkaProducer;
static {
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
kafkaProps.put("schema.registry.url", "http://localhost:8081");
kafkaProducer = new KafkaProducer<>(kafkaProps);
}
public static void fireAndForget(ProducerRecord<String, Customer> record) {
kafkaProducer.send(record);
}
public static void asyncSend(ProducerRecord<String, Customer> record) {
kafkaProducer.send(record, (recordMetaData, ex) -> {
System.out.println("Offset: "+ recordMetaData.offset());
System.out.println("Topic: "+ recordMetaData.topic());
System.out.println("Partition: "+ recordMetaData.partition());
System.out.println("Timestamp: "+ recordMetaData.timestamp());
});
}
public static void main(String[] args) throws InterruptedException, IOException {
Customer customer1 = new Customer(1002, "Jimmy");
ProducerRecord<String, Customer> record1 = new ProducerRecord<>("CustomerSpecificCountry",
"Customer One 11 ", customer1
);
asyncSend(record1);
Thread.sleep(1000);
}
}
Kafka消费者代码:
Kafka Consumer Code:
public class AvroSpecificDeserializer {
private static Properties kafkaProps = new Properties();
static {
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
kafkaProps.put("zookeeper.connect", "localhost:2181");
kafkaProps.put("schema.registry.url", "http://localhost:8081");
}
public static void infiniteConsumer() throws IOException {
VerifiableProperties properties = new VerifiableProperties(kafkaProps);
KafkaAvroDecoder keyDecoder = new KafkaAvroDecoder(properties);
KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(properties);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put("NewTopic", 1);
ConsumerConnector consumer = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps));
Map<String, List<KafkaStream<Object, Object>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
KafkaStream stream = consumerMap.get("NewTopic").get(0);
ConsumerIterator it = stream.iterator();
System.out.println("???????????????????????????????????????????????? ");
while (it.hasNext()) {
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ");
MessageAndMetadata messageAndMetadata = it.next();
String key = (String) messageAndMetadata.key();
GenericRecord record = (GenericRecord) messageAndMetadata.message();
Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, record);
System.out.println("Key: " + key);
System.out.println("Value: " + customer);
}
}
public static void main(String[] args) throws IOException {
infiniteConsumer();
}
}
我关注,这些例子:
- https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-producer/src/main/java/io/confluent/examples/producer/AvroClicksProducer.java
- https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-consumer/src/main/java/io/confluent/examples/consumer/AvroClicksSessionizer.java
推荐答案
在与@harmeen讨论后,这是最终的代码
This is the final code that would work, after discussing with @harmeen
static {
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
kafkaProps.put("zookeeper.connect", "localhost:2181");
kafkaProps.put("schema.registry.url", "http://localhost:8081");
kafkaProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
}
public static void infiniteConsumer() throws IOException {
VerifiableProperties properties = new VerifiableProperties(kafkaProps);
StringDecoder keyDecoder = new StringDecoder(properties);
KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(properties);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put("BrandNewTopics", 1);
ConsumerConnector consumer = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps));
Map<String, List<KafkaStream<String, Object>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
KafkaStream stream = consumerMap.get("BrandNewTopics").get(0);
ConsumerIterator it = stream.iterator();
while (it.hasNext()) {
MessageAndMetadata messageAndMetadata = it.next();
String key = (String) messageAndMetadata.key();
GenericRecord record = (GenericRecord) messageAndMetadata.message();
Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, record);
System.out.println("Key: " + key);
System.out.println("Value: " + customer);
}
变化的事情:
- 将
SPECIFIC_AVRO_READER_CONFIG
属性添加为true。 - 使用最小从主题的开头开始。
- 使用
StringSerializer
和StringDeserializer
对于密钥。 - 更改生产者和消费者以反映先前的更改
- 调整
客户的名称空间
代表Avro记录的类。
- Adding
SPECIFIC_AVRO_READER_CONFIG
property to true. - Using smallest to start from the beginning of the topic.
- Using
StringSerializer
andStringDeserializer
for keys. - Change both producer and consumer to reflect the previous change
- Adjust the namespace for the
Customer
class that represents the Avro record.
这篇关于Apache Kafka和Avro:org.apache.avro.generic.GenericData $ Record无法强制转换为com.harmeetsingh13.java.Customer的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!