在RxJava 1中,使用观察者进行订阅会返回可能无法订阅的订阅。

在RxJava 2中,使用Observer进行订阅将返回void并且没有Disposeable。如何停止该“订阅”?

// v1
rx.Observable<Long> v1hot = rx.Observable.interval(1, TimeUnit.SECONDS);
rx.Observer<Long> v1observer = new TestSubscriber<>();
Subscription subscription = v1hot.subscribe(v1observer);
subscription.unsubscribe();

// v2
Observable<Long> v2hot = Observable.interval(1, TimeUnit.SECONDS);
Observer<Long> v2Observer = new TestObserver<>();
v2hot.subscribe(v2Observer); // void

编辑:如何处理这种情况,即我们使用本身并没有实现Disposable的观察者,例如BehaviorSubject呢?像这个例子一样:

// v1
rx.Observable<Long> v1hot = rx.Observable.interval(1, TimeUnit.SECONDS);
rx.Observer<Long> v1observer = rx.subjects.BehaviorSubject.create();
Subscription subscription = v1hot.subscribe(v1observer);
subscription.unsubscribe();

// v2
Observable<Long> v2hot = Observable.interval(1, TimeUnit.SECONDS);
Observer<Long> v2Observer = BehaviorSubject.createDefault(-1L);
v2hot.subscribe(v2Observer); // void

最佳答案

All other subscribe methods return a Disposable 。在您的示例中, TestObserver 本身实现了Disposable,因此您可以在观察者本身上调用 dispose() 来处理订阅。

否则,您可以将 DisposableObserver 用作您自己的自定义观察者的基类,以使抽象基类为您提供 Disposable 行为。

编辑来回答更新的问题:

如果您需要使用 subscribe(Observer) 方法(一个返回void的方法),但需要使用未实现ObserverDisposable,则您仍然可以选择将Observer包装在 SafeObserver 中,这将为您提供Disposable行为(以及其他契约(Contract)一致性保证)。

10-08 18:45