我经常遇到一种模式,我不确定如何有效地解决它。

基本上,如果我有一个包含昂贵项目Observable<T>T,我不想每次使用某项内容时都重新构建该T项,或者将其映射到一千个其他不同的可观察对象,这将导致它被构建1000次。

因此,我开始使用replay()将其缓存一段时间,但是理想情况下,当排放闲置一段时间后,我希望它清除缓存。

是否可以使用运算符(operator)或某些变压器来完成此任务?

public final class ActionManager {

    private final Observable<ImmutableList<Action>> actionMap;

    private ActionManager() {
        this.actionMap = Observable.defer(() -> buildExpensiveList()).replay(10, TimeUnit.SECONDS).autoConnect();
    }

    //this method could get called thousands of times
    //I don't want to rebuild the map for every call

    public Observable<Action> forItem(Item item) {
        actionMap.map(l -> //get Action for item);
    }


}

更新

尝试将其实现为“变形金刚/运算符”组合。我在这里做错什么了吗?
   public static <T> Transformer<T,T> recacheOnIdle(long time, TimeUnit timeUnit) {

        return obs -> obs.timeout(time, timeUnit).lift(new Operator<T,T>() {

            private volatile T cachedItem;
            private volatile boolean isCurrent = false;

            @Override
            public Subscriber<? super T> call(Subscriber<? super T> s) {
                return new Subscriber<T>(s) {
                    @Override
                    public void onCompleted() {
                         if(!s.isUnsubscribed()) {
                              s.onCompleted();
                         }
                    }
                    @Override
                    public void onError(Throwable e) {
                        if(!s.isUnsubscribed()) {
                            if (e instanceof TimeoutException) {
                                isCurrent = false;
                                cachedItem = null;
                            } else {
                                s.onError(e);
                            }
                        }
                    }

                    @Override
                    public void onNext(T t) {
                        if(!s.isUnsubscribed()) {
                            if (!isCurrent) {
                                cachedItem = t;
                            }
                            s.onNext(cachedItem);
                        }
                    }
                };
            }

        });
    }

最佳答案

您可能可以使用timeout operatorconnectable observable(用于保存和同步多个订户):



通过这种方式,您可以通过重新缓存昂贵的商品来应对抛出的错误。假设这是一个“罕见”案例:

// if no emissions are made for a period of 3 seconds - will call onError
observableWithCache.timeout(3000, TimeUnit.MILLISECONDS).subscribe(new Subscriber<SomeObject>() {

    public void onCompleted() {

    }

    public void onError(Throwable arg0) {
        doClearCache(); // make sure to re-subscribe with timeout
    }

    public void onNext(SomeObject item) {
        System.out.println("Got item: " + item); // you can ignore this
    }
});

请注意,onError不会取消原始可观察到的,如图所示:

java - RxJava-缓存直到排放闲置一段时间?-LMLPHP

但是您可以对一段时间内没有产生任何排放的气体使用react。

10-02 05:02