最近,我一直在研究RxJava 2,并且已经测试了Observable.interval()

subscription = Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

subscription.subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                //binding.appBar.mainContent.msg.setText(aLong+"");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });


在活动的onCreate方法之后开始可观察。我正在通过onNext()方法记录输出。我有一个停止按钮。触发后,我想停止订阅流程。

即使单击了停止按钮,日志仍继续运行。

stop.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                if (subscription != null) {
                    subscription.unsubscribeOn(Schedulers.io());
                }
            }
        });

最佳答案

您已经预订了Observer,这意味着您必须保留对Disposable回调中实际onSubscribe(Disposable)的引用,然后在该对象上执行Disposable#dispose()

private Disposable disposable; ... Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( new Observer() { @Override public void onSubscribe(Disposable d) { disposable = d; } // other callbacks here }); disposable.dispose();


相反,您可以将订阅更改为以下内容:

Disposable disposable = Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Long aLong) throws Exception { // onNext } }, new Consumer() { @Override public void accept(Throwable throwable) throws Exception { // onError } }, new Action() { @Override public void run() throws Exception { // onComplete } }); disposable.dispose();

08-04 13:57