本文介绍了RequestReplyFuture<String, String, List<Product>>未映射,而是映射到 ArrayList<LinkedHashMap>的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在使用 RequestReplyFuture将响应映射到列表,结果如下
I am using RequestReplyFuture<String, String, List> to mapped the response to List, the result is something like below
@Service
public class ProductProducer implements IProductProducer{
private final ReplyingKafkaTemplate<String, String, List<Product>> _replyTemplate;
private static final Logger LOG = LoggerFactory.getLogger(ProductProducer.class);
public ProductProducer(ReplyingKafkaTemplate<String, String, List<Product>> replyTemplate) {
this._replyTemplate = replyTemplate;
}
@Override
public List<ProductViewModel> GetProducts() throws InterruptedException, ExecutionException, TimeoutException {
RequestReplyFuture<String, String, List<Product>> future =
this._replyTemplate.sendAndReceive(new ProducerRecord<>(ProductTopicConstants.GET_PRODUCTS, 0, null, null));
LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Product> products = future.get(10, TimeUnit.SECONDS).value(); --> Property not mapped to Product
var productViewModels = products.stream().map(item -> new ProductViewModel(item.getId(),item.getName(),item.getPrice(), item.getDescription())).collect(Collectors.toList());
return productViewModels;
}
}
Kafka 配置
@Configuration
public class KafkaConfiguration {
@Bean
public ReplyingKafkaTemplate<String, String, List<Product>> replyer(ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, List<Product>> containerFactory) {
containerFactory.setReplyTemplate(kafkaTemplate(pf));
ConcurrentMessageListenerContainer<String, List<Product>> container = replyContainer(containerFactory);
ReplyingKafkaTemplate<String, String, List<Product>> replyer = new ReplyingKafkaTemplate<>(pf, container);
return replyer;
}
@Bean
public ConcurrentMessageListenerContainer<String, List<Product>> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, List<Product>> containerFactory) {
ConcurrentMessageListenerContainer<String, List<Product>> container =
containerFactory.createContainer(ProductTopicConstants.GET_PRODUCTS_CONTAINER);
container.getContainerProperties().setGroupId(ProductTopicConstants.GET_PRODUCTS_CONTAINER);
container.setBatchErrorHandler(new BatchLoggingErrorHandler());
return container;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public NewTopic GetProducts() {
return TopicBuilder.name(ProductTopicConstants.GET_PRODUCTS).partitions(1).replicas(1).build();
}
@Bean
public NewTopic GetProductsContainer() {
return TopicBuilder.name(ProductTopicConstants.GET_PRODUCTS_CONTAINER).partitions(1).replicas(1).build();
}
}
推荐答案
由于类型擦除,headers 中的类型信息让 Jackson 认为它是 List
.
Due to type erasure, the type information in the headers makes Jackson think it's List<Object>
.
您可以改用类型函数,为杰克逊提供更多信息...
You can use a type function instead, to give Jackson some more information...
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.SomeClass.returnType
public static JavaType returnType(byte[] data, Headers headers) {
return TypeFactory.defaultInstance()
.constructCollectionLikeType(List.class, Product.class);
}
这篇关于RequestReplyFuture<String, String, List<Product>>未映射,而是映射到 ArrayList<LinkedHashMap>的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!