本文介绍了如何提高WebFlux WebClient的吞吐量?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想发送一些请求并高频率地接收响应。

我的代码如下:

  Flux.fromIterable(params)
            .delayElements(Duration.ofMillis(8))
            .subscribe(
              param -> {
                webClient
                    .get()
                    .uri(prefix.concat(Utils.urlEncoder(param)))
                    .header("K1", "V1"))
                    .exchange()
                    .subscribe(clientResponse -> {
                        log.info("");
                    });
            }
        );
代码运行良好,频率也很好。但当我尝试更改delayElements方法时的值(使用Duration.ofMillis(4)),我发现了一些异常。

    ERROR [reactor-http-nio-2] ContextHandler --- Error cannot be forwarded to user-facing Mono
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.io.IOException: Connection closed prematurely
Caused by: java.io.IOException: Connection closed prematurely

代码仍可以运行,但它生成了一些异常。

之后我将值减少到Duration.ofMillis(0)。代码几乎无法运行。我的控制台充满了异常。

Caused by: java.net.SocketException: Too many open files in system

我可以使代码的吞吐量好些了吗?

8ms已经足够了,但如果我能提高频率,那就太好了!谢谢!

推荐答案

在这种情况下,可能会发生以下几种情况:

  • 如果您泛洪远程主机,该主机可能会认为您正在尝试进行DoS攻击,并将关闭传入连接
  • 您的本地主机可能会用完文件描述符,因为您创建的连接太多

一般来说,您应该使用limitRate来限制并发调用的数量。delayElements只是引入了人为延迟,但没有考虑网络延迟或服务器变慢。

您可以尝试:

Flux.fromIterable(params)
    .limitRate(25)
    .flatMap(param -> webClient
                         .get()
                         .uri(prefix.concat(Utils.urlEncoder(param)))
                         .header("K1", "V1"))
                         .exchange()
    )
    .subscribe(clientResponse -> { log.info(""); }, error -> { log.error(e); });

这篇关于如何提高WebFlux WebClient的吞吐量?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-16 17:04