我将状态保存在一个ReplaySubject
中,以重播状态的最后一个副本。从该状态,派生其他ReplaySubjects
来保持...很好的派生状态。每个重播主题仅需要保持其最后的计算状态/派生状态。 (我们不使用BehaviorSubjects
,因为它们总是提供一个值,但我们只希望从父级可观察对象派生一个值。)如果我们已经生成派生状态,则始终有必要将值重放给新的订户。
我有一个自定义的可观察运算符,可以按照我想要的方式完成此操作,但是感觉并不干净。我觉得应该有一种有效的方法来利用RxJ的运算符(operator)本身来完成此任务。
我尝试了两种最明显的方法,但是每种方法都有一些小问题。问题涉及取消订阅和重新订阅。
打开下面的小提琴,打开控制台,然后单击运行。我将用每个输出描述问题。
https://jsfiddle.net/gfe1nryp/1/
用refCount
编辑的ReplaySubject的问题
=== RefCounted Observable ===
Work
Subscription 1: 1
Work
Subscription 1: 2
Work
Subscription 1: 3
Unsubscribe
Resubscribe
Subscription 2: 3
Work
Subscription 2: 6
Work
Subscription 2: 7
Work
Subscription 2: 8
这很好用,当没有订阅时,中间函数不起作用。但是,一旦我们重新订阅。我们可以看到订阅2在取消订阅之前重播了最后一个状态,然后根据
base$
状态中的当前值播放了派生状态。这是不理想的。connect
ed ReplaySubject 问题=== Hot Observable ===
Work
Subscription 1: 1
Work
Subscription 1: 2
Work
Subscription 1: 3
Unsubscribe
Work
Work
Work
Resubscribe
Subscription 2: 6
Work
Subscription 2: 7
Work
Subscription 2: 8
这个问题与
refCount
ed可观察到的问题不同,在取消订阅之前,没有不必要的最后状态重播。但是,由于observable现在很热,所以需要权衡的是,即使有任何订阅未使用该值,只要有新值进入base$
状态,我们总是会工作。最后,我们有了自定义运算符:
=== Custom Observable ===
Work
Subscription 1: 1
Work
Subscription 1: 2
Work
Subscription 1: 3
Unsubscribe
Resubscribe
Work
Subscription 2: 6
Work
Subscription 2: 7
Work
Subscription 2: 8
啊,两全其美。它不仅不会不必要地重播取消订阅之前的最后一个值,而且在没有订阅的情况下也不会不必要地进行任何工作。
这是通过手动创建
RefCount
和ReplaySubject
的组合来完成的。我们跟踪每个订户,当它达到0时,我们刷新重播值。它的代码在这里(当然,在小提琴中):Rx.Observable.prototype.selectiveReplay = function() {
let subscribers = [];
let innerSubscription;
let storage = null;
return Rx.Observable.create(observer => {
if (subscribers.length > 0) {
observer.next(storage);
}
subscribers.push(observer);
if (!innerSubscription) {
innerSubscription = this.subscribe(val => {
storage = val;
subscribers.forEach(subscriber => subscriber.next(val))
});
}
return () => {
subscribers = subscribers.filter(subscriber => subscriber !== observer);
if (subscribers.length === 0) {
storage = null;
innerSubscription.unsubscribe();
innerSubscription = null;
}
};
});
};
因此,这个可观察的自定义变量已经可以使用了。 但是,仅使用RxJS运算符可以完成吗?请记住,这样的主题可能有多个相互关联。在示例中,我仅使用一个链接到
base$
来说明我在最基本的级别上尝试过的两种常用方法的问题。基本上,如果您只能使用RxJS运算符,并获得与上述
=== Custom Observable ===
的输出匹配的输出。这就是我想要的。谢谢! 最佳答案
您应该能够将multicast
与主题工厂而不是主题一起使用。 cf. https://jsfiddle.net/pto7ngov/1/
(function(){
console.log('=== RefCounted Observable ===');
var base$ = new Rx.ReplaySubject(1);
var listen$ = base$.map(work).multicast(()=> new Rx.ReplaySubject(1)).refCount();
var subscription1 = listen$.subscribe(x => console.log('Subscription 1: ' + x));
base$.next(1);
base$.next(2);
base$.next(3);
console.log('Unsubscribe');
subscription1.unsubscribe();
base$.next(4);
base$.next(5);
base$.next(6);
console.log('Resubscribe');
var subscription2 = listen$.subscribe(x => console.log('Subscription 2: ' + x));
base$.next(7);
base$.next(8);
})();
multicast
operator的重载完全可以满足您的用例。每次multicast
运算符返回的可观察对象完成并重新连接时,它都会使用提供的工厂创建一个新的主题。虽然它没有很好的文档记录,但是它基本上是从Rxjs v4复制现有的API。万一我误解了或者不起作用,请告诉我,