Problem description
I have designed a Spring Boot REST API with ADD and GET methods:
@RestController("ProductV1Controller")
public class ProductController {

    private final IProductProducer _productProducer;

    public ProductController(IProductProducer productProducer) {
        _productProducer = productProducer;
    }

    @PostMapping()
    void AddProduct(@Valid @RequestBody ProductViewModel product) {
        _productProducer.AddProduct(product);
    }

    @GetMapping()
    List<ProductViewModel> Products() {
        return _productProducer.GetProducts();
    }
}
Service layer
@Service
public class ProductProducer implements IProductProducer {

    private final KafkaTemplate<String, Object> _template;

    public ProductProducer(KafkaTemplate<String, Object> _template) {
        this._template = _template;
    }

    @Override
    public List<ProductViewModel> GetProducts() {
        this._template.send(ProductTopicConstants.GET_PRODUCTS, null);
        // Placeholder: this needs to return the value produced by the Kafka listener.
        return List.of(new ProductViewModel("", "", 0, ""));
    }

    @Override
    public void AddProduct(ProductViewModel product) {
        this._template.send(ProductTopicConstants.ADD_PRODUCT, product);
    }
}
Kafka Listener
@KafkaListener(id = ProductTopicConstants.GET_PRODUCTS, topics = ProductTopicConstants.GET_PRODUCTS)
public List<Product> GetProducts() {
    return _productRepository.findAll();
}
In the service layer's GetProducts(), I need to return the list of products that comes from _productRepository.findAll() in the Kafka listener.
What is the best approach to building a REST API with Spring Kafka?
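You need to use a ReplyingKafkaTemplate to return a result to the REST controller.
According to the documentation, version 2.1.3 introduced a subclass of KafkaTemplate to provide request/reply semantics. The class is named ReplyingKafkaTemplate and has one method (in addition to those in the superclass). The result is a ListenableFuture that is asynchronously populated with the result (or an exception, for a timeout). The result also has a sendFuture property, which is the result of calling KafkaTemplate.send(). You can use this future to determine the result of the send operation.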
The documentation has an example.
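To make the request/reply idea concrete, here is a toy in-memory model of the correlation step. It is not Spring Kafka code and not how the library is necessarily implemented internally; ToyRequestReply and its methods are hypothetical names, and a plain CompletableFuture stands in for the future returned by sendAndReceive(). The sketch only shows the general pattern: each request is registered under a correlation id, and a reply carrying the same id completes the waiting future.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Toy model of request/reply correlation (hypothetical, JDK only).
class ToyRequestReply {

    // Requests still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<List<String>>> pending = new ConcurrentHashMap<>();

    // Registers a pending request and returns the correlation id that
    // would travel with the request record.
    String register(CompletableFuture<List<String>> replyFuture) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, replyFuture);
        return correlationId;
    }

    // Called when a reply arrives: completes the matching future, if any.
    boolean onReply(String correlationId, List<String> payload) {
        CompletableFuture<List<String>> future = pending.remove(correlationId);
        if (future == null) {
            return false; // unknown id, or the request was already dropped
        }
        future.complete(payload);
        return true;
    }

    // Simulates one round trip: another thread plays the replying listener.
    static List<String> roundTrip() {
        ToyRequestReply exchange = new ToyRequestReply();
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        String id = exchange.register(future);
        CompletableFuture.runAsync(() -> exchange.onReply(id, List.of("foo", "bar", "baz")));
        try {
            // The caller blocks with a timeout, as the REST controller would.
            return future.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("no reply received", e);
        }
    }
}
```

Calling ToyRequestReply.roundTrip() returns the list supplied by the simulated listener; a reply with an unknown correlation id is simply rejected.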
EDIT
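import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class So63058608Application {

    private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So63058608Application.class, args);
    }

    @Autowired
    private ReplyingKafkaTemplate<String, String, List<String>> replyTemplate;

    // Client side: sends the request and waits up to 10 seconds for the reply.
    @GetMapping(path = "/get")
    public List<String> getThem() throws Exception {
        RequestReplyFuture<String, String, List<String>> future =
                this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
        LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
        return future.get(10, TimeUnit.SECONDS).value();
    }

    // Server side: the returned list is sent back as a single reply record.
    @KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
    @SendTo
    public List<String> returnList(@Payload(required = false) String payload) {
        return new ArrayList<>(List.of("foo", "bar", "baz"));
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, List<String>> replyer(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {
        // The listener side needs a template to send its replies.
        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer<String, List<String>> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, String, List<String>> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    // Container that consumes the reply topic on behalf of the ReplyingKafkaTemplate.
    @Bean
    public ConcurrentMessageListenerContainer<String, List<String>> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {
        ConcurrentMessageListenerContainer<String, List<String>> container =
                containerFactory.createContainer("so63058608-2");
        container.getContainerProperties().setGroupId("so63058608-2");
        // Note: newer Spring Kafka releases use CommonErrorHandler implementations instead.
        container.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return container;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    // Request topic.
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
    }

    // Reply topic.
    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
    }
}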
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
$ curl localhost:8080/get
["foo","bar","baz"]
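In the controller above, future.get(10, TimeUnit.SECONDS) throws a TimeoutException if no reply arrives in time, and that exception propagates out of getThem(). The sketch below shows one way to handle the timeout explicitly. It uses only the JDK, with a CompletableFuture standing in for the reply future; ReplyTimeoutDemo, awaitOrFallback and the fallback value are hypothetical, and returning a fallback is just one option (translating the timeout into an error response is another).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: wait for a reply, but never longer than the timeout.
class ReplyTimeoutDemo {

    static List<String> awaitOrFallback(CompletableFuture<List<String>> reply,
                                        long timeoutMillis,
                                        List<String> fallback) {
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // No reply in time: return the fallback instead of failing the request.
            return fallback;
        } catch (InterruptedException e) {
            // Restore the interrupt flag before giving up.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for reply", e);
        } catch (ExecutionException e) {
            // The reply future completed exceptionally.
            throw new IllegalStateException("reply failed", e.getCause());
        }
    }
}
```

A future that never completes yields the fallback after the timeout, while an already completed future returns its value immediately.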
EDIT2
And here is the same example with a list of objects (Foo) returned instead of strings:
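// The package must match the spring.json.value.type.method property.
package com.example.demo;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class So63058608Application {

    private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So63058608Application.class, args);
    }

    @Autowired
    private ReplyingKafkaTemplate<String, String, List<Foo>> replyTemplate;

    // Client side: sends the request and waits up to 10 seconds for the reply.
    @GetMapping(path = "/get")
    public List<Foo> getThem() throws Exception {
        RequestReplyFuture<String, String, List<Foo>> future =
                this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
        LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
        List<Foo> result = future.get(10, TimeUnit.SECONDS).value();
        LOG.info(result.toString());
        return result;
    }

    // Server side: the returned list is sent back as a single reply record.
    @KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
    @SendTo
    public List<Foo> returnList(@Payload(required = false) String payload) {
        return new ArrayList<>(List.of(new Foo("foo"), new Foo("bar"), new Foo("baz")));
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, List<Foo>> replyer(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {
        // The listener side needs a template to send its replies.
        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer<String, List<Foo>> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, String, List<Foo>> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    // Container that consumes the reply topic on behalf of the ReplyingKafkaTemplate.
    @Bean
    public ConcurrentMessageListenerContainer<String, List<Foo>> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {
        ConcurrentMessageListenerContainer<String, List<Foo>> container =
                containerFactory.createContainer("so63058608-2");
        container.getContainerProperties().setGroupId("so63058608-2");
        // Note: newer Spring Kafka releases use CommonErrorHandler implementations instead.
        container.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return container;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    // Request topic.
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
    }

    // Reply topic.
    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
    }

    // Referenced by spring.json.value.type.method: tells the deserializer to build a List<Foo>.
    public static JavaType returnType(byte[] data, Headers headers) {
        return TypeFactory.defaultInstance()
                .constructCollectionLikeType(List.class, Foo.class);
    }
}

class Foo {

    private String bar;

    // No-arg constructor needed for JSON deserialization.
    public Foo() {
    }

    public Foo(String bar) {
        this.bar = bar;
    }

    public String getBar() {
        return this.bar;
    }

    public void setBar(String bar) {
        this.bar = bar;
    }

    @Override
    public String toString() {
        return "Foo [bar=" + this.bar + "]";
    }
}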
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So63058608Application.returnType
[Foo [bar=foo], Foo [bar=bar], Foo [bar=baz]]
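The returnType method is presumably needed because Java erases generic type arguments at runtime: a List<Foo> and a List<String> share the same runtime class, so a deserializer cannot tell from the list class alone what the elements should be. The snippet below is a JDK-only illustration of that erasure, not part of the answer's code; ErasureDemo is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo of type erasure using only the JDK.
class ErasureDemo {

    // True when both lists have the same runtime class, whatever their element types.
    static boolean sameRuntimeClass(List<?> a, List<?> b) {
        return a.getClass() == b.getClass();
    }

    static boolean demo() {
        List<String> strings = new ArrayList<>(List.of("foo"));
        List<Integer> numbers = new ArrayList<>(List.of(1));
        // Both are plain ArrayList at runtime; the element type is not recorded.
        return sameRuntimeClass(strings, numbers);
    }
}
```

ErasureDemo.demo() returns true, which is why the element type has to be supplied separately, here through the JavaType built in returnType.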