使用forkJoin时,所有项目(在本例中为HTTP请求)都立即发送到服务器,然后作为数组发出:

let observable_array = [observable1, observable2, ...];

Observable.forkJoin(observable_array).subscribe(res => {
  console.log('Res: ', res);
}, err => {
  console.log('Err: ', err);
});


这对我来说是个问题,因为我使用的是单线程服务器(NodeJS)。如果observable_array包含10个HTTP请求,并且每个请求需要3秒钟才能完成,则30秒钟内不能发出其他请求。

我正在寻找的是一种执行请求1的方法,等待服务器的响应完成,然后执行请求2,然后等待响应,依此类推。使用这种方法意味着新的请求不必等待observable_array完成要在服务器上执行的30秒之前……他们最多只能等待3秒。

我想出了以下解决方案,但是我敢肯定,知识渊博的人会有一个更优雅的解决方案。

let observable_array = [observable1, observable2, ...];

var queue = function(t, ob) { // Needed to use var instead of let because of scoping...

  if (ob) {

    ob.subscribe(res => {
      console.log('Res: ', res);
      queue(t, observables.shift());
    }, err => {
      console.log('Err: ', err);
    });

  }

}

queue(this, observables.shift());

最佳答案

您可以使用自己喜欢的flatMap / mergeMap风格并添加并发参数。这将限制您正在服务的并发可观察项/请求的数量。

//emit value every 1s
const source = Rx.Observable.interval(1000);

const example = source.mergeMap(
    //project
    val => Rx.Observable.interval(5000).take(2),
  //resultSelector
  (oVal, iVal, oIndex, iIndex) => [oIndex, oVal, iIndex, iVal],
  //concurrent
  2
);
/*
        Output:
        [0, 0, 0, 0] <--1st inner observable
        [1, 1, 0, 0] <--2nd inner observable
        [0, 0, 1, 1] <--1st inner observable
        [1, 1, 1, 1] <--2nd inner observable
        [2, 2, 0, 0] <--3rd inner observable
        [3, 3, 0, 0] <--4th inner observable
*/
const subscribe = example.subscribe(val => console.log(val));


取自https://www.learnrxjs.io/operators/transformation/mergemap.html

注意2作为最后一个参数,以将并发请求限制为最多两个。显然,您可以将其更改为1。上面的示例使用.interval模拟了可观察对象-您只需将其替换为自己的可观察对象列表即可。

07-28 03:55
查看更多