本文介绍了我可以使用从 Spring5 的 WebClient 返回的 Flux 的 block() 方法吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我创建了 Spring Boot 2.0 演示应用程序,其中包含两个使用 WebClient 进行通信的应用程序.当我从 WebClient 的响应中使用 Flux 的 block() 方法时,我很痛苦,他们经常停止通信.由于某些原因,我想使用 List 而不是 Flux.

I created Spring Boot 2.0 demo application which contains two applications that communicate using WebClient. And I'm suffering that they often stop communicating when I use block() method of Flux from the WebClient's response. I want to use List not Flux by some reasons.

服务端应用是这样的.它只返回 Flux 对象.

The server side application is like this. It just returns Flux object.

@GetMapping
public Flux<Item> findAll() {
    return Flux.fromIterable(items);
}

而客户端(或 BFF 端)应用程序是这样的.我从服务器获取 Flux 并通过调用 block() 方法将其转换为 List.

And the client side (or BFF side) application is like this. I get Flux from the server and convert it to List by calling block() method.

@GetMapping
public List<Item> findBlock() {
    return webClient.get()
        .retrieve()
        .bodyToFlux(Item.class)
        .collectList()
        .block(Duration.ofSeconds(10L));
}

虽然一开始运行良好,但多次访问后 findBlock() 不会响应并超时.当我修改 findBlock() 方法以返回 Flux 删除 collectList() 和 block() 时,它运行良好.然后我假设 block() 方法导致了这个问题.
而且,当我修改 findAll() 方法以返回 List 时,没有任何变化.

While it works well at first, findBlock() won't respond and timeouts after several times access. When I modify the findBlock() method to return Flux deleting collectList() and block(), it works well. Then I assume that block() method cause this problem.
And, when I modify the findAll() method to return List, nothing changes.

整个示例应用程序的源代码在这里.
https://github.com/cero-t/webclient-example

Source code of the entire example application is here.
https://github.com/cero-t/webclient-example

resource"是服务器应用程序,front"是客户端应用程序.运行这两个应用程序后,当我访问 localhost:8080 时,它运行良好,我可以随时重新加载,但是当我访问 localhost:8080/block 时,它似乎运行良好,但在多次重新加载后它不会响应.

"resource" is the server application, and "front" is the client application. After running both application, when I access to localhost:8080 it works well and I can reload any times, but when I access to localhost:8080/block it seems to work well but after several reloads it won't respond.

顺便说一下,当我将spring-boot-starter-web"依赖添加到前端"应用程序(不是资源应用程序)的 pom.xml 中时,这意味着我使用的是 tomcat,这个问题永远不会发生.这是 Netty 服务器的问题吗?

By the way, when I add "spring-boot-starter-web" dependency to the "front" applications's (not resource application's) pom.xml, which means I use tomcat, this problem never happens. Is this problem due to Netty server?

任何指导将不胜感激.

推荐答案

首先,让我指出,仅当 items 有时才建议使用 Flux.fromIterable(items)从内存中获取,不涉及 I/O.否则,您可能会使用阻塞 API 来获取它 - 这可能会破坏您的反应式应用程序.在这种情况下,这是一个内存列表,所以没问题.请注意,您也可以使用 Flux.just(item1, item2, item3).

First, let me point that using Flux.fromIterable(items) is advised only if items has been fetched from memory, no I/O involved. Otherwise chances are you'd be using a blocking API to get it - and this can break your reactive application. In this case, this is an in-memory list, so no problem. Note that you can also go Flux.just(item1, item2, item3).

使用以下方法最有效:

@GetMapping("/")
public Flux<Item> findFlux() {
  return webClient.get()
    .retrieve()
    .bodyToFlux(Item.class);
}

Item 实例将以一种非常有效的方式即时读取/写入、解码/编码.

Item instances will be read/written, decoded/encoded on the fly in a very efficient way.

另一方面,这不是首选方式:

On the other hand, this is not the preferred way:

@GetMapping("/block")
public List<Item> findBlock() {
  return webClient.get()
    .retrieve()
    .bodyToFlux(Item.class)
    .collectList()
    .block(Duration.ofSeconds(10L));
}

在这种情况下,您的前端应用程序使用 collectList 在内存中缓冲整个项目列表,但同时也阻塞了少数几个可用的服务器线程之一.这可能会导致性能非常差,因为您的服务器可能在等待该数据时被阻塞,并且无法同时为其他请求提供服务.

In this case, your front application is buffering in memory the whole items list with collectList but is also blocking one of the few server threads available. This might cause very poor performance because your server might be blocked waiting for that data and can't service other requests at the same time.

在这种特殊情况下,情况更糟,因为应用程序完全中断了.查看控制台,我们可以看到以下内容:

In this particular case it's worse, since the application totally breaks.Looking at the console, we can see the following:

WARN 3075 --- [ctor-http-nio-7] io.netty.util.concurrent.DefaultPromise  : An exception was thrown by reactor.ipc.netty.channel.PooledClientContextHandler$$Lambda$532/356589024.operationComplete()

reactor.core.Exceptions$BubblingException: java.lang.IllegalArgumentException: Channel [id: 0xab15f050, L:/127.0.0.1:59350 - R:localhost/127.0.0.1:8081] was not acquired from this ChannelPool
    at reactor.core.Exceptions.bubble(Exceptions.java:154) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]

这可能与 reactor-netty 客户端连接池问题有关应该在 0.7.4.RELEASE 中修复.我不知道具体情况,但我怀疑整个连接池都被破坏了,因为没有从客户端连接正确读取 HTTP 响应.

This is probably linked to a reactor-netty client connection pool issue that should be fixed in 0.7.4.RELEASE. I don't know the specifics of this, but I suspect the whole connection pool gets corrupted as HTTP responses aren't properly read from the client connections.

添加 spring-boot-starter-web 确实让你的应用程序使用了 Tomcat,但它主要是将你的 Spring WebFlux 应用程序变成了一个 Spring MVC 应用程序(它现在支持一些响应式返回类型,但有一个不同的运行时模型).如果您希望使用 Tomcat 测试您的应用程序,您可以将 spring-boot-starter-tomcat 添加到您的 POM,这将使用带有 Spring WebFlux 的 Tomcat.

Adding spring-boot-starter-web does make your application use Tomcat, but it mainly turns your Spring WebFlux application into a Spring MVC application (which now supports some reactive return types, but has a different runtime model). If you wish to test your application with Tomcat, you can add spring-boot-starter-tomcat to your POM and this will use Tomcat with Spring WebFlux.

这篇关于我可以使用从 Spring5 的 WebClient 返回的 Flux 的 block() 方法吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-22 08:11