我正在使用Interval运算符,并且即使管道上发生异常,我也想继续发出项目。

因此,在出现异常的情况下,我尝试使用onErrorResumeNext发出项目。但是我看到在发出该项目之后,间隔不再发出更多的项目。

这是我的单元测试。

@Test
public void testIntervalObservableWithError() {
    Subscription subscription = Observable.interval(50, TimeUnit.MILLISECONDS)
            .map(time -> "item\n")
            .map(item -> item = null)
            .map(String::toString)
            .onErrorResumeNext(t-> Observable.just("item with error emitted"))
            .subscribe(System.out::print, t->{
                        System.out.println(t);
                    }
                   );
    TestSubscriber testSubscriber = new TestSubscriber((Observer) subscription);
    testSubscriber.awaitTerminalEvent(20000, TimeUnit.MILLISECONDS);
}


我对此行为感到困惑,为什么可观察的用户取消订阅,如果它正在接收来自onErrorResumeNext的项目

解:

经过一番解释,我意识到,当发生错误时,可观察到的t是完整的。因此,我最终将可有异常的可观察变量包装到另一个可观察变量中,并且我使用的是flatMap。因此,主要的Observable继续发射项目。

@Test
public void testIntervalObservableWithError() {
    Observable.interval(100, TimeUnit.MILLISECONDS)
            .map(time -> "item\n")
            .flatMap(item -> Observable.just(item)
                    .map(String::toString))
            .subscribe(System.out::print);
    TestSubscriber testSubscriber = new TestSubscriber();
    testSubscriber.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS);
}


如果有任何操作员可以做我想知道的所有魔术。

改级

最佳答案

您的订阅中断,因为触发onErrorResumeNext时,您的上游已经完成并出现错误。您只发出一个项目,而不是让异常向下游发送。为了保持上游状态,您必须防止引发异常。

对于您的特定示例解决方案可以如下所示:

...
    .map(time -> "item\n")
    .map(item -> item = null)
    .map(item -> {
        try {
            return item.toString();
        } catch (NullPointerException e) {
            return "item with error emitted";
    })
    //no onErrorResumeNext()
    .subscribe ...


onErrorResumeNext只是将错误替换为一个项目,然后调用onComplete

10-05 21:39