我正在将Kafka用于微服务项目。每当我将记录保存到数据库时,我都想调用一个事件。我一直在看有关Spring Cloud Stream的教程。它们都使用@ EnableBinding,@ Input,@ Output批注。当我尝试使用它们时,它说它们已被弃用。我正在使用spring initialzr。发行说明说,我应该使用Supplier,Consumer和Function代替诸如Input,Output和Process之类的旧方法。
@Bean
public Supplier<String> toUpperCase() {
return () -> {
return "hello from supplier";
};
}
当我使用这样的供应商时,它会每秒生成一条消息,这在教程中也已突出显示。我不希望它每秒发布一次。我希望在需要时发布它。它说我应该调用它的get()方法,但是我不知道怎么做。教程使用不推荐使用的功能来实现这种功能。如何在不使用不赞成使用的功能的情况下实现这种行为,或者如何在不说不赞成使用EnableBinder注释的情况下使用它? 最佳答案
您可以在https://github.com/HabeebCycle/spring-cloud-stream-implemention上查看我的回购中的演示项目
它显示了如何使用RabbitMQ和Kafka为供应商和消费者实现云流,以及如何对这两种服务进行端到端测试。
对于您的情况:
在您的供应商bean中执行以下操作:
@Bean
public Supplier<DataEvent<String, User>> savedMessage() {
return () -> {
return null;
};
}
Spring在功能包中提供了StreamBridge,可用于发送事件。假设您有一个保存到数据库中的服务层。首先要做的是创建一个自动构造的StreamBridge,该构造器由构造函数绑定(bind)注入(inject),并使用它按照以下方式发送消息。请注意,供应商名称应为输出的绑定(bind)名称,如文档中所述。
private final StreamBridge stream;
private final UserRepository repo;
// Store your topic/binding name as the supplier name as follows
private static final String SUPPLIER_BINDING_NAME = "savedMessage-out-0"
public UserService(UserRepository repo, StreamBridge stream) {
this.repo = repo;
this.stream = stream;
}
// Your save method
public void saveUser(User user) {
// Do some checking...
//save your record
User user = repo.save(user);
//check if user is saved or not null
//create your message event (Assuming you have a DataEvent class)
DataEvent<String, User> event = new DataEvent<>("User Saved", user);
boolean sent = stream.send(SUPPLIER_BINDING_NAME, event));
// Check the repo above for proper implementation.
}
对于消费者实现,请检查上面的我的仓库。尽管使用Kotlin编写,但这里也有一个实现
https://piotrminkowski.com/2020/06/05/introduction-to-event-driven-microservices-with-spring-cloud-stream/
您还可以在GitHub上检查GitHub上的Spring最新项目https://github.com/spring-cloud/spring-cloud-stream-samples/