我将Retrofit与RxJava Observables和lambda表达式一起使用。我是RxJava的新手,无法找到以下操作的方法:

Observable<ResponseBody> res = api.getXyz();
res.subscribe(response -> {
    // I don't need the response here
}, error -> {
    // I might be able to handle an error here. If so, it shall not go to the second error handler.
});
res.subscribe(response -> {
    // This is where I want to process the response
}, error -> {
    // This error handler shall only be invoked if the first error handler was not able to handle the error.
});

我看着error handling operators,但是我不明白它们如何为我的用例提供帮助。

最佳答案

方法1:保留两个Subscriber,但cache保留Observable

保持一切不变,但是将第一行更改为:

Observable<ResponseBody> res = api.getXyz().cache();
cache将确保该请求仅发送一次,但是这两个Subscriber都将获得所有相同的事件。

这样,是否以及如何处理第一个Subscriber中的错误不会影响第二个Subscriber看到的内容。

方法2:使用onErrorResumeNext捕获一些错误,但转发所有其他错误。

onErrorResumeNext添加到您的Observable中,以生成类似以下内容(在“内部”对象中):
Observable observable = Observable.error(new IllegalStateException())
.onErrorResumeNext(new Func1<Throwable, Observable<?>>() {
    @Override
    public Observable<?> call(Throwable throwable) {
        if (throwable instanceof NumberFormatException) {
            System.out.println("NFE - handled");
            return Observable.empty();
        } else {
            System.out.println("Some other exception - panic!");
            return Observable.error(throwable);
        }
    }
});

并且只订阅一次(在“外部”对象中):
observable.subscribe(new Subscriber() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError");
        e.printStackTrace();
    }

    @Override
    public void onNext(Object o) {
        System.out.println(String.format("onNext: %s", String.valueOf(o)));
    }

});

这样,仅当无法在onErrorResumeNext中处理该错误时才转发该错误-如果可以,则Subscriber将仅获得对onCompleted的调用,而没有其他任何调用。

但是,在onErrorResumeNext中有副作用使我有些不舒服。 :-)

编辑:哦,如果您想更加严格,则可以使用方法3:将每种情况包装在一个新对象中。
public abstract class ResultOrError<T> {
}

public final class Result<T> extends ResultOrError<T> {
    public final T result;

    public Result(T result) {
        this.result = result;
    }
}

public final class HandledError<T> extends ResultOrError<T> {
    public final Throwable throwable;

    public Result(Throwable throwable) {
        this.throwable = throwable;
    }
}

public final class UnhandledError<T> extends ResultOrError<T> {
    public final Throwable throwable;

    public Result(Throwable throwable) {
        this.throwable = throwable;
    }
}

接着:
  • 将适当的结果包装在Result中(使用map)
  • 将可处理的错误包装在HandledError
  • UnhandledError中的
  • 无法处理的错误(将onErrorResumeNextif子句一起使用)
  • 处理HandledError(使用doOnError)
  • 有一个Subscriber<ResultOrError<ResponseBody>>-它会获得所有三种类型的通知(onNext),但只会忽略HandledError并处理其他两种类型。
  • 10-08 18:45