使用forkJoin时,所有项目(在本例中为HTTP请求)都立即发送到服务器,然后作为数组发出:
let observable_array = [observable1, observable2, ...];
Observable.forkJoin(observable_array).subscribe(res => {
console.log('Res: ', res);
}, err => {
console.log('Err: ', err);
});
这对我来说是个问题,因为我使用的是单线程服务器(NodeJS)。如果
observable_array
包含10个HTTP请求,并且每个请求需要3秒钟才能完成,则30秒钟内不能发出其他请求。我正在寻找的是一种执行请求1的方法,等待服务器的响应完成,然后执行请求2,然后等待响应,依此类推。使用这种方法意味着新的请求不必等待
observable_array
完成要在服务器上执行的30秒之前……他们最多只能等待3秒。我想出了以下解决方案,但是我敢肯定,知识渊博的人会有一个更优雅的解决方案。
let observable_array = [observable1, observable2, ...];
var queue = function(t, ob) { // Needed to use var instead of let because of scoping...
if (ob) {
ob.subscribe(res => {
console.log('Res: ', res);
queue(t, observables.shift());
}, err => {
console.log('Err: ', err);
});
}
}
queue(this, observables.shift());
最佳答案
您可以使用自己喜欢的flatMap / mergeMap风格并添加并发参数。这将限制您正在服务的并发可观察项/请求的数量。
//emit value every 1s
const source = Rx.Observable.interval(1000);
const example = source.mergeMap(
//project
val => Rx.Observable.interval(5000).take(2),
//resultSelector
(oVal, iVal, oIndex, iIndex) => [oIndex, oVal, iIndex, iVal],
//concurrent
2
);
/*
Output:
[0, 0, 0, 0] <--1st inner observable
[1, 1, 0, 0] <--2nd inner observable
[0, 0, 1, 1] <--1st inner observable
[1, 1, 1, 1] <--2nd inner observable
[2, 2, 0, 0] <--3rd inner observable
[3, 3, 0, 0] <--4th inner observable
*/
const subscribe = example.subscribe(val => console.log(val));
取自https://www.learnrxjs.io/operators/transformation/mergemap.html
注意2作为最后一个参数,以将并发请求限制为最多两个。显然,您可以将其更改为1。上面的示例使用
.interval
模拟了可观察对象-您只需将其替换为自己的可观察对象列表即可。