RxComputationThreadPool

RxComputationThreadPool

我有两个观察对象(为简单起见分别命名为A和B)和一个订阅者。因此,订户订阅了A,并且如果A上有错误,则踢B(即回退)。现在,每当A遇到错误B都会被罚款,但是A在订户上调用onComplete(),因此B响应即使B执行成功,也永远不会到达订户。

这是正常行为吗?我认为onErrorResumeNext()应该继续流并按照文档(https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#onerrorresumenext)中所述,在完成后通知订阅者。

这是我正在做的整体结构(省略了几个“无聊的”代码):

public Observable<ModelA> observeGetAPI(){
    return retrofitAPI.getObservableAPI1()
            .flatMap(observableApi1Response -> {
                ModelA model = new ModelA();

                model.setApi1Response(observableApi1Response);

                return retrofitAPI.getObservableAPI2()
                        .map(observableApi2Response -> {
                            // Blah blah blah...
                            return model;
                        })
                        .onErrorResumeNext(observeGetAPIFallback(model))
                        .subscribeOn(Schedulers.newThread())
            })
            .onErrorReturn(throwable -> {
                // Blah blah blah...
                return model;
            })
            .subscribeOn(Schedulers.newThread());
}

private Observable<ModelA> observeGetAPIFallback(ModelA model){
    return retrofitAPI.getObservableAPI3().map(observableApi3Response -> {
        // Blah blah blah...
        return model;
    }).onErrorReturn(throwable -> {
        // Blah blah blah...
        return model;
    })
    .subscribeOn(Schedulers.immediate());
}

Subscription subscription;
subscription = observeGetAPI.subscribe(ModelA -> {
    // IF THERE'S AN ERROR WE NEVER GET B RESPONSE HERE...
}, throwable ->{
    // WE NEVER GET HERE... onErrorResumeNext()
},
() -> { // IN CASE OF AN ERROR WE GET STRAIGHT HERE, MEANWHILE, B GETS EXECUTED }
);

有什么想法我做错了吗?

谢谢!

编辑:
这是发生情况的大致时间表:
---> HTTP GET REQUEST B
<--- HTTP 200 REQUEST B RESPONSE (SUCCESS)

---> HTTP GET REQUEST A
<--- HTTP 200 REQUEST A RESPONSE (FAILURE!)

---> HTTP GET FALLBACK A
** onComplete() called! ---> Subscriber never gets fallback response since onComplete() gets called before time.
<--- HTTP 200 FALLBACK A RESPONSE (SUCCESS)

这是我制作的简单图表的链接,它代表了我想要发生的事情:
Diagram

最佳答案

以下使用的Rx调用应模拟您对Retrofit所做的操作。

fallbackObservable =
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        logger.v("emitting A Fallback");
                        subscriber.onNext("A Fallback");
                        subscriber.onCompleted();
                    }
                })
                .delay(1, TimeUnit.SECONDS)
                .onErrorReturn(new Func1<Throwable, String>() {
                    @Override
                    public String call(Throwable throwable) {
                        logger.v("emitting Fallback Error");
                        return "Fallback Error";
                    }
                })
                .subscribeOn(Schedulers.immediate());

stringObservable =
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        logger.v("emitting B");
                        subscriber.onNext("B");
                        subscriber.onCompleted();
                    }
                })
                .delay(1, TimeUnit.SECONDS)
                .flatMap(new Func1<String, Observable<String>>() {
                    @Override
                    public Observable<String> call(String s) {
                        logger.v("flatMapping B");
                        return Observable
                                .create(new Observable.OnSubscribe<String>() {
                                    @Override
                                    public void call(Subscriber<? super String> subscriber) {
                                        logger.v("emitting A");
                                        subscriber.onNext("A");
                                        subscriber.onCompleted();
                                    }
                                })
                                .delay(1, TimeUnit.SECONDS)
                                .map(new Func1<String, String>() {
                                    @Override
                                    public String call(String s) {
                                        logger.v("A completes but contains invalid data - throwing error");
                                        throw new NotImplementedException("YUCK!");
                                    }
                                })
                                .onErrorResumeNext(fallbackObservable)
                                .subscribeOn(Schedulers.newThread());
                    }
                })
                .onErrorReturn(new Func1<Throwable, String>() {
                    @Override
                    public String call(Throwable throwable) {
                        logger.v("emitting Return Error");
                        return "Return Error";
                    }
                })
                .subscribeOn(Schedulers.newThread());

subscription = stringObservable.subscribe(
        new Action1<String>() {
            @Override
            public void call(String s) {
                logger.v("onNext " + s);
            }
        },
        new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                logger.v("onError");
            }
        },
        new Action0() {
            @Override
            public void call() {
                logger.v("onCompleted");
            }
        });

日志语句的输出为:

RxNewThreadScheduler-1发出B
RxComputationThreadPool-1 flatMapping B
RxNewThreadScheduler-2发出A
RxComputationThreadPool-2 A完成但包含无效数据-抛出错误
RxComputationThreadPool-2发出回退
RxComputationThreadPool-1 on下一个回退
RxComputationThreadPool-1 onCompleted

这似乎是您想要的东西,但也许我缺少了一些东西。

07-28 07:38