因此,我正在开发一个由其他人开发的旧项目,并且碰巧在该项目中使用了RxAndroid。
我必须在某些应用程序的请求中添加缓存处理,而OkHttp缓存在我需要时表现不佳。因此,我想出了一种解决方案来实现我自己的服务,该服务会检查Internet连接,然后发出请求,或者根据连接从缓存中读取请求。我没有使用RxAndroid的经验,但是我读了一点,然后想到了:
@Override
public Observable<List<SocialNews>> getStream(@Path("stream") String stream, @Query("networks") String networks, @Query("page") Integer page, @Query("limit") Integer limit) {
return Observable.create(new AsyncOnSubscribe<Object, List<SocialNews>>() {
@Override
protected Object generateState() {
return null;
}
@Override
protected Object next(Object state, long requested, Observer<Observable<? extends List<SocialNews>>> observer) {
if (NetworkUtils.hasActiveInternetConnection(context)) {
observer.onNext(APIClient.getService(context).getStream(stream, networks, page, limit));
} else {
observer.onNext(Observable.just(NetworkUtils.getCachedStream()));
}
return state;
}
});
}
我这样订阅:
final Observable<List<SocialNews>> observable = APIClient.getCacheService(getActivity()).getStream(stream, networks, page, SportFiveAPI.DEFAULT_ITEM_LIMIT);
LifecycleObservable.bindFragmentLifecycle(lifecycle(),
AppObservable.bindFragment(this, observable)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()))
.subscribeOn(Schedulers.io())
.subscribe(
socialNews -> {
// do stuff
},
error -> {
// do stuff
}
);
但是不知何故,它陷入了循环,并继续在我的AsyncOnSubscribe中调用下一个方法。有人知道我在做什么错吗?
最佳答案
您不是在观察者上呼叫onCompleted
。