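I have two observables (call them A and B for simplicity) and one subscriber. The subscriber subscribes to A, and if A hits an error, B kicks in as the fallback. Whenever A errors, B does get invoked, but A calls onComplete() on the subscriber, so B's response never reaches the subscriber even when B succeeds.
Is this the normal behavior? I thought onErrorResumeNext() should continue the stream and notify the subscriber once it completes, as described in the documentation (https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#onerrorresumenext).
Here's the overall structure of what I'm doing (with some of the "boring" code omitted):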
public Observable<ModelA> observeGetAPI() {
    return retrofitAPI.getObservableAPI1()
            .flatMap(observableApi1Response -> {
                ModelA model = new ModelA();
                model.setApi1Response(observableApi1Response);
                return retrofitAPI.getObservableAPI2()
                        .map(observableApi2Response -> {
                            // Blah blah blah...
                            return model;
                        })
                        .onErrorResumeNext(observeGetAPIFallback(model))
                        .subscribeOn(Schedulers.newThread());
            })
            .onErrorReturn(throwable -> {
                // Blah blah blah...
                // NOTE: as posted, `model` is declared inside the flatMap lambda
                // above and is not in scope here.
                return model;
            })
            .subscribeOn(Schedulers.newThread());
}
private Observable<ModelA> observeGetAPIFallback(ModelA model) {
    return retrofitAPI.getObservableAPI3()
            .map(observableApi3Response -> {
                // Blah blah blah...
                return model;
            })
            .onErrorReturn(throwable -> {
                // Blah blah blah...
                return model;
            })
            .subscribeOn(Schedulers.immediate());
}
Subscription subscription;
subscription = observeGetAPI().subscribe(modelA -> {
    // IF THERE'S AN ERROR WE NEVER GET B RESPONSE HERE...
}, throwable -> {
    // WE NEVER GET HERE... onErrorResumeNext()
}, () -> {
    // IN CASE OF AN ERROR WE GET STRAIGHT HERE, MEANWHILE, B GETS EXECUTED
});
Any ideas what I'm doing wrong?
Thanks!
EDIT:
Here's a rough timeline of what happens:
---> HTTP GET REQUEST B
<--- HTTP 200 REQUEST B RESPONSE (SUCCESS)
---> HTTP GET REQUEST A
<--- HTTP 200 REQUEST A RESPONSE (FAILURE!)
---> HTTP GET FALLBACK A
** onComplete() called! ---> Subscriber never gets fallback response since onComplete() gets called before time.
<--- HTTP 200 FALLBACK A RESPONSE (SUCCESS)
Here's a link to a simple diagram I made that shows what I want to happen:
Diagram
Best answer
The Rx calls below should simulate what you are doing with Retrofit.
// Simulates the fallback call: emits "A Fallback" after a one-second delay.
Observable<String> fallbackObservable =
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        logger.v("emitting A Fallback");
                        subscriber.onNext("A Fallback");
                        subscriber.onCompleted();
                    }
                })
                .delay(1, TimeUnit.SECONDS)
                .onErrorReturn(new Func1<Throwable, String>() {
                    @Override
                    public String call(Throwable throwable) {
                        logger.v("emitting Fallback Error");
                        return "Fallback Error";
                    }
                })
                .subscribeOn(Schedulers.immediate());
// Simulates call B, followed by call A, which fails and falls back.
Observable<String> stringObservable =
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        logger.v("emitting B");
                        subscriber.onNext("B");
                        subscriber.onCompleted();
                    }
                })
                .delay(1, TimeUnit.SECONDS)
                .flatMap(new Func1<String, Observable<String>>() {
                    @Override
                    public Observable<String> call(String s) {
                        logger.v("flatMapping B");
                        return Observable
                                .create(new Observable.OnSubscribe<String>() {
                                    @Override
                                    public void call(Subscriber<? super String> subscriber) {
                                        logger.v("emitting A");
                                        subscriber.onNext("A");
                                        subscriber.onCompleted();
                                    }
                                })
                                .delay(1, TimeUnit.SECONDS)
                                .map(new Func1<String, String>() {
                                    @Override
                                    public String call(String s) {
                                        logger.v("A completes but contains invalid data - throwing error");
                                        throw new NotImplementedException("YUCK!");
                                    }
                                })
                                // The error thrown in map() is replaced by the fallback stream.
                                .onErrorResumeNext(fallbackObservable)
                                .subscribeOn(Schedulers.newThread());
                    }
                })
                .onErrorReturn(new Func1<Throwable, String>() {
                    @Override
                    public String call(Throwable throwable) {
                        logger.v("emitting Return Error");
                        return "Return Error";
                    }
                })
                .subscribeOn(Schedulers.newThread());
Subscription subscription = stringObservable.subscribe(
        new Action1<String>() {
            @Override
            public void call(String s) {
                logger.v("onNext " + s);
            }
        },
        new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                logger.v("onError");
            }
        },
        new Action0() {
            @Override
            public void call() {
                logger.v("onCompleted");
            }
        });
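The output from the log statements is:
RxNewThreadScheduler-1 emitting B
RxComputationThreadPool-1 flatMapping B
RxNewThreadScheduler-2 emitting A
RxComputationThreadPool-2 A completes but contains invalid data - throwing error
RxComputationThreadPool-2 emitting A Fallback
RxComputationThreadPool-1 onNext A Fallback
RxComputationThreadPool-1 onCompleted
This seems to be what you're after, but maybe I'm missing something.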
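To make the event order in that log easier to reason about, here is a minimal, dependency-free sketch in plain Java. It is not RxJava: `Sink`, `Source`, `resumeOnError`, and `FallbackModel` are hypothetical names invented for illustration, and threading and delays are left out entirely. It only models the idea behind onErrorResumeNext: values pass through, and an upstream error is replaced by a subscription to the fallback instead of being delivered to the subscriber.

```java
import java.util.ArrayList;
import java.util.List;

class FallbackModel {
    /** Hypothetical stand-in for an Rx subscriber. */
    interface Sink {
        void onNext(String value);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Hypothetical stand-in for an observable: pushes events into a sink. */
    interface Source {
        void subscribe(Sink sink);
    }

    /** Models the idea of onErrorResumeNext: on error, switch to the fallback. */
    static Source resumeOnError(Source primary, Source fallback) {
        return sink -> primary.subscribe(new Sink() {
            @Override public void onNext(String value) { sink.onNext(value); }
            // The error is swallowed here; the fallback now feeds the same sink.
            @Override public void onError(Throwable error) { fallback.subscribe(sink); }
            @Override public void onCompleted() { sink.onCompleted(); }
        });
    }

    /** Runs "A fails, fallback succeeds" and returns the events the subscriber saw. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Source a = sink -> sink.onError(new IllegalStateException("YUCK!"));
        Source aFallback = sink -> {
            sink.onNext("A Fallback");
            sink.onCompleted();
        };
        resumeOnError(a, aFallback).subscribe(new Sink() {
            @Override public void onNext(String value) { events.add("onNext " + value); }
            @Override public void onError(Throwable error) { events.add("onError"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [onNext A Fallback, onCompleted]
    }
}
```

In this model the subscriber's onError is never called, and onCompleted arrives only after the fallback's value, which matches the last two lines of the log above.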