本文介绍了RxJava - ConnectableObservable,断开连接和重新连接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在尝试从断开连接"中复制示例代码部分 这里.
断开连接
正如我们在 connect 的签名中看到的,这个方法返回一个 Subscription,就像 Observable.subscribe 一样.您可以使用该引用来终止 ConnectableObservable 的订阅.这将阻止事件传播给观察者,但不会从 ConnectableObservable 取消订阅它们.如果您再次调用 connect,ConnectableObservable 将开始新的订阅,而旧的观察者将再次开始接收值.
ConnectableObservable可连接 = Observable.interval(200, TimeUnit.MILLISECONDS).publish();订阅 s = connectable.connect();connectable.subscribe(i -> System.out.println(i));线程睡眠(1000);System.out.println(关闭连接");s.取消订阅();线程睡眠(1000);System.out.println(重新连接");s = connectable.connect();
输出
01234关闭连接重新连接012...
使用 RxJava 2.0.8,我有:
ConnectableObservable可连接 = Observable.interval(200, TimeUnit.MILLISECONDS).publish();一次性 s = connectable.connect();connectable.subscribe(new Observer() {@覆盖公共无效订阅(一次性d){}@覆盖public void onNext(Long aLong) {Log.d("test", "Num: " + aLong);}@覆盖public void onError(Throwable e) {}@覆盖公共无效 onComplete() {}});尝试 {线程睡眠(1000);} catch (InterruptedException e) {e.printStackTrace();}Log.d(测试",关闭连接");s.dispose();尝试 {线程睡眠(1000);} catch (InterruptedException e) {e.printStackTrace();}Log.d(测试",正在重新连接...");可连接.connect();
输出
编号:0数量:1数量:2数量:3数量:4关闭连接正在重新连接...
提前致谢....
解决方案
RxJava 似乎没有采用这种行为.工作示例来自 Rx.NET.请参阅 https://github.com/ReactiveX/RxJava/issues/4771>
I am trying to replicate sample code from the "Disconnecting" section here.
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();
connectable.subscribe(i -> System.out.println(i));
Thread.sleep(1000);
System.out.println("Closing connection");
s.unsubscribe();
Thread.sleep(1000);
System.out.println("Reconnecting");
s = connectable.connect();
Output
0
1
2
3
4
Closing connection
Reconnecting
0
1
2
...
Using RxJava 2.0.8, I have:
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Disposable s = connectable.connect();
connectable.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d("test", "Num: " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("test", "Closing connection");
s.dispose();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("test", "Reconnecting...");
connectable.connect();
Output
Num: 0
Num: 1
Num: 2
Num: 3
Num: 4
Closing connection
Reconnecting...
Thanks in advance....
解决方案
It seems this behaviour has not been adopted by RxJava. The working example is from Rx.NET. See https://github.com/ReactiveX/RxJava/issues/4771
这篇关于RxJava - ConnectableObservable,断开连接和重新连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!