本文介绍了KafkaException:类不是 org.apache.kafka.common.serialization.Deserializer 的实例的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述 29岁程序员,3月因学历无情被辞! 我想实现发送和接收 Java 序列化对象的 Kafka 生产者.我试过这个:制作人:@Configuration公共类 KafkaProducerConfig {@Value(value = "${kafka.bootstrapAddress}")私人字符串引导地址;@豆角,扁豆公共 ProducerFactorysaleRequestFactoryProducerFactory() {映射configProps = new HashMap();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SaleRequestFactory.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactory.class);返回新的 DefaultKafkaProducerFactory<>(configProps);}@豆角,扁豆公共 ProducerFactory生产者工厂(){映射configProps = new HashMap();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);/*序列化配置*/返回新的 DefaultKafkaProducerFactory<>(configProps);}@豆角,扁豆公共 KafkaTemplatesaleRequestFactoryKafkaTemplate() {返回新的 KafkaTemplate<>(saleRequestFactoryProducerFactory());}@豆角,扁豆公共 KafkaTemplate卡夫卡模板(){返回新的 KafkaTemplate<>(producerFactory());}}发送对象:@Autowired私人 KafkaTemplatesaleRequestFactoryKafkaTemplate;private static String topic = "tp-sale";私有无效执行(){SaleRequestFactory obj = new SaleRequestFactory();obj.setId(100);ListenableFuture发送 = saleRequestFactoryKafkaTemplate.send(topic, obj);}消费者:@EnableKafka@配置公共类 KafkaConsumerConfig {@Value(value = "${kafka.bootstrapAddress}")私人字符串引导地址;private String groupId =测试";@豆角,扁豆public ConsumerFactory消费者工厂(){映射props = new HashMap();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactory.class);返回新的 DefaultKafkaConsumerFactory<>(props);}@豆角,扁豆public ConcurrentKafkaListenerContainerFactorykafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory工厂 =新的 ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());返厂;}}//接收对象 @KafkaListener(topics = "tp-sale")public SaleResponseFactory transactionElavonAuthorizeProcess(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) 抛出异常 {System.out.println(tf.getId());SaleResponseFactory resObj = new SaleResponseFactory();resObj.setUnique_id(123123");返回 resObj;}当我部署 Producer 时,我在部署过程中遇到错误: 引起:org.apache.kafka.common.KafkaException: class org.engine.plugin.transactions.factory.SaleResponseFactory 不是 org.apache.kafka.common.serialization.Deserializer 的实例自定义对象import org.apache.kafka.common.serialization.Serializer;@Getter@Setter@NoArgsConstructor@AllArgsConstructor@Builder(toBuilder = true)公共类 SaleRequestFactory 实现了 Serializable,Serializer {私有静态最终长serialVersionUID = 1744050117179344127L;私有整数 ID;@覆盖公共字节 [] 序列化(字符串 s,对象 o){返回新字节[0];}}导入 org.apache.kafka.common.serialization.Deserializer;@Getter@Setter@NoArgsConstructor@AllArgsConstructor@Builder(toBuilder = true)公共类 SaleResponseFactory 实现了可序列化、反序列化器 {私有静态最终长serialVersionUID = 1744050117179344127L;私人字符串unique_id;@覆盖公共对象反序列化(字符串s,字节[]字节){返回空;}}你知道我该如何解决这个问题吗?我试过这个:制作人:@Configuration公共类 KafkaProducerConfig {@Value(value = "${kafka.bootstrapAddress}")私人字符串引导地址;@豆角,扁豆公共 ProducerFactorysaleRequestFactoryProducerFactory() {映射configProps = new HashMap();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);返回新的 DefaultKafkaProducerFactory<>(configProps);}@豆角,扁豆公共 ProducerFactory生产者工厂(){映射configProps = new HashMap();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);返回新的 DefaultKafkaProducerFactory<>(configProps);}@豆角,扁豆公共 KafkaTemplatesaleRequestFactoryKafkaTemplate() {返回新的 KafkaTemplate<>(saleRequestFactoryProducerFactory());}@豆角,扁豆公共 KafkaTemplate卡夫卡模板(){返回新的 KafkaTemplate<>(producerFactory());}}发送对象:@Autowired私人 KafkaTemplatesaleRequestFactoryKafkaTemplate;private static String topic = "tp-sale";私有无效执行(){SaleRequestFactory obj = new SaleRequestFactory();obj.setId(100);ListenableFuture发送 = saleRequestFactoryKafkaTemplate.send(topic, obj);}消费者:@EnableKafka@配置公共类 KafkaConsumerConfig {@Value(value = "${kafka.bootstrapAddress}")私人字符串引导地址;private String groupId =测试";@豆角,扁豆public ConsumerFactory消费者工厂(){映射props = new HashMap();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDe​​serializer.class);返回新的 DefaultKafkaConsumerFactory<>(props);}@豆角,扁豆public ConcurrentKafkaListenerContainerFactorykafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory工厂 =新的 ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());返厂;}}//接收对象 @KafkaListener(topics = "tp-sale")public SaleResponseFactory transactionElavonAuthorizeProcess(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) 抛出异常 {System.out.println(tf.getId());SaleResponseFactory resObj = new SaleResponseFactory();resObj.setUnique_id(123123");返回 resObj;}自定义对象 @Getter@Setter@NoArgsConstructor@AllArgsConstructor@Builder(toBuilder = true)公共类 SaleRequestFactory 实现可序列化{私有静态最终长serialVersionUID = 1744050117179344127L;私有整数 ID;}公共类 SaleRequestFactorySerializer 实现了 Serializable,Serializer{@覆盖公共字节 [] 序列化(字符串主题,SaleRequestFactory 数据){//将数据转换为字节[]ByteArrayOutputStream out = new ByteArrayOutputStream();尝试{ObjectOutputStream outputStream = new ObjectOutputStream(out);outputStream.writeObject(data);关闭();}捕获(IOException e){e.printStackTrace();}返回 out.toByteArray();}}@Getter@Setter@NoArgsConstructor@AllArgsConstructor@Builder(toBuilder = true)公共类 SaleResponseFactory 实现可序列化{私有静态最终长serialVersionUID = 1744050117179344127L;私人字符串unique_id;}公共类 SaleResponseFactoryDe​​serializer 实现了 Serializable、Deserializer<SaleResponseFactory>{@覆盖公共销售响应工厂反序列化(字符串主题,字节 [] 数据){//将数据转换为 SaleResponseFactorySaleResponseFactory saleResponseFactory = null;尝试{ByteArrayInputStream bis = new ByteArrayInputStream(data);ObjectInputStream in = new ObjectInputStream(bis);saleResponseFactory = (SaleResponseFactory) in.readObject();附寄();}catch (IOException | ClassNotFoundException e){e.printStackTrace();}退货销售响应工厂;}}当我尝试发送消息时出现错误:Caused by: org.apache.kafka.common.errors.SerializationException: 在偏移量 0 处反序列化分区 tp-sale-0 的键/值时出错.如果需要,请寻找记录以继续消费.引起:java.lang.ClassCastException: null21:27:51.152 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] 调试 KafkaMessageListenerContainer$ListenerConsumer[debug:296] - 提交列表:{}21:27:51.153 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] 错误 KafkaMessageListenerContainer$ListenerConsumer[error:149] - 消费者异常java.lang.IllegalStateException:此错误处理程序无法直接处理SerializationException";请考虑在值和/或键解串器中配置ErrorHandlingDeserializer"在 org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145)在 org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:103)在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1265)在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1022)在 java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)在 java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)在 java.base/java.lang.Thread.run(Thread.java:835)引起:org.apache.kafka.common.errors.SerializationException:在偏移量0处反序列化分区tp-sale-0的键/值时出错.如果需要,请查看记录以继续消费.引起:java.lang.ClassCastException:null你知道我该如何解决这个问题吗?我设法实现了这些改进:制作人:@Configuration公共类 KafkaProducerConfig {@Value(value = "${kafka.bootstrapAddress}")私人字符串引导地址;@豆角,扁豆公共 ProducerFactorysaleRequestFactoryProducerFactory() {映射configProps = new HashMap();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);返回新的 DefaultKafkaProducerFactory<>(configProps);}@豆角,扁豆公共 ProducerFactory生产者工厂(){映射configProps = new HashMap();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);返回新的 DefaultKafkaProducerFactory<>(configProps);}@豆角,扁豆公共 KafkaTemplatesaleRequestFactoryKafkaTemplate() {返回新的 KafkaTemplate<>(saleRequestFactoryProducerFactory());}@豆角,扁豆公共 KafkaTemplate卡夫卡模板(){返回新的 KafkaTemplate<>(producerFactory());}}发送对象:@Autowired私人 KafkaTemplatesaleRequestFactoryKafkaTemplate;private static String topic = "tp-sale";私有无效执行(){SaleRequestFactory obj = new SaleRequestFactory();obj.setId(100);ListenableFuture发送 = saleRequestFactoryKafkaTemplate.send(topic, obj);}消费者:@EnableKafka@配置公共类 KafkaConsumerConfig {@Value(value = "${kafka.bootstrapAddress}")私人字符串引导地址;private String groupId =测试";@豆角,扁豆public ConsumerFactory消费者工厂(){映射props = new HashMap();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDe​​serializer.class);返回新的 DefaultKafkaConsumerFactory<>(props);}@豆角,扁豆public ConcurrentKafkaListenerContainerFactorykafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory工厂 =新的 ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());返厂;}}接收对象 @KafkaListener(topics = "tp-sale")public SaleResponseFactory transactionElavonAuthorizeProcess(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) 抛出异常 {System.out.println(tf.getId());SaleResponseFactory resObj = new SaleResponseFactory();resObj.setUnique_id(123123");返回 resObj;}自定义对象 @Getter@Setter@NoArgsConstructor@AllArgsConstructor@Builder(toBuilder = true)公共类 SaleRequestFactory 实现可序列化{私有静态最终长serialVersionUID = 1744050117179344127L;私有整数 ID;}公共类 SaleRequestFactorySerializer 实现了 Serializable,Serializer{@覆盖公共字节 [] 序列化(字符串主题,SaleRequestFactory 数据){//将数据转换为字节[]ByteArrayOutputStream out = new ByteArrayOutputStream();尝试{ObjectOutputStream outputStream = new ObjectOutputStream(out);outputStream.writeObject(data);关闭();}捕获(IOException e){e.printStackTrace();}返回 out.toByteArray();}}@Getter@Setter@NoArgsConstructor@AllArgsConstructor@Builder(toBuilder = true)公共类 SaleResponseFactory 实现可序列化{私有静态最终长serialVersionUID = 1744050117179344127L;私人字符串unique_id;}公共类 SaleResponseFactoryDe​​serializer 实现了 Serializable、Deserializer<SaleResponseFactory>{@覆盖公共销售响应工厂反序列化(字符串主题,字节 [] 数据){//将数据转换为 SaleResponseFactorySaleResponseFactory saleResponseFactory = null;尝试{ByteArrayInputStream bis = new ByteArrayInputStream(data);ObjectInputStream in = new ObjectInputStream(bis);saleResponseFactory = (SaleResponseFactory) in.readObject();附寄();}catch (IOException | ClassNotFoundException e){e.printStackTrace();}退货销售响应工厂;}}当我发送一些消息时出现错误:13:03:53.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG RecordMessagingMessageListenerAdapter[debug:296] - 监听器方法返回结果 [org.factory.SaleResponseFactory@69c400ab] -为其生成响应消息13:03:53.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG RecordMessagingMessageListenerAdapter[debug:296] - 没有replyTopic来处理回复:org.factory.SaleResponseFactory@69c400ab你知道我该如何解决这个问题吗? 解决方案 您正在使用与序列化对象不同的类型来强制转换对象.不知道为什么你需要这样做.您可以将反序列化更新为如下所示的内容.public class SaleRequestFactoryDe​​serializer 实现了Serializable,Deserializer{@覆盖公共销售请求工厂反序列化(字符串主题,字节 [] 数据){...saleRequestFactory = (SaleRequestFactory) in.readObject();}}java.lang.ClassCastException: null这也意味着您的序列化没有按预期工作.在尝试投射之前,请确保您拥有有效的有效载荷.I want to implement Kafka producer which sends and receives Java Serialized Objects. I tried this:Producer:@Configurationpublic class KafkaProducerConfig {@Value(value = "${kafka.bootstrapAddress}")private String bootstrapAddress;@Beanpublic ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SaleRequestFactory.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactory.class); return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); /* Serialization configuration */ return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() { return new KafkaTemplate<>(saleRequestFactoryProducerFactory());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory());}}Send object:@Autowiredprivate KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;private static String topic = "tp-sale";private void perform(){ SaleRequestFactory obj = new SaleRequestFactory(); obj.setId(100); ListenableFuture<SendResult<String, SaleRequestFactory>> send = saleRequestFactoryKafkaTemplate.send(topic, obj);}Consumer:@EnableKafka@Configurationpublic class KafkaConsumerConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; private String groupId = "test"; @Bean public ConsumerFactory<String, SaleResponseFactory> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactory.class); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; }}// Receive Object @KafkaListener(topics = "tp-sale")public SaleResponseFactory transactionElavonAuthorizeProcess(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception { System.out.println(tf.getId()); SaleResponseFactory resObj = new SaleResponseFactory(); resObj.setUnique_id("123123"); return resObj;}When I deploy the Producer I get error during deployment: Caused by: org.apache.kafka.common.KafkaException: class org.engine.plugin.transactions.factory.SaleResponseFactory is not an instance of org.apache.kafka.common.serialization.DeserializerCustom objectimport org.apache.kafka.common.serialization.Serializer;@Getter@Setter@NoArgsConstructor@AllArgsConstructor@Builder(toBuilder = true)public class SaleRequestFactory implements Serializable, Serializer { private static final long serialVersionUID = 1744050117179344127L; private int id; @Override public byte[] serialize(String s, Object o) { return new byte[0]; }} import org.apache.kafka.common.serialization.Deserializer; @Getter @Setter @NoArgsConstructor @AllArgsConstructor @Builder(toBuilder = true) public class SaleResponseFactory implements Serializable, Deserializer { private static final long serialVersionUID = 1744050117179344127L; private String unique_id; @Override public Object deserialize(String s, byte[] bytes) { return null; } }Do you know how I can fix this issue?EDIT: I tried this:Producer:@Configurationpublic class KafkaProducerConfig {@Value(value = "${kafka.bootstrapAddress}")private String bootstrapAddress;@Beanpublic ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class); return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() { return new KafkaTemplate<>(saleRequestFactoryProducerFactory());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory());}}Send object:@Autowiredprivate KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;private static String topic = "tp-sale";private void perform(){ SaleRequestFactory obj = new SaleRequestFactory(); obj.setId(100); ListenableFuture<SendResult<String, SaleRequestFactory>> send = saleRequestFactoryKafkaTemplate.send(topic, obj);}Consumer:@EnableKafka@Configurationpublic class KafkaConsumerConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; private String groupId = "test"; @Bean public ConsumerFactory<String, SaleResponseFactory> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; }}// Receive Object @KafkaListener(topics = "tp-sale")public SaleResponseFactory transactionElavonAuthorizeProcess(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception { System.out.println(tf.getId()); SaleResponseFactory resObj = new SaleResponseFactory(); resObj.setUnique_id("123123"); return resObj;}Custom objects @Getter @Setter @NoArgsConstructor @AllArgsConstructor @Builder(toBuilder = true) public class SaleRequestFactory implements Serializable{ private static final long serialVersionUID = 1744050117179344127L; private int id; }public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory> { @Override public byte[] serialize(String topic, SaleRequestFactory data) { // convert data to byte[] ByteArrayOutputStream out = new ByteArrayOutputStream(); try { ObjectOutputStream outputStream = new ObjectOutputStream(out); outputStream.writeObject(data); out.close(); } catch (IOException e) { e.printStackTrace(); } return out.toByteArray(); }} @Getter @Setter @NoArgsConstructor @AllArgsConstructor @Builder(toBuilder = true) public class SaleResponseFactory implements Serializable{ private static final long serialVersionUID = 1744050117179344127L; private String unique_id; }public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleResponseFactory> { @Override public SaleResponseFactory deserialize(String topic, byte[] data) { // convert data to SaleResponseFactory SaleResponseFactory saleResponseFactory = null; try { ByteArrayInputStream bis = new ByteArrayInputStream(data); ObjectInputStream in = new ObjectInputStream(bis); saleResponseFactory = (SaleResponseFactory) in.readObject(); in.close(); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } return saleResponseFactory; }}When I try to send message I get error:Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale-0 at offset 0. If needed, please seek past the record to continue consumption.Caused by: java.lang.ClassCastException: null21:27:51.152 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:296] - Commit list: {}21:27:51.153 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR KafkaMessageListenerContainer$ListenerConsumer[error:149] - Consumer exceptionjava.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:103) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1265) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1022) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.lang.Thread.run(Thread.java:835)Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale-0 at offset 0. If needed, please seek past the record to continue consumption.Caused by: java.lang.ClassCastException: nullDo you know how I can fix this issue?EDIT:I managed to implement these improvements:Producer:@Configurationpublic class KafkaProducerConfig {@Value(value = "${kafka.bootstrapAddress}")private String bootstrapAddress;@Beanpublic ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class); return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() { return new KafkaTemplate<>(saleRequestFactoryProducerFactory());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory());}}Send object:@Autowiredprivate KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;private static String topic = "tp-sale";private void perform(){ SaleRequestFactory obj = new SaleRequestFactory(); obj.setId(100); ListenableFuture<SendResult<String, SaleRequestFactory>> send = saleRequestFactoryKafkaTemplate.send(topic, obj);}Consumer:@EnableKafka@Configurationpublic class KafkaConsumerConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; private String groupId = "test"; @Bean public ConsumerFactory<String, SaleResponseFactory> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; }}Receive Object @KafkaListener(topics = "tp-sale")public SaleResponseFactory transactionElavonAuthorizeProcess(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception { System.out.println(tf.getId()); SaleResponseFactory resObj = new SaleResponseFactory(); resObj.setUnique_id("123123"); return resObj;}Custom objects @Getter @Setter @NoArgsConstructor @AllArgsConstructor @Builder(toBuilder = true) public class SaleRequestFactory implements Serializable{ private static final long serialVersionUID = 1744050117179344127L; private int id; }public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory> { @Override public byte[] serialize(String topic, SaleRequestFactory data) { // convert data to byte[] ByteArrayOutputStream out = new ByteArrayOutputStream(); try { ObjectOutputStream outputStream = new ObjectOutputStream(out); outputStream.writeObject(data); out.close(); } catch (IOException e) { e.printStackTrace(); } return out.toByteArray(); }} @Getter @Setter @NoArgsConstructor @AllArgsConstructor @Builder(toBuilder = true) public class SaleResponseFactory implements Serializable{ private static final long serialVersionUID = 1744050117179344127L; private String unique_id; }public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleResponseFactory> { @Override public SaleResponseFactory deserialize(String topic, byte[] data) { // convert data to SaleResponseFactory SaleResponseFactory saleResponseFactory = null; try { ByteArrayInputStream bis = new ByteArrayInputStream(data); ObjectInputStream in = new ObjectInputStream(bis); saleResponseFactory = (SaleResponseFactory) in.readObject(); in.close(); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } return saleResponseFactory; }}When I send some message I get error:13:03:53.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG RecordMessagingMessageListenerAdapter[debug:296] - Listener method returned result [org.factory.SaleResponseFactory@69c400ab] - generating response message for it13:03:53.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG RecordMessagingMessageListenerAdapter[debug:296] - No replyTopic to handle the reply: org.factory.SaleResponseFactory@69c400abDo you know how I can solve this issue? 解决方案 You are using different type to cast the object than what it was serialize with. Not sure why you need to do that. You can update your deserialize to something like below.public class SaleRequestFactoryDeserializer implements Serializable, Deserializer<SaleRequestFactory> { @Override public SaleRequestFactory deserialize(String topic, byte[] data) { ... saleRequestFactory = (SaleRequestFactory) in.readObject(); }}java.lang.ClassCastException: nullThis also means your serialization didn't work as expected. Make sure you have valid payload before you try to cast. 这篇关于KafkaException:类不是 org.apache.kafka.common.serialization.Deserializer 的实例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持! 上岸,阿里云!
07-29 11:24