本文介绍了从 EmitterProcessor 移动到 Sinks.many()的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
一段时间以来一直使用 create
一个 EmitterProcessor
内置 sink
如下:
For some time have been using create
an EmitterProcessor
with built in sink
as follows:
EmitterProcessor<String> emitter = EmitterProcessor.create();
FluxSink<String> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);
sink
使用 Flux
.from
命令发布
Flux<String> out = Flux
.from(emitter
.log(log.getName()));
和 sink
可以传递,并用字符串填充,只需使用 next
指令即可.
and the sink
can be passed around, and populated with strings, simply using the next
instruction.
现在我们看到 EmitterProcessor
已被弃用.
Now we see that EmitterProcessor
is deprecated.
全部替换为 Sinks.many()
像这样
It's all replaced with Sinks.many()
like this
Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
但是如何使用它来发布from
?
but how to use that to publish from
?
推荐答案
答案是将 Sinks.many()
转换为 asFlux()
Flux<String> out = Flux
.from(sink.asFlux()
.log(log.getName()));
也用于取消和终止通量
sink.asFlux().doOnCancel(() -> {
cancelSink(id, request);
});
/* Handle errors, eviction, expiration */
sink.asFlux().doOnTerminate(() -> {
disposeSink(id);
});
UPDATE 根据 这个问题
这篇关于从 EmitterProcessor 移动到 Sinks.many()的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!