我正在尝试使用Project Reactor及其Flux类运行一个基本示例。源应创建1到10之间的整数,并仅打印出发出的整数。

所有示例均在应用程序的main方法中执行,没有其他代码在运行。

使基础运行很容易:

Flux.range(1, 10).subscribe(System.out::println);


下一步是在另一个线程中发出整数。这可以通过

        Flux.range(1, 10)
                .publishOn(Schedulers.newSingle("OtherThread"))
                .subscribe(System.out::println);


作为项目参考说明,Schedulers.newSingle("OtherThread")创建“每个呼叫专用线程”(请参见Project Reactor Reference)。该参考文献解释说,还有一个Schedulers.single()可以访问“单个可重用线程”的执行上下文,并“为所有调用方重用同一线程”。

由于在本示例publishOn(...)中我仅在一点上使用线程,所以我的理解是,这两种方法(newSingle(...)single())可以互换使用。

        Flux.range(1, 10)
                .publishOn(Schedulers.single())
                .subscribe(System.out::println);


但是最后一个示例不打印任何内容。老实说,经过几个小时的搜索和玩耍,我仍然不明白为什么。

我发现此博客文章Flight of the Flux 3 - Hopping Threads and Schedulers指出single()为“可以在唯一的ExecutorService上运行的一次性任务”。但这并没有将光带入黑暗。

通常,我希望对以下问题有一个简单的答案:为什么在这个基本示例中newSingle(...)single()的行为不同?这会让我感到愚蠢。但是,如果它能最终解决我的困惑,我会感到非常高兴。

一个有趣的网站说明是,通过引入log(),示例打印像一个饰物

        Flux.range(1, 10)
                .log()
                .publishOn(Schedulers.single())
                .subscribe(System.out::println);




更新:根据Martin Tarjányi的答案,我创建了一个gist,它用一个小的代码段演示了不同的行为并解释了文本。

最佳答案

使用newSingle(String) by default it creates a new non-daemon thread创建新的单个调度程序时,这意味着它将阻止应用程序退出,直到未关闭其线程池为止。

但是,如果使用内置的single(),它将使用守护程序线程,即使尚未完成应用程序,该线程也不会阻止应用程序退出。这就是您在示例中所看到的:主线程通过组装反应性管道来完成其工作,并且VM退出,而不管守护程序单线程的状态如何。

要在两种情况下都具有相同的行为,可以用doOnNext()blockLast()替换订阅:

Flux.range(1, 10)
    .publishOn(Schedulers.single())
    .doOnNext(System.out::println)
    .blockLast();


通常在反应式编程中不建议使用块。但是,如果您的主线程无事可做,则可以在响应链上调用block()

07-28 01:52
查看更多