我正在将Kafka用于微服务项目。每当我将记录保存到数据库时,我都想调用一个事件。我一直在看有关Spring Cloud Stream的教程。它们都使用@ EnableBinding,@ Input,@ Output批注。当我尝试使用它们时,它说它们已被弃用。我正在使用spring initialzr。发行说明说,我应该使用Supplier,Consumer和Function代替诸如Input,Output和Process之类的旧方法。

@Bean
public Supplier<String> toUpperCase() {
    return () -> {
        return "hello from supplier";
    };
}
当我使用这样的供应商时,它会每秒生成一条消息,这在教程中也已突出显示。我不希望它每秒发布一次。我希望在需要时发布它。它说我应该调用它的get()方法,但是我不知道怎么做。教程使用不推荐使用的功能来实现这种功能。如何在不使用不赞成使用的功能的情况下实现这种行为,或者如何在不说不赞成使用EnableBinder注释的情况下使用它?

最佳答案

您可以在https://github.com/HabeebCycle/spring-cloud-stream-implemention上查看我的回购中的演示项目
它显示了如何使用RabbitMQ和Kafka为供应商和消费者实现云流,以及如何对这两种服务进行端到端测试。
对于您的情况:
在您的供应商bean中执行以下操作:

@Bean
public Supplier<DataEvent<String, User>> savedMessage() {
    return () -> {
        return null;
    };
}
Spring在功能包中提供了StreamBridge,可用于发送事件。
假设您有一个保存到数据库中的服务层。首先要做的是创建一个自动构造的StreamBridge,该构造器由构造函数绑定(bind)注入(inject),并使用它按照以下方式发送消息。请注意,供应商名称应为输出的绑定(bind)名称,如文档中所述。
private final StreamBridge stream;
private final UserRepository repo;

// Store your topic/binding name as the supplier name as follows
private static final String SUPPLIER_BINDING_NAME = "savedMessage-out-0"

public UserService(UserRepository repo, StreamBridge stream) {
   this.repo = repo;
   this.stream = stream;
}

// Your save method
public void saveUser(User user) {
  // Do some checking...

  //save your record
  User user = repo.save(user);

  //check if user is saved or not null
  //create your message event (Assuming you have a DataEvent class)
  DataEvent<String, User> event = new DataEvent<>("User Saved", user);
  boolean sent = stream.send(SUPPLIER_BINDING_NAME, event));

  // Check the repo above for proper implementation.
}
对于消费者实现,请检查上面的我的仓库。
尽管使用Kotlin编写,但这里也有一个实现
https://piotrminkowski.com/2020/06/05/introduction-to-event-driven-microservices-with-spring-cloud-stream/
您还可以在GitHub上检查GitHub上的Spring最新项目https://github.com/spring-cloud/spring-cloud-stream-samples/

07-28 02:22
查看更多