最近,我一直在研究RxJava 2,并且已经测试了Observable.interval()
subscription = Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
subscription.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
//binding.appBar.mainContent.msg.setText(aLong+"");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
在活动的
onCreate
方法之后开始可观察。我正在通过onNext()
方法记录输出。我有一个停止按钮。触发后,我想停止订阅流程。即使单击了停止按钮,日志仍继续运行。
stop.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
if (subscription != null) {
subscription.unsubscribeOn(Schedulers.io());
}
}
});
最佳答案
您已经预订了Observer
,这意味着您必须保留对Disposable
回调中实际onSubscribe(Disposable)
的引用,然后在该对象上执行Disposable#dispose()
。 private Disposable disposable; ... Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( new Observer() { @Override public void onSubscribe(Disposable d) { disposable = d; } // other callbacks here }); disposable.dispose();
相反,您可以将订阅更改为以下内容: Disposable disposable = Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Long aLong) throws Exception { // onNext } }, new Consumer() { @Override public void accept(Throwable throwable) throws Exception { // onError } }, new Action() { @Override public void run() throws Exception { // onComplete } }); disposable.dispose();