RxJS的flatMapLatest展平了最新的(仅一个)嵌套的Observable。我有一个用例,我不想用flatMap(可以展平过去所有嵌套的Observables),也不想用flatMapWithConcurrency(因为它偏爱旧的Observables,而不是最新的Observables),所以我想要的是flatMapLatestTwo或一些版本的flatMapLatest,您可以在其中指定并发嵌套Observable的最大数量,例如flatMapLatest(2, selectorFn)
。
这是我想要的输出(_X
表示嵌套的Observable X
,而eX
表示其第X个onNext事件):
_0e0
_0e1
_1e0
_0e2
_1e1
_2e0
_1e2
_2e1
_3e0
_2e2
_3e1
_4e0
_3e2
_4e1
_3e3
_4e2
_4e3
这是flatMapLatest产生的:
_0e0
_0e1
_1e0
_1e1
_2e0
_2e1
_3e0
_3e1
_4e0
_4e1
_4e2
_4e3
我更喜欢使用现有运算符而不是实现此底层操作的解决方案。
最佳答案
这看起来很幼稚。我正在寻找改善的方法,但是这里是:
Rx.Observable.prototype.flatMapLatestN = function (count, transform) {
let queue = [];
return this.flatMap(x => {
return Rx.Observable.create(observer => {
let disposable;
if (queue.length < count) {
disposable = transform(x).subscribe(observer);
queue.push(observer);
}
else {
let earliestObserver = queue[0];
if (earliestObserver) {
earliestObserver.onCompleted();
}
disposable = transform(x).subscribe(observer);
queue.push(observer);
}
return () => {
disposable.dispose();
let i = queue.indexOf(observer);
queue.splice(i, 1);
};
});
});
};
要测试:
function space(n) {
return Array(n+1).join(' ');
}
Rx.Observable
.interval(1000)
.take(6)
.flatMapLatestN(2, (x) => {
return Rx.Observable
.interval(300)
.take(10)
.map(n => `${space(x*4)}${x}-${n}`);
})
.subscribe(console.log.bind(console));
它将输出:
0-1
0-2
0-3
1-0
0-4
1-1
0-5
1-2
1-3
2-0
1-4
2-1
1-5
2-2
2-3
3-0
2-4
3-1
2-5
3-2
3-3
4-0
3-4
4-1
3-5
4-2
4-3
5-0
4-4
5-1
4-5
5-2
4-6
5-3
4-7
5-4
4-8
5-5
4-9
5-6
5-7
5-8
5-9