我发现it is undesirable在响应式编程中使用Subjects,尽管我发现它们非常方便。但是我知道他们会被滥用。因此,我尝试创建一个无限的Observable<ImmutableMap<Integer,ActionProfile>,每次调用ImmutableMap时都需要发布一个新的refresh()。我也有一个forKey()方法,该方法返回Observable来检索与特定键匹配的最新ActionProfile

但是,有些事情让我不满意我如何处理订户。如果观察对象的生命是无限的,我是否必须在观察对象的结构之外自己管理订户?可观察者是否维护其订阅者列表?还是那是我的责任,所以我可以随时叫他们onNext()

public final class ActionProfileManager {
    private final Observable<ImmutableMap<Integer,ActionProfile>> actionProfiles;
    private volatile ImmutableMap<Integer,ActionProfile> actionProfileMap;

    //do I really need this?
    private final CopyOnWriteArrayList<Subscriber<? super ImmutableMap<Integer,ActionProfile>>> subscribers = new CopyOnWriteArrayList<>();

    private ActionProfileManager() {
        this.actionProfiles = Observable.create(subscriber -> {
            subscriber.onNext(actionProfileMap);
            subscribers.add(subscriber); // is it up to me to capture the subscriber here or is it already saved somewhere for me?
        });
    }

    public void refresh() {
        actionProfileMap = importFromDb();
        subscribers.forEach(s -> s.onNext(actionProfileMap));
    }

    public Observable<ActionProfile> forKey(int actionProfileId) {
        return actionProfiles.map(m -> m.get(actionProfileId));
    }
    private ImmutableMap<Integer,ActionProfile> importFromDb() {
        return ImmutableMap.of(); //import data here
    }
}

最佳答案

冷可观测对象通常一次与一个订户交互,即使您订阅更多,它们也可以独立运行,并且彼此之间实际上并不需要了解。

另一方面,主题在他们自己接收到多播事件时必须跟踪其订户。

快速查看您的代码表明存在一些竞争情况,并且可能会丢失通知。取而代之的是,您可以依赖BehaviorSubject,它是异步单词的“反应属性”。让它存储当前的不可变映射并处理订户:

BehaviorSubject<ImmutableMap> bs = BehaviorSubject.create();
Subject<ImmutableMap, ImmutableMap> sync = bs.toSerialized();

forKey(k): bs.map(m -> m.get(k));

refresh(): sync.onNext(importFromDb());

10-04 10:32