也许我只是真的了解subscribeOn
和observeOn
的内部工作原理,但是最近我遇到了一些非常奇怪的事情。我的印象是,subscribeOn
确定调度程序从何处开始处理(尤其是当我们有很多map
更改数据流时),然后observeOn
可以在以下任何地方使用:那些maps
可以在适当的时候更改Scheduler(首先进行联网,然后进行计算,最后更改UI线程)。
但是,我注意到,当不直接将这些调用链接到我的Observable或Single时,它将无法工作。这是一个最小的工作示例JUnit测试:
import org.junit.Test;
import rx.Single;
import rx.schedulers.Schedulers;
public class SubscribeOnTest {
@Test public void not_working_as_expected() throws Exception {
Single<Integer> single = Single.<Integer>create(singleSubscriber -> {
System.out.println("Doing some computation on thread " + Thread.currentThread().getName());
int i = 1;
singleSubscriber.onSuccess(i);
});
single.subscribeOn(Schedulers.computation()).observeOn(Schedulers.io());
single.subscribe(integer -> {
System.out.println("Observing on thread " + Thread.currentThread().getName());
});
System.out.println("Doing test on thread " + Thread.currentThread().getName());
Thread.sleep(1000);
}
@Test public void working_as_expected() throws Exception {
Single<Integer> single = Single.<Integer>create(singleSubscriber -> {
System.out.println("Doing some computation on thread " + Thread.currentThread().getName());
int i = 1;
singleSubscriber.onSuccess(i);
}).subscribeOn(Schedulers.computation()).observeOn(Schedulers.io());
single.subscribe(integer -> {
System.out.println("Observing on thread " + Thread.currentThread().getName());
});
System.out.println("Doing test on thread " + Thread.currentThread().getName());
Thread.sleep(1000);
}
}
测试
not_working_as_expected()
给我以下输出Doing some computation on thread main
Observing on thread main
Doing test on thread main
而
working_as_expected()
给我Doing some computation on thread RxComputationScheduler-1
Doing test on thread main
Observing on thread RxIoScheduler-2
唯一的不同是,在第一个测试中,在创建单一对象之后,会出现分号,然后才应用调度程序,并且在工作示例中,方法调用直接链接到单一对象的创建上。但这不是无关紧要的吗?
最佳答案
运算符执行的所有“修改”都是不可变的,这意味着它们返回一个新流,该流以与上一个不同的方式接收通知。由于您只是调用了subscribeOn
和observeOn
运算符,并且没有存储它们的结果,因此以后进行的订阅将保持不变。
旁注:我不太了解您对subscribeOn
行为的定义。如果您是说地图操作员会受到某种程度的影响,那是不对的。 subscribeOn
定义一个调度程序,在其上调用OnSubscribe函数。在您的情况下,将函数传递给create()
方法。另一方面,observeOn
定义了调度程序,在该调度程序上,每个连续流(由应用操作员返回的流)正在处理来自上游的排放。