Spring Boot REST API with Spring Kafka


Problem description

I have designed a Spring Boot REST API with ADD and GET methods:

    @RestController("ProductV1Controller")
    public class ProductController {

        private final IProductProducer _productProducer;

        public ProductController(IProductProducer productProducer) {
            _productProducer = productProducer;
        }

        @PostMapping()
        void AddProduct(@Valid @RequestBody ProductViewModel product) {
            _productProducer.AddProduct(product);
        }

        @GetMapping()
        List<ProductViewModel> Products() {
            var test = _productProducer.GetProducts();
            return _productProducer.GetProducts();
        }
    }

Service layer

    @Service
    public class ProductProducer implements IProductProducer {

        private final KafkaTemplate<String, Object> _template;

        public ProductProducer(KafkaTemplate<String, Object> _template) {
            this._template = _template;
        }

        @Override
        public List<ProductViewModel> GetProducts() {
            this._template.send(ProductTopicConstants.GET_PRODUCTS, null);
            // Need to return the value from Kafka here
            return List.of(new ProductViewModel("", "", 0, ""));
        }

        @Override
        public void AddProduct(ProductViewModel product) {
            this._template.send(ProductTopicConstants.ADD_PRODUCT, product);
        }
    }

Kafka Listener

    @KafkaListener(id = ProductTopicConstants.GET_PRODUCTS, topics = ProductTopicConstants.GET_PRODUCTS)
    public List<Product> GetProducts() {
        return _productRepository.findAll();
    }

In the service layer's GetProducts(), I need to return the list of products that comes from _productRepository.findAll().

What is the best approach for building a REST API with Spring Kafka?

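Solution

You need to use a ReplyingKafkaTemplate to return a result to the REST controller.

See ReplyingKafkaTemplate:

Version 2.1.3 introduced a subclass of KafkaTemplate to provide request/reply semantics. The class is named ReplyingKafkaTemplate and has one method (in addition to those in the superclass).

The result is a ListenableFuture that is asynchronously populated with the result (or an exception, for a timeout). The result also has a sendFuture property, which is the result of calling KafkaTemplate.send(). You can use this future to determine the result of the send operation.

The documentation has an example.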
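To make the request/reply idea more concrete, here is a minimal, library-free sketch of the pattern. It is not how ReplyingKafkaTemplate is implemented and it does not use Kafka at all. The class RequestReplySketch and its methods are hypothetical names chosen for illustration. The sketch only imitates the general idea that each request gets a correlation id, and that the reply is matched back to the waiting caller through a future.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Conceptual sketch only: an in-memory stand-in for request/reply over a broker.
// None of these names belong to Spring Kafka.
class RequestReplySketch<Q, R> {

    // Replies that have been requested but not yet received, keyed by correlation id.
    private final Map<String, CompletableFuture<R>> pending = new ConcurrentHashMap<>();
    // Plays the role of the listener thread that consumes the request topic.
    private final ExecutorService listenerThread = Executors.newSingleThreadExecutor();
    // Plays the role of the listener method that produces the reply.
    private final Function<Q, R> listener;

    RequestReplySketch(Function<Q, R> listener) {
        this.listener = listener;
    }

    // "Sends" a request and returns a future that completes when the reply arrives.
    CompletableFuture<R> sendAndReceive(Q request) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<R> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        listenerThread.submit(() -> onReply(correlationId, listener.apply(request)));
        return reply;
    }

    // Called when a reply shows up: match it to the waiting caller by correlation id.
    private void onReply(String correlationId, R value) {
        CompletableFuture<R> reply = pending.remove(correlationId);
        if (reply != null) {
            reply.complete(value);
        }
    }

    void shutdown() {
        listenerThread.shutdown();
    }

    public static void main(String[] args) throws Exception {
        RequestReplySketch<String, List<String>> template =
                new RequestReplySketch<>(request -> List.of("foo", "bar", "baz"));
        try {
            // Block the caller for a bounded time while waiting for the reply.
            List<String> products = template.sendAndReceive("get-products").get(10, TimeUnit.SECONDS);
            System.out.println(products); // prints "[foo, bar, baz]"
        } finally {
            template.shutdown();
        }
    }
}
```

A plain send() has no counterpart to the pending map, which is why the GetProducts() method in the question has nothing to return: the listener's result has no path back to the caller.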

EDIT

@SpringBootApplication
@RestController
public class So63058608Application {

    private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So63058608Application.class, args);
    }

    @Autowired
    private ReplyingKafkaTemplate<String, String, List<String>> replyTemplate;

    @GetMapping(path = "/get")
    public List<String> getThem() throws Exception {
        RequestReplyFuture<String, String, List<String>> future =
                this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
        LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
        return future.get(10, TimeUnit.SECONDS).value();
    }

    @KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
    @SendTo
    public List<String> returnList(@Payload(required = false) String payload) {
        return new ArrayList<>(List.of("foo", "bar", "baz"));
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, List<String>> replyer(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {

        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer<String, List<String>> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, String, List<String>> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, List<String>> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {

        ConcurrentMessageListenerContainer<String, List<String>> container =
                containerFactory.createContainer("so63058608-2");
        container.getContainerProperties().setGroupId("so63058608-2");
        container.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return container;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
    }

}

spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

$ curl localhost:8080/get
["foo","bar","baz"]
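The controller in the example waits on the reply future with a 10 second limit and simply declares throws Exception. As a hedged illustration of what that bounded wait involves, the JDK-only sketch below separates the failure modes of a timed get(). ReplyTimeoutSketch and awaitReply are hypothetical names, and a CompletableFuture stands in for the reply future. How the resulting exception is mapped to an HTTP response is left open.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: a CompletableFuture stands in for the reply future.
class ReplyTimeoutSketch {

    // Waits for a reply and reports every failure mode as an IllegalStateException.
    static List<String> awaitReply(CompletableFuture<List<String>> reply, long timeoutMillis) {
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // No reply arrived in time, for example because no listener is running.
            throw new IllegalStateException("no reply within " + timeoutMillis + " ms", e);
        } catch (ExecutionException e) {
            // The future was completed exceptionally.
            throw new IllegalStateException("request failed", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for reply", e);
        }
    }

    public static void main(String[] args) {
        // A reply that is already available is returned immediately.
        System.out.println(awaitReply(CompletableFuture.completedFuture(List.of("foo")), 100)); // prints "[foo]"
        try {
            // A future that never completes runs into the timeout.
            awaitReply(new CompletableFuture<>(), 100);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "no reply within 100 ms"
        }
    }
}
```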

EDIT2

And with a list of objects returned:

@SpringBootApplication
@RestController
public class So63058608Application {

    private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So63058608Application.class, args);
    }

    @Autowired
    private ReplyingKafkaTemplate<String, String, List<Foo>> replyTemplate;

    @GetMapping(path = "/get")
    public List<Foo> getThem() throws Exception {
        RequestReplyFuture<String, String, List<Foo>> future =
                this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
        LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
        List<Foo> result = future.get(10, TimeUnit.SECONDS).value();
        LOG.info(result.toString());
        return result;
    }

    @KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
    @SendTo
    public List<Foo> returnList(@Payload(required = false) String payload) {
        return new ArrayList<>(List.of(new Foo("foo"), new Foo("bar"), new Foo("baz")));
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, List<Foo>> replyer(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {

        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer<String, List<Foo>> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, String, List<Foo>> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, List<Foo>> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {

        ConcurrentMessageListenerContainer<String, List<Foo>> container =
                containerFactory.createContainer("so63058608-2");
        container.getContainerProperties().setGroupId("so63058608-2");
        container.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return container;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
    }

    public static JavaType returnType(byte[] data, Headers headers) {
        return TypeFactory.defaultInstance()
                .constructCollectionLikeType(List.class, Foo.class);
    }

}

class Foo {

    private String bar;

    public Foo() {
    }

    public Foo(String bar) {
        this.bar = bar;
    }

    public String getBar() {
        return this.bar;
    }

    public void setBar(String bar) {
        this.bar = bar;
    }

    @Override
    public String toString() {
        return "Foo [bar=" + this.bar + "]";
    }

}

spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So63058608Application.returnType

[Foo [bar=foo], Foo [bar=bar], Foo [bar=baz]]
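The spring.json.value.type.method property points the deserializer at the returnType method, which spells out the full List<Foo> type. A likely reason such a hint is needed is Java's type erasure: at runtime a list does not record its element type, so the element class cannot be read off the list object itself. The JDK-only sketch below illustrates only that language behaviour. ErasureSketch and sameRuntimeClass are hypothetical names, and nothing here touches Spring Kafka or Jackson.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: shows that generic element types are not visible on the list object.
class ErasureSketch {

    // Returns true when both lists share one runtime class, whatever their element types.
    static boolean sameRuntimeClass(List<?> a, List<?> b) {
        return a.getClass() == b.getClass();
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>(List.of("foo", "bar", "baz"));
        List<Integer> numbers = new ArrayList<>(List.of(1, 2, 3));
        System.out.println(sameRuntimeClass(names, numbers)); // prints "true"
        System.out.println(names.getClass().getName()); // prints "java.util.ArrayList"
    }
}
```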
