Problem description
Per here, I used to have the code:
EmitterProcessor<String> emitter = EmitterProcessor.create();
FluxSink<String> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);
sink.onCancel(() -> {
    cancelSink(id, request);
});
and when, for example with rSocket, a browser opened a session and asked for some data through the EmitterProcessor, then when the client shut down their browser a publisher like
Flux<String> out = Flux
        .from(emitter
                .log(log.getName()));
would know that the Flux subscriber was cancelled (when the browser was closed) and would call the onCancel handler.
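For illustration, a minimal standalone sketch of that old pattern (with the rSocket transport replaced by a plain subscribe/dispose and cancelSink(id, request) stubbed out with a println, both purely for the sketch) would look something like this:

import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class EmitterCancelDemo {
    public static void main(String[] args) {
        EmitterProcessor<String> emitter = EmitterProcessor.create();
        FluxSink<String> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);
        // Stub for cancelSink(id, request), just to show when the hook fires.
        sink.onCancel(() -> System.out.println("onCancel fired -> tidy up here"));

        Flux<String> out = Flux.from(emitter.log());

        // Subscribing and then disposing stands in for the browser opening and closing its session.
        Disposable subscription = out.subscribe(System.out::println);
        sink.next("hello");       // delivered to the subscriber
        subscription.dispose();   // cancellation propagates back and triggers the sink's onCancel
    }
}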
Using Sinks.Many() I have implemented:
Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
sink.asFlux().doOnCancel(() -> {
    cancelSink(id, request);
});
Flux<String> out = Flux
        .from(sink.asFlux()
                .log(log.getName()));
and the strings are published via a flux to the browser, but when the client closes the session there is no longer an onCancel to handle some tidying up.
It looks like this was discussed here and also here, but I don't understand the solutions. What is the answer, please?
Recommended answer
sink.asFlux().doOnCancel(...) and sink.asFlux() are two different instances. You're not reusing the one where you have set up the cancel-handling logic, and that is why you don't observe the cancelSink cleanup on your out variable.
Do something like this instead:
Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
Flux<String> fluxWithCancelSupport = sink.asFlux().doOnCancel(() -> {
    cancelSink(id, request);
});
Flux<String> out = fluxWithCancelSupport
        .log(log.getName());
(PS: you don't need the Flux.from(sink.asFlux()) since the latter already gives you a Flux.)
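A self-contained sketch of that fix (again with cancelSink(id, request) stubbed as a println and a manual dispose() standing in for the browser closing the session, both assumptions for the sketch):

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class SinksCancelDemo {
    public static void main(String[] args) {
        Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

        // Build the Flux with the cancel hook once and reuse that same instance downstream.
        Flux<String> fluxWithCancelSupport = sink.asFlux()
                .doOnCancel(() -> System.out.println("doOnCancel fired -> tidy up here"));

        Flux<String> out = fluxWithCancelSupport.log();

        Disposable subscription = out.subscribe(System.out::println);
        sink.tryEmitNext("hello");   // delivered to the subscriber
        subscription.dispose();      // cancelling out now reaches the doOnCancel hook
    }
}

The key point is that out is built on top of the same fluxWithCancelSupport instance, so the cancellation signal travels back up through the doOnCancel operator.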