我对RxJava相当陌生,并且与其他许多人一样,我正在设法使我避开异常处理。我在网上阅读了很多文章(例如,此处how to handle exceptions thrown by observer's onNext的讨论),并认为我了解了这些概念的基本概念。

在上面提到的讨论中,一位发布者说,当订户中引发异常时,RxJava将执行以下操作:


  实施常规处理以记录故障并停止发送事件
  (任何形式)并清理归因于该订户的任何资源并随身携带
  以及所有剩余的订阅。


这或多或少是我所看到的,我唯一遇到的问题是“清理所有资源”位。为了清楚起见,让我们假设以下示例:

我想创建一个Observable来侦听每个接收到的消息上的异步事件源(例如JMS队列)和onNext()。因此,在(伪)代码中,我将执行以下操作:

Observable<String> observable = Observable.create( s -> {
  createConnectionToBroker();
  getConsumer().setMessageListener(message -> s.onNext(transform(message)));
  s.setDisposable(new Disposable() {
    public void dispose() {
      tearDownBrokerConnection();
    }
  });
});


由于我想为许多订阅者/观察者重用消息侦听器,因此我不直接在创建的Observable上进行订阅,而是使用publish()。refCount()团队。类似于以下内容:

Observable<String> observableToSubscribeTo = observable.publish().refCount();

Disposable d1 = observableToSubscribeTo.subscribe(s -> ...);
Disposable d2 = observableToSubscribeTo.subscribe(s -> ...);


这一切都按预期工作。该代码仅在建立第一个订阅时才连接到JMS,并且在最后一个观察者被dispose() d时关闭与代理的连接。

但是,如果订户在onNext()时引发异常,事情似乎就会变得混乱。不出所料,抛出的观察者很讨厌,每当发布一个新事件时,都不会再通知它。我的问题似乎是,当所有其余订户都被dispose() d时,不再通知与消息代理保持连接的Observable。在我看来,引发异常的订户处于某种僵尸状态。当涉及事件分发时,它将被忽略,但是它以某种方式阻止了当最后一个订阅者是dispose() d时通知根Observable。

我知道RxJava希望观察者确保不抛出而是正确地处理最终异常。不幸的是,在我想提供一个将Observable返回给调用者的库的情况下,无论如何我都无法控制订户。这意味着,我将永远无法保护我的图书馆免受愚蠢的观察者的侵害。

所以,我问自己:我在这里错过了什么吗?订户抛出时真的没有机会正确清理吗?这是一个错误还是仅仅是我不了解库?

任何见解非常感谢!

最佳答案

如果您可以显示一些单元测试来证明问题(不需要JMS),那就太好了。

另外,RxJava 2中的onNext绝不能抛出;如果这样做,则是未定义的行为。如果您不信任消费者,则可以使用一个可在终端观察的变压器,该变压器可以执行safeSubscribe而不是普通的subscribe,可以增加保护以防止下游行为不当:

.compose(o -> v -> o.safeSubscribe(v))


要么

.compose(new ObservableTransformer<T>() {
    @Override public Observable<T> apply(final Observable<T> source) {
        return new Observable<T>() {
            @Override public void subscribeActual(Observer<? super T> observer) {
                 source.safeSubscribe(observer);
            }
        };
    }
})

08-18 10:33