问题描述
我想实现发送和接收 Java 序列化对象的 Kafka 生产者.我试过这个:
I want to implement Kafka producer which sends and receives Java Serialized Objects. I tried this:
制作人:
@Configuration
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactory.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() {
return new KafkaTemplate<>(saleRequestFactoryProducerFactory());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
发送对象:
@Autowired
private KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;
private static String topic = "tp-sale";
private void perform(){
SaleRequestFactory obj = new SaleRequestFactory();
obj.setId(100);
ListenableFuture<SendResult<String, SaleRequestFactory>> send = saleRequestFactoryKafkaTemplate.send(topic, obj);
}
消费者:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
private String groupId = "test";
@Bean
public ConsumerFactory<String, SaleResponseFactory> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactory.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
//接收对象
@KafkaListener(topics = "tp-sale")
public SaleResponseFactory transactionElavonAuthorizeProcess(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception {
System.out.println(tf.getId());
SaleResponseFactory resObj = new SaleResponseFactory();
resObj.setUnique_id("123123");
return resObj;
}
自定义对象
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleRequestFactory implements Serializable{
private static final long serialVersionUID = 1744050117179344127L;
private int id;
}
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleResponseFactory implements Serializable{
private static final long serialVersionUID = 1744050117179344127L;
private String unique_id;
}
当我尝试发送消息时出现错误:
When I try to send message I get error:
org.apache.kafka.common.KafkaException:SaleRequestFactory 类不是 org.apache.kafka.common.serialization.Serializer 的实例
你知道我该如何解决这个问题吗?
Do you know how I can fix this issue?
推荐答案
选项 1
SaleRequestFactory
实现序列化器,SaleResponseFactory
实现反序列化器.
SaleRequestFactory
implements Serializer and SaleResponseFactory
implements Deserializer.
public class SaleRequestFactory implements Serializable, org.apache.kafka.common.serialization.Serializer<SaleRequestFactory> {
// ...
@Override
public byte[] serialize(String topic, SaleRequestFactory data) {
// convert data to byte[]
try(ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(out)) {
outputStream.writeObject(value);
}
return out.toByteArray();
}
}
// ...
public class SaleResponseFactory implements Serializable, org.apache.kafka.common.serialization.Deserializer<SaleRequestFactory> {
// ...
@Override
public SaleResponseFactory deserialize(String topic, byte[] data) {
// convert data to SaleResponseFactory
try(ByteArrayInputStream bis = new ByteArrayInputStream(data);
ObjectInputStream in = new ObjectInputStream(bis)) {
return (SaleResponseFactory) in.readObject();
}
}
}
选项 2
序列化器和反序列化器在两个不同的类中.
Serializer and Deserializer in two separate classes.
public class SaleRequestFactorySerializer implements org.apache.kafka.common.serialization.Serializer<SaleRequestFactory> {
// ...
@Override
public byte[] serialize(String topic, SaleRequestFactory data) {
// convert data to byte[]
}
}
// ...
public class SaleResponseFactoryDeserializer implements org.apache.kafka.common.serialization.Deserializer<SaleRequestFactory> {
// ...
@Override
public SaleResponseFactory deserialize(String topic, byte[] data) {
// convert data to SaleResponseFactory
}
}
然后改变
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactory.class);
到
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class);
和
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactory.class);
到
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);
这篇关于org.apache.kafka.common.KafkaException:SaleRequestFactory 类不是 org.apache.kafka.common.serialization.Serializer 的实例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!