问题描述
我很难理解如何使用RxJava构建缓存。我的想法是,我需要从内存缓存中获取数据或从我的数据库(dynamoDb)加载。但是,此缓存应该跨片段和/或线程共享。所以我需要返回当前正在运行但尚未完成的现有observable。这允许线程赶上而不做不必要的工作。我是RxJava的新手所以这就是我想到的草图(为了简洁而缺少一些代码):
I'm having a hard time understanding on how to build a cache using RxJava. The idea is that I need to either get data from a memory cache or load from my database (dynamoDb). However, this cache is supposed to be shared across fragments and or threads. So I need to return existing observables that are currently running and not finished. This allows threads to catch up and not do unnecessary work. I'm new to RxJava so this is what I have in mind as a sketch (missing some code for brevity):
public class DBCache<K, T> {
private final ConcurrentHashMap<K, Set<T>> resultCache = new ConcurrentHashMap<>;
private final ConcurrentHashMap<K, Observable<Set<T>>> observableCache = new ConcurrentHashMap<>;
private Observable<Set<T>> getFromCache(final DynamoDbCacheKey<K, T> query) {
return Observable.create(new Observable.OnSubscribe<Set<T>>() {
@Override
public void call(Subscriber<? super Set<T>> subscriber) {
Set<T> results = resultCache.get(query.getKey());
if (results != null && results.size() > 0) {
subscriber.onNext(results);
}
subscriber.onCompleted();
}
});
}
public Observable<Set<T>> get(final QueryCacheKey<K, T> query){
Observable<Set<T>> cachedObservable = observableCache.get(query.getKey());
if (cachedObservable != null) {
return cachedObservable;
}
Observable<Set<T>> observable = Observable
.concat(getFromCache(query), getFromNetwork(query))
.first()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.cache();
observableCache.putIfAbsent(query.getKey(), observable);
return observable;
}
private Observable<Set<T>> getFromNetwork(final QueryCacheKey<K, T> query) {
return Observable.create(new Observable.OnSubscribe<Set<T>>() {
@Override
public void call(Subscriber<? super Set<T>> subscriber) {
try {
Set<T> results = loadFromDb(query); //omitted
resultCache.putIfAbsent(query.getKey(), results);
subscriber.onNext(results);
subscriber.onCompleted();
observableCache.remove(query.getKey());
} catch (Exception exception) {
subscriber.onError(exception);
}
}
});
}
}
有没有更好的方法通过RxJava实现这一点(对缓存策略不感兴趣)。有什么想法?
Is there a better way to achieve this through RxJava (Not interested in caching strategies). Any thoughts?
推荐答案
这是一个简单的缓存和检索值的例子:
Here is a simple example of doing caching and retrieving a value once:
public class RxCache<K, V> {
final ConcurrentHashMap<K, AsyncSubject<V>> cache;
final Func1<K, Observable<V>> valueGenerator;
public RxCache(Func1<K, Observable<V>> valueGenerator) {
this.valueGenerator = valueGenerator;
this.cache = new ConcurrentHashMap<>();
}
public Observable<V> get(K key) {
AsyncSubject<V> o = cache.get(key);
if (o != null) {
return o;
}
o = AsyncSubject.create();
AsyncSubject<V> p = cache.putIfAbsent(key, o);
if (p != null) {
return p;
}
valueGenerator.call(key).subscribe(o);
return o;
}
public void remove(K key) {
cache.remove(key);
}
}
如果您有多个值,请替换 AsyncSubject
with ReplaySubject
。
If you have multiple values, replace AsyncSubject
with ReplaySubject
.
这篇关于RxJava构建缓存与android的observables的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!