一些上下文:
在此链接中:https://github.com/ReactiveX/RxJava/issues/448 @ ben-lesh提出了一个手动递归实现以使用Observables进行轮询。
但是,在最新的RxJava版本中没有OnSubscribeFunc

这是我当前的实现:

Observable.create(new Observable.OnSubscribe<Item>() {
        @Override
        public void call(final Subscriber<? super Item> innerSubscriber) {

            Schedulers.io().createWorker()
                      .schedulePeriodically(new Action0() {
                          @Override
                          public void call() {
                              searchObservable()
                                      .doOnNext(new Action1<Item>() {
                                          @Override
                                          public void call(Item item) {
                                              innerSubscriber.onNext(item);
                                          }
                                      })
                                      .doOnError(new Action1<Throwable>() {
                                          @Override
                                          public void call(Throwable throwable) {
                                              if (throwable != null) {
                                                  innerSubscriber.onError(throwable);
                                              }
                                          }
                                      })
                                      .doOnCompleted(new Action0() {
                                          @Override
                                          public void call() {
                                              innerSubscriber.onCompleted();
                                          }
                                      }).subscribe(); // Set subscriber?
                          }
                      }, initialDelay, pollingInterval, TimeUnit.MINUTES);
        }
    })
            .subscribeOn(Schedulers.io()) // performs networking on background thread
            .observeOn(observeOnScheduler) // sends notifications to another Scheduler, usually the UI thread
            .subscribe(subscriber); // The subscriber


searchObservable执行服务请求。第一次运行就可以正常工作,也就是将数据传递给subscriber。但是,在等待pollingInterval之后,数据将返回并执行doOnNext,但是数据不会传递到UI。我是否需要在Action设置的schedulePeriodically中设置任何订户?

最佳答案

它停止是因为您正在调用innerSubscriber.onCompleted,它将在第一次运行时终止序列。有一些标准运算符可以使您获得相同的效果,而无需创建自定义Observable:

Observable.interval(initialDelay, pollingInterval, TimeUnit.MINUTES, Schedulers.io())
.onBackpressureBuffer()
.concatMap(v -> searchObservable())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);


(备注:这里不需要subscribeOn(),因为无论如何间隔都会在Schedulers.io()上发出。)

08-24 12:45