也许我只是真的了解subscribeOnobserveOn的内部工作原理,但是最近我遇到了一些非常奇怪的事情。我的印象是,subscribeOn确定调度程序从何处开始处理(尤其是当我们有很多map更改数据流时),然后observeOn可以在以下任何地方使用:那些maps可以在适当的时候更改Scheduler(首先进行联网,然后进行计算,最后更改UI线程)。

但是,我注意到,当不直接将这些调用链接到我的Observable或Single时,它将无法工作。这是一个最小的工作示例JUnit测试:

import org.junit.Test;
import rx.Single;
import rx.schedulers.Schedulers;

public class SubscribeOnTest {

  @Test public void not_working_as_expected() throws Exception {
    Single<Integer> single = Single.<Integer>create(singleSubscriber -> {
      System.out.println("Doing some computation on thread " + Thread.currentThread().getName());
      int i = 1;
      singleSubscriber.onSuccess(i);
    });
    single.subscribeOn(Schedulers.computation()).observeOn(Schedulers.io());

    single.subscribe(integer -> {
      System.out.println("Observing on thread " + Thread.currentThread().getName());
    });
    System.out.println("Doing test on thread " + Thread.currentThread().getName());
    Thread.sleep(1000);
  }

  @Test public void working_as_expected() throws Exception {
    Single<Integer> single = Single.<Integer>create(singleSubscriber -> {
      System.out.println("Doing some computation on thread " + Thread.currentThread().getName());
      int i = 1;
      singleSubscriber.onSuccess(i);
    }).subscribeOn(Schedulers.computation()).observeOn(Schedulers.io());

    single.subscribe(integer -> {
      System.out.println("Observing on thread " + Thread.currentThread().getName());
    });
    System.out.println("Doing test on thread " + Thread.currentThread().getName());
    Thread.sleep(1000);
  }
}


测试not_working_as_expected()给我以下输出

Doing some computation on thread main
Observing on thread main
Doing test on thread main


working_as_expected()给我

Doing some computation on thread RxComputationScheduler-1
Doing test on thread main
Observing on thread RxIoScheduler-2


唯一的不同是,在第一个测试中,在创建单一对象之后,会出现分号,然后才应用调度程序,并且在工作示例中,方法调用直接链接到单一对象的创建上。但这不是无关紧要的吗?

最佳答案

运算符执行的所有“修改”都是不可变的,这意味着它们返回一个新流,该流以与上一个不同的方式接收通知。由于您只是调用了subscribeOnobserveOn运算符,并且没有存储它们的结果,因此以后进行的订阅将保持不变。

旁注:我不太了解您对subscribeOn行为的定义。如果您是说地图操作员会受到某种程度的影响,那是不对的。 subscribeOn定义一个调度程序,在其上调用OnSubscribe函数。在您的情况下,将函数传递给create()方法。另一方面,observeOn定义了调度程序,在该调度程序上,每个连续流(由应用操作员返回的流)正在处理来自上游的排放。

10-04 10:00