我正在尝试重复这样的关于Flux的订阅:

    DirectProcessor<String> stringDirectProcessor = DirectProcessor.create();

    stringDirectProcessor
            .repeat(3)
            .subscribe(item -> System.out.println(item));

    stringDirectProcessor.onNext("one");
    stringDirectProcessor.onNext("two");
    stringDirectProcessor.onNext("three");
    stringDirectProcessor.onComplete();


我的期望是看到以下输出:

one
two
three
one
two
three
one
two
three
one
two
three


但是我只会

one
two
three


但是,如果我使用Flux.just()而不是DirectProcessor,则会得到预期的输出。

怎么了?

最佳答案

这是DirectProcessor的预期行为。我只是通读了documentation,发现了有关DirectProcessor的以下内容:


  一旦处理器终止(通常是通过接收器的error(Throwable)或调用complete()方法),它可以让更多订阅者订阅,但立即向他们重播终止信号。


因此,由于repeat简单地重新订阅,因此onComplete处理程序将立即在其上调用。您确定需要DirectProcessor吗?

编辑:此行为也记录在here


  注意:如果没有订户,则会删除上游项目,并且仅保留终端事件。终止的DirectProcessor将向晚订户发出终端信号。

10-08 09:00