我正在尝试使用Java Rx(版本1)重试。

我想做一个retryWhen而不是简单的retry(),因为我想在达到限制时返回具有一定值的可观察对象,而不是抛出异常。

因此,检查此https://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained/和此Catch error if retryWhen:s retries runs out,我可以构建一些有助于实现我的目的的东西。

// this is only to simulate the real method that will possibly throw an exception
public static Observable<String> test() {
    Observable<String> var = Observable.error(new IOException());
    return var;
}


Observable<String> test = test().retryWhen(attempts -> {
    return attempts.zipWith(Observable.range(1, 3), (throwable, attempt) -> {
        if (attempt == 3) {
            LOG.info("attempting");
            return Observable.just("completed with error");
        } else {
            return attempt;
        }
    });
});



test.doOnError(x -> System.out.println("do on error message")).subscribe(s -> {
    System.out.println(s);
});


当我在本地运行此命令时,我看到尝试3次的日志记录(按预期)。

我没有看到println "do on error message"(按预期)

但是我没有看到我期望的completed with error,这使我怀疑我是否实际上返回了我想要的可观察对象,我做错了什么?

我也不明白为什么它允许我在zipWith内部返回一个可观察的整数。有任何想法吗?

并且,是否有可能从我自己的可观察定义中引发异常/错误?像这样的东西:

Observable<String> test = test().retry(3).map(value -> {
// some logic to define what to do
Observable.error(new Exception("error");
});

最佳答案

首先,


  我也不明白为什么它允许我在zipWith内部返回一个可观察的整数。


zipWith中的lambda签名为(Throwable, Integer) -> Object,这意味着任何内容都是有效返回,因为它是Object的子代。之所以如此,是因为此函数定义了如何组合两个对象(在这种情况下为ThrowableInteger,并且任何Object是有效组合(或缺少它们))。

回到您的主要问题。记住retryWhen实际在做什么很重要。 (至少对我而言)这有点棘手(但至少对我而言),但是基本上retryWhen正文中的观察者发出时,都会导致上游Observable被重新订阅。这不能控制下游排放。

docs中的示例(RxJava 2片段,但仍应适用):

  Observable.create((ObservableEmitter<? super String> s) -> {
      System.out.println("subscribing");
      s.onError(new RuntimeException("always fails"));
  }).retryWhen(attempts -> {
      return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
          System.out.println("delay retry by " + i + " second(s)");
          return Observable.timer(i, TimeUnit.SECONDS);
      });
  }).blockingForEach(System.out::println);


在此示例中,当我们重新订阅初始源时,retryWhen块中的返回值受到控制。在这种情况下,我们要说的是要延迟重新订阅i秒。

考虑到这一点,retryWhen可能不是您最初寻求的解决方案。另一个解决方案可能是使用retry多次尝试尝试进行预订(如果您希望进行更多定制的重新订购,则尝试使用retryWhen),然后使用onErrorResumeNext。另请参见this

举个例子:

Observable.create((ObservableEmitter<String> s) -> s.onError(new RuntimeException("always fails")))
        .retry(3)
        .onErrorResumeNext(throwable -> {
            return Observable.just("hi");
        })
        .subscribe(System.out::println, System.out::println);


结果输出为hi。这里的关键是onErrorResumeNext允许我们将发出的异常转换为其他内容。几乎像map的例外。

07-24 04:03