我正在尝试重复这样的关于Flux的订阅:
DirectProcessor<String> stringDirectProcessor = DirectProcessor.create();
stringDirectProcessor
.repeat(3)
.subscribe(item -> System.out.println(item));
stringDirectProcessor.onNext("one");
stringDirectProcessor.onNext("two");
stringDirectProcessor.onNext("three");
stringDirectProcessor.onComplete();
我的期望是看到以下输出:
one
two
three
one
two
three
one
two
three
one
two
three
但是我只会
one
two
three
但是,如果我使用
Flux.just()
而不是DirectProcessor
,则会得到预期的输出。怎么了?
最佳答案
这是DirectProcessor
的预期行为。我只是通读了documentation,发现了有关DirectProcessor
的以下内容:
一旦处理器终止(通常是通过接收器的error(Throwable)或调用complete()方法),它可以让更多订阅者订阅,但立即向他们重播终止信号。
因此,由于repeat
简单地重新订阅,因此onComplete
处理程序将立即在其上调用。您确定需要DirectProcessor
吗?
编辑:此行为也记录在here
注意:如果没有订户,则会删除上游项目,并且仅保留终端事件。终止的DirectProcessor将向晚订户发出终端信号。