我对RxJava相当陌生,并且与其他许多人一样,我正在设法使我避开异常处理。我在网上阅读了很多文章(例如,此处how to handle exceptions thrown by observer's onNext的讨论),并认为我了解了这些概念的基本概念。
在上面提到的讨论中,一位发布者说,当订户中引发异常时,RxJava将执行以下操作:
实施常规处理以记录故障并停止发送事件
(任何形式)并清理归因于该订户的任何资源并随身携带
以及所有剩余的订阅。
这或多或少是我所看到的,我唯一遇到的问题是“清理所有资源”位。为了清楚起见,让我们假设以下示例:
我想创建一个Observable来侦听每个接收到的消息上的异步事件源(例如JMS队列)和onNext()。因此,在(伪)代码中,我将执行以下操作:
Observable<String> observable = Observable.create( s -> {
createConnectionToBroker();
getConsumer().setMessageListener(message -> s.onNext(transform(message)));
s.setDisposable(new Disposable() {
public void dispose() {
tearDownBrokerConnection();
}
});
});
由于我想为许多订阅者/观察者重用消息侦听器,因此我不直接在创建的Observable上进行订阅,而是使用publish()。refCount()团队。类似于以下内容:
Observable<String> observableToSubscribeTo = observable.publish().refCount();
Disposable d1 = observableToSubscribeTo.subscribe(s -> ...);
Disposable d2 = observableToSubscribeTo.subscribe(s -> ...);
这一切都按预期工作。该代码仅在建立第一个订阅时才连接到JMS,并且在最后一个观察者被
dispose()
d时关闭与代理的连接。但是,如果订户在
onNext()
时引发异常,事情似乎就会变得混乱。不出所料,抛出的观察者很讨厌,每当发布一个新事件时,都不会再通知它。我的问题似乎是,当所有其余订户都被dispose()
d时,不再通知与消息代理保持连接的Observable。在我看来,引发异常的订户处于某种僵尸状态。当涉及事件分发时,它将被忽略,但是它以某种方式阻止了当最后一个订阅者是dispose()
d时通知根Observable。我知道RxJava希望观察者确保不抛出而是正确地处理最终异常。不幸的是,在我想提供一个将Observable返回给调用者的库的情况下,无论如何我都无法控制订户。这意味着,我将永远无法保护我的图书馆免受愚蠢的观察者的侵害。
所以,我问自己:我在这里错过了什么吗?订户抛出时真的没有机会正确清理吗?这是一个错误还是仅仅是我不了解库?
任何见解非常感谢!
最佳答案
如果您可以显示一些单元测试来证明问题(不需要JMS),那就太好了。
另外,RxJava 2中的onNext绝不能抛出;如果这样做,则是未定义的行为。如果您不信任消费者,则可以使用一个可在终端观察的变压器,该变压器可以执行safeSubscribe
而不是普通的subscribe
,可以增加保护以防止下游行为不当:
.compose(o -> v -> o.safeSubscribe(v))
要么
.compose(new ObservableTransformer<T>() {
@Override public Observable<T> apply(final Observable<T> source) {
return new Observable<T>() {
@Override public void subscribeActual(Observer<? super T> observer) {
source.safeSubscribe(observer);
}
};
}
})