RequestReplyFuture<String, String, List<Product>> is not mapped to Product; the reply is deserialized as ArrayList<LinkedHashMap> instead

Problem description

I am using RequestReplyFuture<String, String, List<Product>> to map the response to a List<Product>, but the elements are not mapped to Product. The code looks like this:

    @Service
    public class ProductProducer implements IProductProducer {
        private static final Logger LOG = LoggerFactory.getLogger(ProductProducer.class);
        private final ReplyingKafkaTemplate<String, String, List<Product>> _replyTemplate;

        public ProductProducer(ReplyingKafkaTemplate<String, String, List<Product>> replyTemplate) {
            this._replyTemplate = replyTemplate;
        }

        @Override
        public List<ProductViewModel> GetProducts() throws InterruptedException, ExecutionException, TimeoutException {
            RequestReplyFuture<String, String, List<Product>> future =
                    this._replyTemplate.sendAndReceive(new ProducerRecord<>(ProductTopicConstants.GET_PRODUCTS, 0, null, null));
            LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
            // Problem: the list elements are not mapped to Product here
            List<Product> products = future.get(10, TimeUnit.SECONDS).value();
            var productViewModels = products.stream()
                    .map(item -> new ProductViewModel(item.getId(), item.getName(), item.getPrice(), item.getDescription()))
                    .collect(Collectors.toList());
            return productViewModels;
        }
    }

Kafka configuration

@Configuration
public class KafkaConfiguration {
    @Bean
    public ReplyingKafkaTemplate<String, String, List<Product>> replyer(ProducerFactory<String, String> pf,
                                                                        ConcurrentKafkaListenerContainerFactory<String, List<Product>> containerFactory) {

        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer<String, List<Product>> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, String, List<Product>> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, List<Product>> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, List<Product>> containerFactory) {

        ConcurrentMessageListenerContainer<String, List<Product>> container =
                containerFactory.createContainer(ProductTopicConstants.GET_PRODUCTS_CONTAINER);
        container.getContainerProperties().setGroupId(ProductTopicConstants.GET_PRODUCTS_CONTAINER);
        container.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return container;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Bean
    public NewTopic GetProducts() {
        return TopicBuilder.name(ProductTopicConstants.GET_PRODUCTS).partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic GetProductsContainer() {
        return TopicBuilder.name(ProductTopicConstants.GET_PRODUCTS_CONTAINER).partitions(1).replicas(1).build();
    }
}

Recommended answer

Due to type erasure, the type information in the headers does not include the element type, so Jackson thinks the value is a List<Object> and deserializes each element as a LinkedHashMap.
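The following stdlib-only sketch illustrates why the assignment to List<Product> in the question succeeds and the failure only shows up once an element is used. It does not use Kafka or Jackson. The Product class, ErasureDemo and its methods are hypothetical stand-ins, and untypedPayload() merely imitates what a JSON deserializer produces when it knows nothing about the element type.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ErasureDemo {

    // Imitates a deserializer that only knows "List":
    // every JSON object ends up as a LinkedHashMap.
    static Object untypedPayload() {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        List<Object> rows = new ArrayList<>();
        rows.add(row);
        return rows;
    }

    // The unchecked cast succeeds: at runtime the JVM only checks "is it a List?".
    @SuppressWarnings("unchecked")
    static List<Product> asProducts(Object payload) {
        return (List<Product>) payload;
    }

    // Returns true if using an element as a Product throws ClassCastException.
    static boolean failsOnElementAccess(List<Product> products) {
        try {
            Product first = products.get(0); // the compiler inserts a cast to Product here
            first.getId();
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Product> products = asProducts(untypedPayload());
        System.out.println("size = " + products.size());
        System.out.println("fails on element access: " + failsOnElementAccess(products));
    }
}

// Hypothetical stand-in for the Product class from the question.
class Product {
    private final int id;

    Product(int id) {
        this.id = id;
    }

    int getId() {
        return id;
    }
}
```

In the question, the same thing happens inside the stream lambda: item.getId() is the first place where an element is actually treated as a Product.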

You can use a type function instead, to give Jackson some more information: set the following consumer property and provide the static method it points to.

spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.SomeClass.returnType

public static JavaType returnType(byte[] data, Headers headers) {
    return TypeFactory.defaultInstance()
            .constructCollectionLikeType(List.class, Product.class);
}
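The type method helps because a JavaType can describe List<Product> in full, whereas the runtime class of a list cannot. Here is a rough stdlib-only sketch of that difference, using java.lang.reflect rather than Jackson. TypeCapture, TypeCaptureDemo and the nested Product class are hypothetical names for illustration and are not part of Jackson or Spring Kafka.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

class TypeCaptureDemo {

    // Hypothetical stand-in for the Product class from the question.
    static class Product { }

    // Hypothetical helper: an anonymous subclass keeps T in its class metadata,
    // so the full generic type can be read back by reflection.
    abstract static class TypeCapture<T> {
        Type captured() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // What the runtime class of a List<Product> instance reveals: no element type.
    static Class<?> runtimeClass() {
        List<Product> products = new ArrayList<>();
        return products.getClass();
    }

    // What an explicitly supplied type description reveals: List plus Product.
    static Type capturedType() {
        return new TypeCapture<List<Product>>() { }.captured();
    }

    public static void main(String[] args) {
        System.out.println("runtime class: " + runtimeClass().getName());
        System.out.println("captured type: " + capturedType().getTypeName());
    }
}
```

The runtime class is just java.util.ArrayList, while the captured type still names Product as its element type. The returnType method above plays the same role for the deserializer by handing it a complete type description up front.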


08-01 15:38