Problem description
I am using an annotation-based Spring Kafka listener to consume Kafka messages. The code is as follows:
- The Employee object being consumed:
class Employee {
    private String name;
    private String address;
    private Object account;
    // getters
    // setters
}
- The type of the account object is decided at runtime: it may be a savings account, a current account, etc.
class SavingAcc {
    private BigDecimal balance;
}
class CurrentAcc {
    private BigDecimal balance;
    private BigDecimal limit;
}
- Both the savings and current accounts have BigDecimal fields to store the balance.
- When the Employee object is sent from the Kafka producer, all fields are mapped correctly and appear in the correct format (BigDecimal, etc.).
- But when the Employee object is consumed in another service, the account object arrives as a LinkedHashMap and its BigDecimal fields are converted to Double, which causes issues.
- As I understand it, the main cause could be either a) account is declared as Object instead of a specific type, or b) the deserializer needs to be given more specific type information. [I have already given Employee.class as the type to the Kafka receiver deserializer, so the Employee fields are mapped correctly, but the account fields are wrong.]
@Bean
public ConsumerFactory<String, Employee> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(Employee.class));
}
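As a rough illustration of why the Double conversion matters, the sketch below round-trips a decimal amount through a double using only the JDK. The class name DoubleRoundTrip, the helper viaDouble, and the sample amounts are made up for this example; it does not involve Kafka or Jackson, it only shows the kind of information a double cannot carry.

```java
import java.math.BigDecimal;

final class DoubleRoundTrip {

    // Hypothetical helper: parse the amount as a double, then turn it back into a BigDecimal.
    static BigDecimal viaDouble(String amount) {
        double d = Double.parseDouble(amount);
        return BigDecimal.valueOf(d);
    }

    public static void main(String[] args) {
        BigDecimal original = new BigDecimal("100.50");
        BigDecimal roundTripped = viaDouble("100.50");

        // The numeric value survives here, but the scale (trailing zero) does not,
        // so equals() reports a difference even though compareTo() does not.
        System.out.println(original.scale() + " vs " + roundTripped.scale());
        System.out.println(original.equals(roundTripped));
        System.out.println(original.compareTo(roundTripped) == 0);

        // With more significant digits than a double can hold, the value itself changes.
        BigDecimal large = new BigDecimal("12345678901234567890.12");
        System.out.println(large.compareTo(viaDouble("12345678901234567890.12")) == 0);
    }
}
```

Whether this matters depends on how the consuming service uses the amounts, but it is one reason to keep the fields typed as BigDecimal end to end.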
I need help with how to map or deserialize the account fields properly.
Recommended answer
Use generics and a custom JavaType method.
class Employee<T> {
    private String name;
    private String address;
    private T account;
    // getters
    // setters
}
static final JavaType withCurrent = TypeFactory.defaultInstance().constructParametricType(Employee.class, CurrentAcc.class);
static final JavaType withSaving = TypeFactory.defaultInstance().constructParametricType(Employee.class, SavingAcc.class);

public static JavaType determineType(String topic, byte[] data, Headers headers) {
    // isCurrentAccount is a placeholder for your own check of the topic, data, or headers
    if (isCurrentAccount(topic, data, headers)) {
        return withCurrent;
    }
    return withSaving;
}
If you construct the deserializer yourself, use
deser.setTypeResolver(MyClass::determineType);
or, when configuring with properties:
spring.json.value.type.method=com.mycompany.MyClass.determineType
You have to inspect the data, the headers, or the topic to determine which type you want.
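To see the effect of the parametric type in isolation, here is a minimal sketch that uses a plain Jackson ObjectMapper with no Kafka involved. The class ParametricTypeDemo, the public-field versions of Employee and CurrentAcc, and the sample JSON are assumptions made for illustration; it requires jackson-databind on the classpath.

```java
import java.math.BigDecimal;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;

final class ParametricTypeDemo {

    // Simplified stand-ins for the classes above (public fields keep the sketch short).
    public static class Employee<T> {
        public String name;
        public T account;
    }

    public static class CurrentAcc {
        public BigDecimal balance;
        public BigDecimal limit;
    }

    // Sample payload, made up for this example.
    static final String JSON =
            "{\"name\":\"a\",\"account\":{\"balance\":100.50,\"limit\":10}}";

    static final ObjectMapper MAPPER = new ObjectMapper();

    // Raw target type: Jackson knows nothing about 'account', so it falls back
    // to its untyped mapping (a LinkedHashMap holding Double/Integer values).
    static Object accountViaRawType() {
        try {
            Employee<?> e = MAPPER.readValue(JSON, Employee.class);
            return e.account;
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
    }

    // Parametric target type: 'account' is bound to CurrentAcc, so its
    // BigDecimal fields are populated as BigDecimal.
    static Object accountViaParametricType() {
        JavaType withCurrent = TypeFactory.defaultInstance()
                .constructParametricType(Employee.class, CurrentAcc.class);
        try {
            Employee<CurrentAcc> e = MAPPER.readValue(JSON, withCurrent);
            return e.account;
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(accountViaRawType().getClass().getName());
        System.out.println(accountViaParametricType().getClass().getName());
    }
}
```

The first call reproduces the behaviour described in the question; the second shows what the type method gives the deserializer to work with.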
EDIT
Here is a complete example. In this case, I pass a type hint in the Account object, but an alternative would be to set a header on the producer side.
package com.example.demo;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.math.BigDecimal;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.header.Headers;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.jayway.jsonpath.JsonPath;

import lombok.Data;

@SpringBootApplication
public class JacksonApplication {

    public static void main(String[] args) {
        SpringApplication.run(JacksonApplication.class, args);
    }

    @Data
    public static class Employee<T extends Account> {
        private String name;
        private T account;
    }

    @Data
    public static abstract class Account {
        // Type hint written into the JSON payload ("C" or "S")
        private final String type;

        protected Account(String type) {
            this.type = type;
        }
    }

    @Data
    public static class CurrentAccount extends Account {
        private BigDecimal balance;
        private BigDecimal limit;

        public CurrentAccount() {
            super("C");
        }
    }

    @Data
    public static class SavingAccount extends Account {
        private BigDecimal balance;

        public SavingAccount() {
            super("S");
        }
    }

    @KafkaListener(id = "empListener", topics = "employees")
    public void listen(Employee<Account> e) {
        System.out.println(e);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("employees").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Employee> template) {
        return args -> {
            Employee<CurrentAccount> emp1 = new Employee<>();
            emp1.setName("someOneWithACurrentAccount");
            CurrentAccount currentAccount = new CurrentAccount();
            currentAccount.setBalance(BigDecimal.ONE);
            currentAccount.setLimit(BigDecimal.TEN);
            emp1.setAccount(currentAccount);
            template.send("employees", emp1);

            Employee<SavingAccount> emp2 = new Employee<>();
            emp2.setName("someOneWithASavingAccount");
            SavingAccount savingAccount = new SavingAccount();
            savingAccount.setBalance(BigDecimal.ONE);
            emp2.setAccount(savingAccount);
            template.send("employees", emp2);
        };
    }

    private static final JavaType withCurrent = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, CurrentAccount.class);

    private static final JavaType withSaving = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, SavingAccount.class);

    // Called by the JsonDeserializer (see spring.json.value.type.method in the properties)
    public static JavaType determineType(String topic, byte[] data, Headers headers) throws IOException {
        if (JsonPath.read(new ByteArrayInputStream(data), "$.account.type").equals("C")) {
            return withCurrent;
        }
        else {
            return withSaving;
        }
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.JacksonApplication.determineType
Result
JacksonApplication.Employee(name=someOneWithACurrentAccount, account=JacksonApplication.CurrentAccount(balance=1, limit=10))
JacksonApplication.Employee(name=someOneWithASavingAccount, account=JacksonApplication.SavingAccount(balance=1))
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>jackson</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>com.jayway.jsonpath</groupId>
            <artifactId>json-path</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
EDIT2
And here is an example that conveys the type hint in a header instead:
package com.example.demo2;

import java.io.IOException;
import java.math.BigDecimal;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;

import lombok.Data;

@SpringBootApplication
public class JacksonApplication {

    public static void main(String[] args) {
        SpringApplication.run(JacksonApplication.class, args);
    }

    @Data
    public static class Employee<T extends Account> {
        private String name;
        private T account;
    }

    @Data
    public static abstract class Account {
    }

    @Data
    public static class CurrentAccount extends Account {
        private BigDecimal balance;
        private BigDecimal limit;
    }

    @Data
    public static class SavingAccount extends Account {
        private BigDecimal balance;
    }

    @KafkaListener(id = "empListener", topics = "employees")
    public void listen(Employee<Account> e) {
        System.out.println(e);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("employees").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Employee> template) {
        return args -> {
            Employee<CurrentAccount> emp1 = new Employee<>();
            emp1.setName("someOneWithACurrentAccount");
            CurrentAccount currentAccount = new CurrentAccount();
            currentAccount.setBalance(BigDecimal.ONE);
            currentAccount.setLimit(BigDecimal.TEN);
            emp1.setAccount(currentAccount);
            template.send("employees", emp1);

            Employee<SavingAccount> emp2 = new Employee<>();
            emp2.setName("someOneWithASavingAccount");
            SavingAccount savingAccount = new SavingAccount();
            savingAccount.setBalance(BigDecimal.ONE);
            emp2.setAccount(savingAccount);
            template.send("employees", emp2);
        };
    }

    private static final JavaType withCurrent = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, CurrentAccount.class);

    private static final JavaType withSaving = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, SavingAccount.class);

    // Consumer side: pick the target type from the one-byte "accountType" header
    public static JavaType determineType(String topic, byte[] data, Headers headers) throws IOException {
        if (headers.lastHeader("accountType").value()[0] == 'C') {
            return withCurrent;
        }
        else {
            return withSaving;
        }
    }

    // Producer side: add the "accountType" header before delegating to the JSON serializer
    public static class MySerializer extends JsonSerializer<Employee<?>> {

        @Override
        public byte[] serialize(String topic, Headers headers, Employee<?> emp) {
            headers.add(new RecordHeader("accountType",
                    new byte[] { (byte) (emp.getAccount() instanceof CurrentAccount ? 'C' : 'S')}));
            return super.serialize(topic, headers, emp);
        }

    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=com.example.demo2.JacksonApplication.MySerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo2.JacksonApplication.determineType
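One thing to watch with the header approach: Headers.lastHeader returns null when the header is absent, so a record published without "accountType" would fail in determineType with a NullPointerException. The following JDK-only sketch shows one possible way to make the hint handling defensive. The class AccountTypeHint, the Kind enum, and the fallback parameter are hypothetical names introduced for this example, not part of Spring Kafka.

```java
final class AccountTypeHint {

    // Header name taken from the example above.
    static final String HEADER_NAME = "accountType";

    enum Kind { CURRENT, SAVING }

    // Producer side: the same one-byte encoding used by MySerializer ('C' or 'S').
    static byte[] encode(Kind kind) {
        return new byte[] { (byte) (kind == Kind.CURRENT ? 'C' : 'S') };
    }

    // Consumer side: pass in the header value (or null if the header is missing)
    // and get back the kind, using the fallback when no hint is present.
    static Kind decode(byte[] headerValue, Kind fallback) {
        if (headerValue == null || headerValue.length == 0) {
            return fallback;
        }
        switch ((char) headerValue[0]) {
            case 'C':
                return Kind.CURRENT;
            case 'S':
                return Kind.SAVING;
            default:
                throw new IllegalArgumentException(
                        "Unknown account type hint: " + (char) headerValue[0]);
        }
    }

    public static void main(String[] args) {
        System.out.println(decode(encode(Kind.CURRENT), Kind.SAVING));
        System.out.println(decode(null, Kind.SAVING));
    }
}
```

In determineType, this could be used by reading the header once, passing its value (or null) to decode, and returning withCurrent or withSaving based on the result. Whether a missing hint should fall back to a default or be rejected is a design choice for your application.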