问题描述
我一直在使用webflux启动器(spring-boot-starter-webflux
)使用spring-boot 2.0.0.RC1
.我创建了一个返回无限通量的简单控制器.我希望发布者仅在有客户端(订阅者)的情况下进行工作.假设我有一个像这样的控制器:
I've been working with spring-boot 2.0.0.RC1
using the webflux starter (spring-boot-starter-webflux
). I created a simple controller that returns a infinite flux. I would like that the Publisher only does its work if there is a client (Subscriber). Let's say I have a controller like this one:
@RestController
public class Demo {
@GetMapping(value = "/")
public Flux<String> getEvents(){
return Flux.create((FluxSink<String> sink) -> {
while(!sink.isCancelled()){
// TODO e.g. fetch data from somewhere
sink.next("DATA");
}
sink.complete();
}).doFinally(signal -> System.out.println("END"));
}
}
现在,当我尝试运行该代码并使用Chrome访问端点 http://localhost:8080/时,那么我可以看到数据.但是,一旦关闭浏览器,由于没有取消事件被触发,因此while循环将继续. 关闭浏览器后如何立即终止/取消流式传输?
Now, when I try to run that code and access the endpoint http://localhost:8080/ with Chrome, then I can see the data. However, once I close the browser the while-loop continues since no cancel event has been fired. How can I terminate/cancel the streaming as soon as I close the browser?
从此答案我引用:
我认为,由于HTTP协议不支持反压,因此这也意味着不会发出取消请求.
I assume that, since backpressure is not supported by the HTTP protocol, it means that no cancel request will be made either.
通过分析网络流量进一步调查发现,关闭浏览器后,浏览器立即发送TCP FIN
.有没有一种方法可以配置Netty(或其他方式),以使半关闭的连接将触发发布者上的cancel事件,从而使while循环停止?
Investigating a little bit further, by analyzing the network traffic, showed that the browser sends a TCP FIN
as soon as I close the browser. Is there a way to configure Netty (or something else) so that a half-closed connection will trigger a cancel event on the publisher, making the while-loop stop?
还是我必须编写与实现自己的订户的org.springframework.http.server.reactive.ServletHttpHandlerAdapter
类似的适配器?
Or do I have to write my own adapter similar to org.springframework.http.server.reactive.ServletHttpHandlerAdapter
where I implement my own Subscriber?
感谢您的帮助.
如果没有客户端,则在尝试将数据写入套接字时会出现一个IOException
.您可以在堆栈跟踪中看到.
An IOException
will be raised on the attempt to write data to the socket if there is no client. As you can see in the stack trace.
但这还不够好,因为下一个数据块准备好发送之前可能要花一些时间,因此检测到消失的客户端需要花费相同的时间.正如 Brian Clozel的答案中指出的那样,这是Reactor Netty中的一个已知问题.我尝试通过将依赖项添加到POM.xml
来使用Tomcat.像这样:
But that's not good enough, since it might take a while before the next chunk of data will be ready to send and therefore it takes the same amount of time to detect the gone client. As pointed out in Brian Clozel's answer it is a known issue in Reactor Netty. I tried to use Tomcat instead by adding the dependency to the POM.xml
. Like this:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
尽管它代替了Netty并改用Tomcat,但由于浏览器不显示任何数据,因此它似乎没有反应.但是,控制台中没有警告/信息/例外. 该版本(2.0.0.RC1
)中的spring-boot-starter-webflux
是否应该与Tomcat一起使用?
Although it replaces Netty and uses Tomcat instead, it does not seem reactive due to the fact that the browser does not show any data. However, there is no warning/info/exception in the console. Is spring-boot-starter-webflux
as of this version (2.0.0.RC1
) supposed to work together with Tomcat?
推荐答案
由于这是一个已知问题(请参见 Brian Clozel的回答),最后我使用一个Flux
来获取我的真实数据,并使用另一个Flux
来实现某种类型的ping/心跳机制.结果,我将两者与Flux.merge()
合并在一起.
Since this is a known issue (see Brian Clozel's answer), I ended up using one Flux
to fetch my real data and having another one in order to implement some sort of ping/heartbeat mechanism. As a result, I merge both together with Flux.merge()
.
在这里您可以看到我的解决方案的简化版本:
Here you can see a simplified version of my solution:
@RestController
public class Demo {
public interface Notification{}
public static class MyData implements Notification{
…
public boolean isEmpty(){…}
}
@GetMapping(value = "/", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<? extends Notification>> getNotificationStream() {
return Flux.merge(getEventMessageStream(), getHeartbeatStream());
}
private Flux<ServerSentEvent<Notification>> getHeartbeatStream() {
return Flux.interval(Duration.ofSeconds(2))
.map(i -> ServerSentEvent.<Notification>builder().event("ping").build())
.doFinally(signalType ->System.out.println("END"));
}
private Flux<ServerSentEvent<MyData>> getEventMessageStream() {
return Flux.interval(Duration.ofSeconds(30))
.map(i -> {
// TODO e.g. fetch data from somewhere,
// if there is no data return an empty object
return data;
})
.filter(data -> !data.isEmpty())
.map(data -> ServerSentEvent
.builder(data)
.event("message").build());
}
}
我将所有内容打包为ServerSentEvent<? extends Notification>
. Notification
只是一个标记界面.我使用ServerSentEvent
类中的event
字段来区分数据事件和ping事件.由于心跳Flux
始终以短间隔发送事件,因此检测到客户端消失的时间最多为该间隔的时间.请记住,我需要这样做是因为可能要花一些时间才能获得一些可以发送的真实数据,结果,它可能还需要一段时间才能检测到客户端已消失.这样,它将在客户端无法发送ping(或可能是消息事件)后立即检测到客户端已消失.
I wrap everything up as ServerSentEvent<? extends Notification>
. Notification
is just a marker interface. I use the event
field from the ServerSentEvent
class in order to separate between data and ping events. Since the heartbeat Flux
sends events constantly and in short intervals, the time it takes to detect that the client is gone is at most the length of that interval. Remember, I need that because it might take a while before I get some real data that can be sent and, as a result, it might also take a while before it detects that the client is gone. Like this, it will detect that the client is gone as soon as it can’t sent the ping (or possibly the message event).
标记界面上的最后一个注释,我称为通知".这并不是真正必要的,但是它提供了一些类型安全性.否则,我们可以写Flux<ServerSentEvent<?>>
而不是Flux<ServerSentEvent<? extends Notification>>
作为getNotificationStream()方法的返回类型.或者也可以使getHeartbeatStream()返回Flux<ServerSentEvent<MyData>>
.但是,这样可以允许发送任何对象,而这是我所不希望的.结果,我添加了界面.
One last note on the marker interface, which I called Notification. This is not really necessary, but it gives some type safety. Without that, we could write Flux<ServerSentEvent<?>>
instead of Flux<ServerSentEvent<? extends Notification>>
as return type for the getNotificationStream() method. Or also possible, make getHeartbeatStream() return Flux<ServerSentEvent<MyData>>
. However, like this it would allow that any object could be sent, which I don’t want. As a consequence, I added the interface.
这篇关于Spring Boot Webflux/Netty-检测关闭的连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!