这是代码:

import io.reactivex.Observable;
import io.reactivex.Observer;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;

public class RxJavaTest {

    @Test
    public void onErr() {

        Observable<String> values1 = new Observable<String>() {
            @Override
            protected void subscribeActual(Observer<? super String> observer) {
                observer.onNext("New");
                observer.onNext("New1");
                observer.onNext("New2");
                observer.onNext("New3");
                observer.onNext("New4");
                if (ThreadLocalRandom
                            .current()
                            .nextInt(10) == 5) {
                    observer.onError(new Exception("don't retry..."));
                } else {
                    observer.onError(new IllegalArgumentException("retry..."));
                }
            }
        };
        AtomicBoolean finished = new AtomicBoolean(false);
        values1
                .retryWhen(throwableObservable -> throwableObservable
                        .takeWhile(throwable -> {
                            boolean result = (throwable instanceof IllegalArgumentException);
                            if (result) {
                                System.out.println("Retry on error: " + throwable);
                                return result;
                            }
                            System.out.println("Error: " + throwable);
                            return result;
                        })
                        .take(20))
                .onErrorReturn(throwable -> "Saved the day!")
                .doOnTerminate(() -> finished.set(true))
                .subscribe(v -> System.out.println(v));
    }
}


目标是


仅在存在IllegalArgumentException时重试,
对于其他任何异常,请立即返回(通过onErrorReturn)。


上面的代码完成了第一个目标,但在第二个目标上失败了,它停止了重试,但是忽略了.onErrorReturn部分。

任何想法如何使其工作?

最佳答案

您可以将retryWhen更改为:

                .retryWhen(throwableObservable ->
                                throwableObservable.flatMap(throwable -> {
                                    if (throwable instanceof IllegalArgumentException) {
                                        System.out.println("Retry on error: " + throwable);
                                        return Observable.just(1);
                                    } else {
                                        System.out.println("Error: " + throwable);
                                        return Observable.<Integer>error(throwable);
                                    }
                                })
                )


为了使其重试,在retryWhen中返回哪个值都无关紧要(在上面的示例中,它返回1)。根据javadoc:


  如果该ObservableSource调用onComplete或onError,则重试将在子订阅上调用onComplete或onError。否则,此ObservableSource将重新订阅源ObservableSource。

09-16 00:59