这是代码:
import io.reactivex.Observable;
import io.reactivex.Observer;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
public class RxJavaTest {
@Test
public void onErr() {
Observable<String> values1 = new Observable<String>() {
@Override
protected void subscribeActual(Observer<? super String> observer) {
observer.onNext("New");
observer.onNext("New1");
observer.onNext("New2");
observer.onNext("New3");
observer.onNext("New4");
if (ThreadLocalRandom
.current()
.nextInt(10) == 5) {
observer.onError(new Exception("don't retry..."));
} else {
observer.onError(new IllegalArgumentException("retry..."));
}
}
};
AtomicBoolean finished = new AtomicBoolean(false);
values1
.retryWhen(throwableObservable -> throwableObservable
.takeWhile(throwable -> {
boolean result = (throwable instanceof IllegalArgumentException);
if (result) {
System.out.println("Retry on error: " + throwable);
return result;
}
System.out.println("Error: " + throwable);
return result;
})
.take(20))
.onErrorReturn(throwable -> "Saved the day!")
.doOnTerminate(() -> finished.set(true))
.subscribe(v -> System.out.println(v));
}
}
目标是
仅在存在
IllegalArgumentException
时重试,对于其他任何异常,请立即返回(通过
onErrorReturn
)。上面的代码完成了第一个目标,但在第二个目标上失败了,它停止了重试,但是忽略了
.onErrorReturn
部分。任何想法如何使其工作?
最佳答案
您可以将retryWhen
更改为:
.retryWhen(throwableObservable ->
throwableObservable.flatMap(throwable -> {
if (throwable instanceof IllegalArgumentException) {
System.out.println("Retry on error: " + throwable);
return Observable.just(1);
} else {
System.out.println("Error: " + throwable);
return Observable.<Integer>error(throwable);
}
})
)
为了使其重试,在
retryWhen
中返回哪个值都无关紧要(在上面的示例中,它返回1)。根据javadoc:如果该ObservableSource调用onComplete或onError,则重试将在子订阅上调用onComplete或onError。否则,此ObservableSource将重新订阅源ObservableSource。