我正在尝试使用Project Reactor及其Flux类运行一个基本示例。源应创建1到10之间的整数,并仅打印出发出的整数。
所有示例均在应用程序的main方法中执行,没有其他代码在运行。
使基础运行很容易:
Flux.range(1, 10).subscribe(System.out::println);
下一步是在另一个线程中发出整数。这可以通过
Flux.range(1, 10)
.publishOn(Schedulers.newSingle("OtherThread"))
.subscribe(System.out::println);
作为项目参考说明,
Schedulers.newSingle("OtherThread")
创建“每个呼叫专用线程”(请参见Project Reactor Reference)。该参考文献解释说,还有一个Schedulers.single()
可以访问“单个可重用线程”的执行上下文,并“为所有调用方重用同一线程”。由于在本示例
publishOn(...)
中我仅在一点上使用线程,所以我的理解是,这两种方法(newSingle(...)
和single()
)可以互换使用。 Flux.range(1, 10)
.publishOn(Schedulers.single())
.subscribe(System.out::println);
但是最后一个示例不打印任何内容。老实说,经过几个小时的搜索和玩耍,我仍然不明白为什么。
我发现此博客文章Flight of the Flux 3 - Hopping Threads and Schedulers指出
single()
为“可以在唯一的ExecutorService上运行的一次性任务”。但这并没有将光带入黑暗。通常,我希望对以下问题有一个简单的答案:为什么在这个基本示例中
newSingle(...)
和single()
的行为不同?这会让我感到愚蠢。但是,如果它能最终解决我的困惑,我会感到非常高兴。一个有趣的网站说明是,通过引入
log()
,示例打印像一个饰物 Flux.range(1, 10)
.log()
.publishOn(Schedulers.single())
.subscribe(System.out::println);
更新:根据Martin Tarjányi的答案,我创建了一个gist,它用一个小的代码段演示了不同的行为并解释了文本。
最佳答案
使用newSingle(String)
by default it creates a new non-daemon thread创建新的单个调度程序时,这意味着它将阻止应用程序退出,直到未关闭其线程池为止。
但是,如果使用内置的single()
,它将使用守护程序线程,即使尚未完成应用程序,该线程也不会阻止应用程序退出。这就是您在示例中所看到的:主线程通过组装反应性管道来完成其工作,并且VM退出,而不管守护程序单线程的状态如何。
要在两种情况下都具有相同的行为,可以用doOnNext()
和blockLast()
替换订阅:
Flux.range(1, 10)
.publishOn(Schedulers.single())
.doOnNext(System.out::println)
.blockLast();
通常在反应式编程中不建议使用块。但是,如果您的主线程无事可做,则可以在响应链上调用
block()
。