每当我同时使用subscribeOn和publishOn时,都不会打印任何内容。
如果我只使用一个,它将打印。
如果我使用subscriptionOn(Schedulers.immediate())或弹性的,它的工作原理。
有什么想法吗?
我的理解是publishOn会影响它发布的线程,并在订阅者运行的线程上进行订阅。您能指出我正确的方向吗?
fun test() {
val testPublisher = EmitterProcessor.create<String>().connect()
testPublisher
.publishOn(Schedulers.elastic())
.map { it ->
println("map on ${Thread.currentThread().name}")
it
}
.subscribeOn(Schedulers.parallel())
.subscribe { println("subscribe on ${Thread.currentThread().name}") }
testPublisher.onNext("a")
testPublisher.onNext("b")
testPublisher.onNext("c")
Thread.sleep(5000)
println("---")
}
最佳答案
subscribeOn
会影响订阅的发生位置。也就是说,触发源发出元素的初始事件。另一方面,Subscriber
的onNext
Hook 受链中最接近的publishOn
的影响(很像map
)。
但是EmitterProcessor
与大多数Processor
一样,更为高级,可以完成一些窃取工作。我不确定为什么您的机箱上没有打印任何东西(您的样本转换为Java可以在我的机器上使用),但是我敢打赌它与该处理器有关。
此代码将更好地演示subscribeOn
与publishOn
:
Flux.just("a", "b", "c") //this is where subscription triggers data production
//this is influenced by subscribeOn
.doOnNext(v -> System.out.println("before publishOn: " + Thread.currentThread().getName()))
.publishOn(Schedulers.elastic())
//the rest is influenced by publishOn
.doOnNext(v -> System.out.println("after publishOn: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.parallel())
.subscribe(v -> System.out.println("received " + v + " on " + Thread.currentThread().getName()));
Thread.sleep(5000);
打印输出:
before publishOn: parallel-1
before publishOn: parallel-1
before publishOn: parallel-1
after publishOn: elastic-2
received a on elastic-2
after publishOn: elastic-2
received b on elastic-2
after publishOn: elastic-2
received c on elastic-2