I ran these unit tests, and the results were not at all what I expected:

// This one outputs "subscribe.onError"
@Test
public void observable_doOnError_subscribingToError() throws InterruptedException {
    Observable<String> obs = getErrorProducingObservable();
    obs.doOnError(throwable -> System.out.println("doOnError"));
    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> {},
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

// This one outputs "subscribe.onError"
@Test
public void observable_onErrorReturn() throws InterruptedException {
    Observable<String> obs = getErrorProducingObservable();
    obs.onErrorReturn(throwable -> "Yeah I got this");
    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> System.out.println("got: " + s),
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

private Observable<String> getErrorProducingObservable()  {
    return Observable.create(subscriber -> {
        subscriber.onError(new RuntimeException("Somebody set up us the bomb"));
    });
}

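So both tests print "subscribe.onError"; it seems that neither doOnError nor onErrorReturn is ever called.
doOnError is documented as:



I'm not sure how to interpret that, but I would expect the output to be either "doOnError" or "doOnError" followed by "subscribe.onError".
onErrorReturn is documented as:



So I expected "got: Yeah I got this" as the output of the latter test.

What gives?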

Best answer

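Both doOnError and onErrorReturn return a new Observable with the changed behavior; they do not modify the Observable they are called on. In your tests the returned Observable is discarded and you subscribe to the original obs, so the operators never take part in the chain. I agree that their documentation can be a bit misleading. Modify your tests like this to get the expected behavior: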

// This one now outputs "doOnError" followed by "subscribe.onError"
@Test
public void observable_doOnError_subscribingToError() throws InterruptedException {
    Observable<String> obs =
        getErrorProducingObservable()
            .doOnError(throwable -> System.out.println("doOnError"));

    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> {},
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

// This one now outputs "got: Yeah I got this"
@Test
public void observable_onErrorReturn() throws InterruptedException {
    Observable<String> obs =
        getErrorProducingObservable()
            .onErrorReturn(throwable -> "Yeah I got this");

    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> System.out.println("got: " + s),
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

private Observable<String> getErrorProducingObservable()  {
    return Observable.create(subscriber -> {
        subscriber.onError(new RuntimeException("Somebody set up us the bomb"));
    });
}
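
To see why the discarded return value matters, here is a minimal sketch of the same pattern without RxJava. MiniObservable is a hypothetical stand-in written only for this illustration (it is not the RxJava class and omits completion, schedulers, and unsubscription); its doOnError and onErrorReturn wrap the source in a new instance and leave the original untouched, which is the behavior described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class OperatorChainingSketch {

    /** Hypothetical, heavily simplified stand-in for an Observable (NOT the RxJava class). */
    static final class MiniObservable<T> {
        interface OnSubscribe<T> {
            void call(Consumer<T> onNext, Consumer<Throwable> onError);
        }

        private final OnSubscribe<T> source;

        MiniObservable(OnSubscribe<T> source) {
            this.source = source;
        }

        /** Returns a NEW instance that runs the action, then forwards the error. */
        MiniObservable<T> doOnError(Consumer<Throwable> action) {
            return new MiniObservable<T>((onNext, onError) ->
                source.call(onNext, t -> { action.accept(t); onError.accept(t); }));
        }

        /** Returns a NEW instance that replaces an error with a fallback item. */
        MiniObservable<T> onErrorReturn(Function<Throwable, T> fallback) {
            return new MiniObservable<T>((onNext, onError) ->
                source.call(onNext, t -> onNext.accept(fallback.apply(t))));
        }

        void subscribe(Consumer<T> onNext, Consumer<Throwable> onError) {
            source.call(onNext, onError);
        }
    }

    /** A source that immediately signals an error, like getErrorProducingObservable(). */
    static MiniObservable<String> failing() {
        return new MiniObservable<String>((onNext, onError) ->
            onError.accept(new RuntimeException("Somebody set up us the bomb")));
    }

    /** Mirrors the question: the operator's result is thrown away. */
    public static List<String> discarded() {
        List<String> log = new ArrayList<>();
        MiniObservable<String> obs = failing();
        obs.onErrorReturn(t -> "Yeah I got this"); // returned instance is ignored
        obs.subscribe(s -> log.add("got: " + s), e -> log.add("subscribe.onError"));
        return log;
    }

    /** Mirrors the answer: subscribe to the instance the operators returned. */
    public static List<String> chained() {
        List<String> log = new ArrayList<>();
        MiniObservable<String> obs = failing()
            .doOnError(t -> log.add("doOnError"))
            .onErrorReturn(t -> "Yeah I got this");
        obs.subscribe(s -> log.add("got: " + s), e -> log.add("subscribe.onError"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(discarded()); // [subscribe.onError]
        System.out.println(chained());   // [doOnError, got: Yeah I got this]
    }
}
```

In the discarded() case the subscriber is attached to the bare source, so the error goes straight to the subscriber's error handler. In chained() the subscription passes through both wrappers, so the side effect runs first and the fallback value is delivered instead of the error.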
