我是 Spring Web-Flux 的初学者。我写了一个 Controller ,如下所示:

@RestController
public class FirstController
{
    @GetMapping("/first")
    public Mono<String> getAllTweets()
    {
        return Mono.just("I am First Mono")
    }
}

我知道被动的好处之一是背压,它可以平衡请求或响应速度。我想了解如何在 Spring Web-Flux 中具有背压机制。

最佳答案

WebFlux中的背压

为了了解Backpressure在WebFlux框架的当前实现中是如何工作的,我们必须在此处重述默认使用的传输层。我们可能还记得,浏览器和服务器之间的正常通信(服务器之间的通信通常也相同)是通过TCP连接完成的。 WebFlux还使用该传输方式在客户端和服务器之间进行通信。
然后,为了获得背压控制术语的含义,我们必须从“ react 流”规范角度回顾一下背压的含义。



因此,从该陈述中,我们可以得出结论,在 react 流中,背压是一种通过传输(通知)接收者可以消耗多少元素来调节需求的机制。这是一个棘手的问题。 TCP具有字节抽象,而不是逻辑元素抽象。我们通常希望通过说背压控制来控制发送到网络或从网络接收的逻辑元素的数量。即使TCP有自己的流控制(请参见here和动画there的含义),该流控制仍然是针对字节而不是逻辑元素。

在WebFlux模块的当前实现中,背压由传输流控制来调节,但它并未暴露出接收者的实际需求。为了最终看到交互流程,请参见下图:

java - Spring Web-Flux中的背压机制-LMLPHP

为简单起见,上图显示了两个微服务之间的通信,其中左一个发送数据流,而右一个使用该流。以下编号列表提供了对该图的简要说明:

  • 这是WebFlux框架,它对将逻辑元素转换为字节并向回传输以及从TCP(网络)传输/接收逻辑元素进行了适当的照顾。
  • 这是元素长时间运行处理的开始,一旦作业完成,该处理请求下一个元素。
  • 在这里,虽然没有业务逻辑的需求,但是WebFlux排队来自网络的字节,而没有得到它们的确认(业务逻辑没有需求)。
  • 由于TCP流量控制的性质,服务A仍可能将数据发送到网络。

  • 从上图中我们可能会注意到,接收者暴露的需求不同于发送者的需求(此处以逻辑元素表示需求)。这意味着两者的需求是隔离的,仅适用于WebFlux 业务逻辑(服务)交互,而对服务A 服务B交互的背压较小。

    这意味着WebFlux中的背压控制并不像我们期望的那样公平。

    但是我仍然想知道如何控制背压

    如果我们仍然想对WebFlux中的反压进行不公平的控制,则可以在Project Reactor运算符(如 limitRate() )的支持下进行。以下示例说明了如何使用该运算符:
    @PostMapping("/tweets")
    public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {
    
        return tweetService.process(tweetsFlux.limitRate(10))
                           .then();
    }
    

    从示例中我们可以看到,limitRate()运算符允许定义一次要预取的元素数量。这意味着,即使最终订户请求Long.MAX_VALUE元素,limitRate运算符也将需求拆分为多个块,并且不允许一次消耗更多。我们可以对元素发送过程做同样的事情:
    @GetMapping("/tweets")
    public Flux<Tweet> getAllTweets() {
    
        return tweetService.retreiveAll()
                           .limitRate(10);
    }
    

    上面的示例显示,即使WebFlux一次请求了10个以上的元素,limitRate()也会将需求限制为预取大小,并防止一次消耗超过指定数量的元素。

    另一种选择是实现自己的Subscriber或从Project Reactor扩展BaseSubscriber。例如,以下是我们如何做到这一点的幼稚示例:
    class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {
    
        int consumed;
        final int limit = 5;
    
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(limit);
        }
    
        @Override
        protected void hookOnNext(T value) {
            // do business logic there
    
            consumed++;
    
            if (consumed == limit) {
                consumed = 0;
    
                request(limit);
            }
        }
    }
    

    使用RSocket协议(protocol)的合理背压

    为了通过网络边界实现逻辑元素背压,我们需要适当的协议(protocol)。幸运的是,有一个叫做RScoket protocol。 RSocket是一种应用程序级别的协议(protocol),允许通过网络边界传输实际需求。
    该协议(protocol)有一个RSocket-Java实现,可以设置RSocket服务器。对于服务器到服务器的通信,相同的RSocket-Java库也提供了客户端实现。要了解有关如何使用RSocket-Java的更多信息,请参见以下示例here
    对于浏览器-服务器通信,有一个RSocket-JS实现,该实现允许通过WebSocket在浏览器和服务器之间连接流通信。

    RSocket之上的已知框架

    如今,有一些框架是基于RSocket协议(protocol)构建的。

    变形菌

    其中一个框架是Proteus项目,该项目提供了基于RSocket之上的成熟的微服务。而且,Proteus与Spring框架很好地集成在一起,因此现在我们可以实现合理的背压控制(请参阅示例there)

    进一步阅读
  • https://www.netifi.com/proteus
  • https://medium.com/netifi
  • http://scalecube.io/
  • 关于java - Spring Web-Flux中的背压机制,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/52244808/

    10-10 21:00