考虑下面的Typescript / rxjs代码:

import { range } from 'rxjs';

range(1, 5).subscribe(async (num) => {
  await longLastingOperation(num);
})

function longLastingOperation(num) {
  console.log('starting', num);

  return new Promise((resolve) => {
    setTimeout(() => {
      console.log('done with', num);
      resolve();
    }, Math.random() * 1000);
  });
}


每个发射值触发一个随机持续时间长的持续操作。控制台输出是不可预测的,看起来类似于以下内容:

starting 1
starting 2
starting 3
starting 4
starting 5
done with 2
done with 5
done with 1
done with 4
done with 3


现在,我想为每个发射值“序列化”持久操作的执行。例如,我希望longLastingOperation(2)等待longLastingOperation(1)在开始之前完成。

我想每次看起来都像这样实现输出:

starting 1
done with 1
starting 2
done with 2
starting 3
done with 3
starting 4
done with 4
starting 5
done with 5


如何使用rxjs和observables实现这一目标?

最佳答案

考虑在每个concatMap调用上使用longLastingOperation(num)运算符,将响应包装在Observable中,并按顺序订阅结果:

range(1, 5)
.concatMap((num) => {
  return Observable.fromPromise(longLastingOperation(num));
})
.subscribe(res => {
  console.log(`End working on ${res}`) // Shall be ordered here
})


这是文档参考:
http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-concatMap

这是我在该主题上的一些其他工作:
https://www.linkedin.com/pulse/rx-map-misleading-marbles-tomasz-budzi%C5%84ski/

关于javascript - 序列化可观察的下一函数调用,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53189580/

10-16 16:00