问题描述
我正在阅读一篇博文:http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html
据说是哪里
无论你订阅的是什么调度器,只有一次发射是允许一次沿着 Observable 操作符链向上移动.在下面,您可以观察到必须一直推动发射在下一次发射开始之前从源到订阅者.
在引用文本的正上方是一个例子:
Right above that quoted text is an example written as:
public static void main(String[] args) {
Observable<Integer> source = Observable.range(1,10);
source.map(i -> i * 100)
.doOnNext(i -> System.out.println("Emitting " + i
+ " on thread " + Thread.currentThread().getName()))
.observeOn(Schedulers.computation())
.map(i -> i * 10)
.subscribe(i -> System.out.println("Received " + i + " on thread "
+ Thread.currentThread().getName()));
sleep(3000);
}
输出为:(ref output1)
Emitting 100 on thread main
Emitting 200 on thread main
Emitting 300 on thread main
Emitting 400 on thread main
Emitting 500 on thread main
Emitting 600 on thread main
Emitting 700 on thread main
Emitting 800 on thread main
Emitting 900 on thread main
Emitting 1000 on thread main
Received 1000 on thread RxComputationThreadPool-3
Received 2000 on thread RxComputationThreadPool-3
Received 3000 on thread RxComputationThreadPool-3
Received 4000 on thread RxComputationThreadPool-3
Received 5000 on thread RxComputationThreadPool-3
Received 6000 on thread RxComputationThreadPool-3
Received 7000 on thread RxComputationThreadPool-3
Received 8000 on thread RxComputationThreadPool-3
Received 9000 on thread RxComputationThreadPool-3
Received 10000 on thread RxComputationThreadPool-3
最初,我认为最终结果如下:(ref output2)
Initially, I thought the end result would look like: (ref output2)
Emitting 100 on thread main
Received 1000 on thread RxComputationThreadPool-3
Emitting 200 on thread main
Received 2000 on thread RxComputationThreadPool-3
Emitting 300 on thread main
Received 3000 on thread RxComputationThreadPool-3
Emitting 400 on thread main
Received 4000 on thread RxComputationThreadPool-3
Emitting 500 on thread main
Received 5000 on thread RxComputationThreadPool-3
Emitting 600 on thread main
Received 6000 on thread RxComputationThreadPool-3
Emitting 700 on thread main
Received 7000 on thread RxComputationThreadPool-3
Emitting 800 on thread main
Received 8000 on thread RxComputationThreadPool-3
Emitting 900 on thread main
Received 9000 on thread RxComputationThreadPool-3
Emitting 1000 on thread main
Received 10000 on thread RxComputationThreadPool-3
但我可以看到调用 observeOn 允许一个流切换到另一个流,以便它可以继续运行.所以在这个例子中,似乎初始地图和 doOnNext 在第二个地图和订阅之前都完成了.
but I can see that calling observeOn allows one stream to handoff to another so that it can keep going. So in the example, it seems like the intial map and doOnNext all complete before the second map and subscribe.
问题:
从理论上说,输出看起来像ref output2"是否正确,还是总是看起来与ref output1"相同?
Is it correct to say that theoretically, the output could look like "ref output2" or would it always look identical to "ref output1"?
同样,我对此的推理是,在上面的示例中,似乎一个操作员必须完全处理所有可观察量,然后才能将其移交给下一个操作员.
Again, my reasoning for this is that it seems that one operator has to process all of the observables completely before it gets handed off to the next operator with the example above.
推荐答案
理论上,ref output1 和 ref output2 都是可行的,但在实践中,会更接近参考输出1.根据 Schedulers.computation()
的热度,您可能会看到一些交错,但不太可能获得完美的乒乓模式.
In theory, both ref output1 and ref output2 are possible but in practice, it will be closer to ref output1. Depending on how hot Schedulers.computation()
becomes, you may see some interleaving but it is unlikely you get a perfect ping-pong pattern.
发生的情况是 observeOn
预取 128 个元素,接收到的第一个元素将触发内部队列中元素的异步重新发送.由于执行器中的线程通常不会足够快地启动/恢复,因此重新发射开始时主线程已经发射了它的所有值.
What happens is that observeOn
has a prefetch of 128 elements and the first element received will trigger an asynchronous re-emission of elements in an internal queue. Since Threads in executors often don't start/resume quickly enough, the time the re-emission starts the main thread has already emitted all of its values.
您可以通过将 observeOn
配置为通过过载使预取值为 1 来获得乒乓效应.但是,在这种情况下,原始发射可能会被拖到重新发射线程而不是停留在原始线程上,并且您还必须在 range
之后引入 subscribeOn
以确保第一个 doOnNext
停留在它自己的线程上.(另请注意,您无法将标准调度程序的执行固定到主线程,您需要一个 阻塞调度程序 为此.)
You can get the ping-pong effect by configuring observeOn
to have a prefetch value of 1 via an overload. However, in this case the original emission may get dragged to the re-emission thread instead of staying on the original thread and you also have to introduce subscribeOn
after range
to make sure the first doOnNext
stays on its own thread. (Also note that you can't pin execution with standard schedulers to the main thread, you need a blocking scheduler for that.)
这篇关于RxJava - “一次只允许一个发射沿着 Observable 链向上传播......";的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!