问题描述
在库更新之前的 Spring Boot/Kafka 应用程序中,我使用了以下类 org.telegram.telegrambots.api.objects.Update
以便将消息发布到 Kafka 主题.现在我使用以下 org.telegram.telegrambots.meta.api.objects.Update
.正如您所看到的 - 他们有不同的包.
In my Spring Boot/Kafka application before the library update, I used the following class org.telegram.telegrambots.api.objects.Update
in order to post messages to the Kafka topic. Right now I use the following org.telegram.telegrambots.meta.api.objects.Update
. As you may see - they have different packages.
应用程序重新启动后,我遇到了以下问题:
After application restart I ran into the following issue:
[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition telegram.fenix.bot.update-0 at offset 4223. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.telegram.telegrambots.api.objects.Update' is not in the trusted packages: [java.util, java.lang, org.telegram.telegrambots.meta.api.objects]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:221) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:967) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:93) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1144) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) ~[kafka-clients-1.1.0.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
这是我的配置:
@EnableAsync
@Configuration
public class ApplicationConfig {
@Bean
public StringJsonMessageConverter jsonConverter() {
return new StringJsonMessageConverter();
}
}
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);
return props;
}
@Bean
public ProducerFactory<String, Update> updateProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Update> updateKafkaTemplate() {
return new KafkaTemplate<>(updateProducerFactory());
}
}
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka.consumer.max.poll.interval.ms}")
private String kafkaConsumerMaxPollIntervalMs;
@Value("${kafka.consumer.max.poll.records}")
private String kafkaConsumerMaxPollRecords;
@Value("${kafka.topic.telegram.fenix.bot.update.consumer.concurrency}")
private Integer updateConsumerConcurrency;
@Bean
public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(consumerFactory(kafkaProperties));
return factory;
}
@Bean
public ConsumerFactory<String, Update> updateConsumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Update.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Update> updateKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
ConcurrentKafkaListenerContainerFactory<String, Update> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(updateConsumerFactory(kafkaProperties));
factory.setConcurrency(updateConsumerConcurrency);
return factory;
}
}
application.properties
application.properties
spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
如何解决这个问题,让Kafka将旧消息反序列化为新消息?
How to solve this issue and let Kafka deserialize old messages into the new ones ?
更新
这是我的听众
@Component
public class UpdateConsumer {
@KafkaListener(topics = "${kafka.topic.update}", containerFactory = "updateKafkaListenerContainerFactory")
public void onUpdateReceived(ConsumerRecord<String, Update> consumerRecord, Acknowledgment ack) {
//do some logic here
ack.acknowledge();
}
}
推荐答案
参见 文档.
从 2.1 版本开始,类型信息可以在记录头中传送,允许处理多种类型.此外,可以使用 Kafka 属性配置序列化器/反序列化器.
JsonSerializer.ADD_TYPE_INFO_HEADERS(默认为真);设置为 false 以在 JsonSerializer 上禁用此功能(设置 addTypeInfo 属性).
JsonSerializer.ADD_TYPE_INFO_HEADERS (default true); set to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).
JsonDeserializer.KEY_DEFAULT_TYPE;如果不存在标头信息,则用于反序列化键的回退类型.
JsonDeserializer.KEY_DEFAULT_TYPE; fallback type for deserialization of keys if no header information is present.
JsonDeserializer.VALUE_DEFAULT_TYPE;如果不存在标头信息,则用于反序列化值的回退类型.
JsonDeserializer.VALUE_DEFAULT_TYPE; fallback type for deserialization of values if no header information is present.
JsonDeserializer.TRUSTED_PACKAGES (默认 java.util, java.lang);允许反序列化的包模式的逗号分隔列表;* 表示全部反序列化.
JsonDeserializer.TRUSTED_PACKAGES (default java.util, java.lang); comma-delimited list of package patterns allowed for deserialization; * means deserialize all.
默认情况下,序列化程序会将类型信息添加到标头中.
By default, the serializer will add type information to the headers.
见 启动文档.
同样,您可以禁用在标头中发送类型信息的 JsonSerializer 默认行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
或者您可以在入站消息转换器中添加类型映射,以将源类型映射到目标类型.
Or you can add type mapping to the inbound message converter to map the source type to the destination type.
编辑
说了这么多,你用的是什么版本?
Having said that, what version are you using?
这篇关于Spring Kafka 类不在可信包中的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!