本文介绍了RxJava - ConnectableObservable,断开连接和重新连接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试从断开连接"中复制示例代码部分 这里.

断开连接

正如我们在 connect 的签名中看到的,这个方法返回一个 Subscription,就像 Observable.subscribe 一样.您可以使用该引用来终止 ConnectableObservable 的订阅.这将阻止事件传播给观察者,但不会从 ConnectableObservable 取消订阅它们.如果您再次调用 connect,ConnectableObservable 将开始新的订阅,而旧的观察者将再次开始接收值.

ConnectableObservable可连接 = Observable.interval(200, TimeUnit.MILLISECONDS).publish();订阅 s = connectable.connect();connectable.subscribe(i -> System.out.println(i));线程睡眠(1000);System.out.println(关闭连接");s.取消订阅();线程睡眠(1000);System.out.println(重新连接");s = connectable.connect();

输出

01234关闭连接重新连接012...


使用 RxJava 2.0.8,我有:

 ConnectableObservable可连接 = Observable.interval(200, TimeUnit.MILLISECONDS).publish();一次性 s = connectable.connect();connectable.subscribe(new Observer() {@覆盖公共无效订阅(一次性d){}@覆盖public void onNext(Long aLong) {Log.d("test", "Num: " + aLong);}@覆盖public void onError(Throwable e) {}@覆盖公共无效 onComplete() {}});尝试 {线程睡眠(1000);} catch (InterruptedException e) {e.printStackTrace();}Log.d(测试",关闭连接");s.dispose();尝试 {线程睡眠(1000);} catch (InterruptedException e) {e.printStackTrace();}Log.d(测试",正在重新连接...");可连接.connect();

输出

编号:0数量:1数量:2数量:3数量:4关闭连接正在重新连接...

提前致谢....

解决方案

RxJava 似乎没有采用这种行为.工作示例来自 Rx.NET.请参阅 https://github.com/ReactiveX/RxJava/issues/4771>

I am trying to replicate sample code from the "Disconnecting" section here.

ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();

connectable.subscribe(i -> System.out.println(i));

Thread.sleep(1000);
System.out.println("Closing connection");
s.unsubscribe();

Thread.sleep(1000);
System.out.println("Reconnecting");
s = connectable.connect();

Output

0
1
2
3
4
Closing connection
Reconnecting
0
1
2
...


Using RxJava 2.0.8, I have:

    ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
    Disposable s = connectable.connect();

    connectable.subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(Long aLong) {
            Log.d("test", "Num: " + aLong);
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    Log.d("test", "Closing connection");
    s.dispose();

    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    Log.d("test", "Reconnecting...");
    connectable.connect();

Output

Num: 0
Num: 1
Num: 2
Num: 3
Num: 4
Closing connection
Reconnecting...

Thanks in advance....

解决方案

It seems this behaviour has not been adopted by RxJava. The working example is from Rx.NET. See https://github.com/ReactiveX/RxJava/issues/4771

这篇关于RxJava - ConnectableObservable,断开连接和重新连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-18 23:45