我正在使用RxJava和带有RxAndroid的Android应用程序。我正在使用mergeDelayError将两个逆向拟合网络调用合并为一个可观察到的对象,如果其中一个发出一个,则处理发出的项目,如果其中一个发出一个,则处理错误。这是行不通的,并且仅在遇到任何错误时才触发onError操作。现在进行测试,我转到一个非常简单的示例,当我进行onError调用时,仍然不会调用successAction。请参见下面的示例。

Observable.mergeDelayError(
                Observable.error(new RuntimeException()),
                Observable.just("Hello")
            )
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .finallyDo(completeAction)
            .subscribe(successAction, errorAction);

仅当我使用两个成功可观察对象时,才会调用成功 Action 。我是否应该使用mergeDelayError应该如何工作?

编辑:

我发现,如果删除了observeOnsubscribeOn,一切都会按预期进行。我需要指定线程,并认为这是使用Rx的全部重点。知道为什么指定这些Schedulers会破坏行为吗?

最佳答案

使用 .observeOn(AndroidSchedulers.mainThread(),true)而不是 .observeOn(AndroidSchedulers.mainThread()

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
        return observeOn(scheduler, delayError, RxRingBuffer.SIZE);
    }

上面是observeOn函数的签名。以下代码有效。
  Observable.mergeDelayError(
                Observable.error(new RuntimeException()),
                Observable.just("Hello")
        )
                .observeOn(AndroidSchedulers.mainThread(), true)
                .subscribeOn(Schedulers.io())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {

                    }
                });

从ConcatDelayError线程获得了这个技巧:https://github.com/ReactiveX/RxJava/issues/3908#issuecomment-217999009

09-28 09:48