问题描述
我有两个服务应该通过 Kafka
进行通信.让我们调用第一个服务 WriteService 和第二个服务 QueryService.
I have two services that should communicate via Kafka
.Let's call the first service WriteService and the second service QueryService.
在WriteService方面,我为生产者提供了以下配置.
On the WriteService side, I have the following configuration for producers.
@Configuration
public class KafkaProducerConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
我正在尝试发送 com.example.project.web.routes.dto.RouteDto
在QueryService端,消费者配置定义如下.
On the QueryService side, the consumer configuration is defined as follows.
@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.groupid}")
private String serviceGroupId;
@Value("${spring.kafka.consumer.trusted-packages}")
private String trustedPackage;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, serviceGroupId);
props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPackage);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
侦听器具有以下定义.负载类具有完全限定名称 - com.example.project.clientqueryview.module.routes.messaging.kafka.RouteDto
The listener has the following definition. The payload class has the fully qualified name - com.example.project.clientqueryview.module.routes.messaging.kafka.RouteDto
@KafkaListener(topics = "${spring.kafka.topics.routes}",
containerFactory = "kafkaListenerContainerFactory")
public void listenForRoute(ConsumerRecord<String, RouteDto> cr,
@Payload RouteDto payload) {
logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
typeIdHeader(cr.headers()), payload, cr.toString());
}
private static String typeIdHeader(Headers headers) {
return StreamSupport.stream(headers.spliterator(), false)
.filter(header -> header.key().equals("__TypeId__"))
.findFirst().map(header -> new String(header.value())).orElse("N/A");
}
发送消息时,我收到以下错误
When a message is sent, I get the following error
原因:org.springframework.messaging.converter.MessageConversionException:无法解析类名.未找到课程[com.example.project.web.routes.dto.RouteDto];嵌套异常是java.lang.ClassNotFoundException:com.example.project.web.routes.dto.RouteDto
错误很明显.但是我不明白为什么默认情况下它有这种行为.我不希望在不同的服务中有相同的包,这完全没有意义.
The error is clear enough. However I cannot understand why does it has this behaviour by default. I don't expect to have the same package in different services, that makes no sense at all.
我还没有找到一种方法来禁用它并使用提供给侦听器的类,用 @Payload
I haven't found a way to disable this and use a class provided to the listener, annotated with @Payload
如何在不手动配置映射器的情况下解决这个问题?
How this could be solved, without manually configuring the mapper?
推荐答案
如果您使用的是 spring-kafka-2.2.x,您可以通过 JsonDeserializer
的重载构造函数禁用默认标头 docs
If you are using spring-kafka-2.2.x you can disable the default header by overloaded constructors of JsonDeserializer
docs
从版本 2.2 开始,您可以通过使用具有布尔值 useHeadersIfPresent(默认情况下为 true)的重载构造函数之一,显式配置反序列化器以使用提供的目标类型并忽略标头中的类型信息.以下示例显示了如何执行此操作:
DefaultKafkaConsumerFactory<String, Object> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
如果使用较低版本,请使用 MessageConverter
(您可能会在 spring-kafka-2.1.x 及以上版本中看到此问题)
If with lower version use the MessageConverter
(you might see this problem from spring-kafka-2.1.x and above)
Spring for Apache Kafka 通过 MessagingMessageConverter 实现及其 StringJsonMessageConverter 和 BytesJsonMessageConverter 自定义提供 MessageConverter 抽象.您可以直接将 MessageConverter 注入到 KafkaTemplate 实例中,也可以使用 @KafkaListener.containerFactory() 属性的 AbstractKafkaListenerContainerFactory bean 定义.以下示例显示了如何执行此操作:
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, RouteDto> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(RouteDto dto) {
...
}
注意:只有在方法层面声明了@KafkaListener注解,才能实现这种类型推断.对于类级别的@KafkaListener,负载类型用于选择要调用的@KafkaHandler 方法,因此在选择方法之前必须已经转换了它.
这篇关于Spring Kafka JsonDesirialization MessageConversionException 无法解析类名 Class not found的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!