我有一个用例,我应该向用户发送电子邮件。
首先,我创建电子邮件正文。

Mono<String> emailBody = ...cache();

然后,我选择用户并将电子邮件发送给他们:
Flux.fromIterable(userRepository.findAllByRole(Role.USER))
            .map(User::getEmail)
            .doOnNext(email -> sendEmail(email, emailBody.block(), massSendingSubject))
            .subscribe();

我不喜欢的
  • 如果不使用cache()方法,则emailBody Mono将在每个迭代步骤中进行计算。
  • 为了获得emailBody的值,我使用了emailBody.block(),但是也许有一种 react 方式,而不是在Flux流内部调用block方法?
  • 最佳答案

    此代码示例中有几个问题。
    我假设这是一个响应式Web应用程序。

    首先,不清楚如何创建电子邮件正文。您是从数据库还是远程服务中获取东西?如果它主要是受CPU限制的(而不是I/O),则不需要将其包装为响应类型。现在,如果应该将其包装在Publisher中并且所有用户的电子邮件内容都相同,那么使用cache运算符并不是一个坏选择。

    另外,Flux.fromIterable(userRepository.findAllByRole(Role.USER))建议您从响应上下文调用阻塞存储库。

    您应该绝对不要doOn***运算符中执行繁重的I/O操作。这些是为测井或轻微副作用而设计的。您需要对其进行.block()的事实是您将阻塞整个响应式管道的另一个线索。

    最后一个:您不应该在Web应用程序中的任何地方调用subscribe。如果这绑定(bind)到HTTP请求,则基本上是在触发响应式(Reactive)管道,而不能保证资源或完成。调用subscribe会触发管道,但不会等到完成(此方法返回Disposable)。

    更为“典型”的示例如下所示:

    Flux<User> users = userRepository.findAllByRole(Role.USER);
    String emailBody = emailContentGenerator.createEmail();
    
    
    // sendEmail() should return Mono<Void> to signal when the send operation is done
    Mono<Void> sendEmailsOperation = users
         .flatMap(user -> sendEmail(user.getEmail(), emailBody, subject))
         .then();
    
    // something else should subscribe to that reactive type,
    // you could plug that as a return value of a Controller for example
    

    如果您因阻塞组件而陷入困境(例如sendEmail一个),则应在特定的调度程序上调度这些阻塞操作,以避免阻塞整个响应式管道。为此,请查看Schedulers section on the reactor reference documentation

    10-02 09:30