因此,我正在开发一个由其他人开发的旧项目,并且碰巧在该项目中使用了RxAndroid。
我必须在某些应用程序的请求中添加缓存处理,而OkHttp缓存在我需要时表现不佳。因此,我想出了一种解决方案来实现我自己的服务,该服务会检查Internet连接,然后发出请求,或者根据连接从缓存中读取请求。我没有使用RxAndroid的经验,但是我读了一点,然后想到了:

@Override
public Observable<List<SocialNews>> getStream(@Path("stream") String stream, @Query("networks") String networks, @Query("page") Integer page, @Query("limit") Integer limit) {
    return Observable.create(new AsyncOnSubscribe<Object, List<SocialNews>>() {
        @Override
        protected Object generateState() {
            return null;
        }

        @Override
        protected Object next(Object state, long requested, Observer<Observable<? extends List<SocialNews>>> observer) {
            if (NetworkUtils.hasActiveInternetConnection(context)) {
                observer.onNext(APIClient.getService(context).getStream(stream, networks, page, limit));
            } else {
                observer.onNext(Observable.just(NetworkUtils.getCachedStream()));
            }
            return state;
        }
    });
}


我这样订阅:

 final Observable<List<SocialNews>> observable = APIClient.getCacheService(getActivity()).getStream(stream, networks, page, SportFiveAPI.DEFAULT_ITEM_LIMIT);
    LifecycleObservable.bindFragmentLifecycle(lifecycle(),
            AppObservable.bindFragment(this, observable)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread()))
            .subscribeOn(Schedulers.io())
            .subscribe(
                    socialNews -> {
                        // do stuff
                    },
                    error -> {
                        // do stuff
                    }
            );


但是不知何故,它陷入了循环,并继续在我的AsyncOnSubscribe中调用下一个方法。有人知道我在做什么错吗?

最佳答案

您不是在观察者上呼叫onCompleted

10-07 20:48