一些上下文:
在此链接中:https://github.com/ReactiveX/RxJava/issues/448 @ ben-lesh提出了一个手动递归实现以使用Observables进行轮询。
但是,在最新的RxJava版本中没有OnSubscribeFunc
。
这是我当前的实现:
Observable.create(new Observable.OnSubscribe<Item>() {
@Override
public void call(final Subscriber<? super Item> innerSubscriber) {
Schedulers.io().createWorker()
.schedulePeriodically(new Action0() {
@Override
public void call() {
searchObservable()
.doOnNext(new Action1<Item>() {
@Override
public void call(Item item) {
innerSubscriber.onNext(item);
}
})
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (throwable != null) {
innerSubscriber.onError(throwable);
}
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
innerSubscriber.onCompleted();
}
}).subscribe(); // Set subscriber?
}
}, initialDelay, pollingInterval, TimeUnit.MINUTES);
}
})
.subscribeOn(Schedulers.io()) // performs networking on background thread
.observeOn(observeOnScheduler) // sends notifications to another Scheduler, usually the UI thread
.subscribe(subscriber); // The subscriber
searchObservable
执行服务请求。第一次运行就可以正常工作,也就是将数据传递给subscriber
。但是,在等待pollingInterval
之后,数据将返回并执行doOnNext
,但是数据不会传递到UI。我是否需要在Action
设置的schedulePeriodically
中设置任何订户? 最佳答案
它停止是因为您正在调用innerSubscriber.onCompleted
,它将在第一次运行时终止序列。有一些标准运算符可以使您获得相同的效果,而无需创建自定义Observable:
Observable.interval(initialDelay, pollingInterval, TimeUnit.MINUTES, Schedulers.io())
.onBackpressureBuffer()
.concatMap(v -> searchObservable())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
(备注:这里不需要subscribeOn(),因为无论如何间隔都会在Schedulers.io()上发出。)