最近,我一直在挂这个话题。看起来AsyncIterables和Observables都具有类似流的质量,尽管它们的使用方式有所不同。

您可以像这样消耗异步迭代

const myAsyncIterable = async function*() { yield 1; yield 2; yield 3; }

const main = async () => {
  for await (const number of myAsyncIterable()) {
    console.log(number)
  }
}

main()


你可以像这样消耗一个可观察物

const Observable = rxjs
const { map } = rxjs.operators

Observable.of(1, 2, 3).subscribe(x => console.log(x))
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>


我的首要问题基于此RxJS pr



在我看来,异步迭代器本来就不存在背压问题,那么在Observable上实现Symbol.asyncIterator(@@ asyncIterator)并默认为背压策略是否正确?根据AsyncIterables甚至需要Observables吗?

理想情况下,您可以通过代码示例向我展示AsyncIterables和Observables之间的实际区别。

最佳答案

主要区别在于哪一方决定何时进行迭代。

对于异步迭代器,客户端通过调用await iterator.next()来决定。消息源决定何时兑现 promise ,但客户必须首先要求下一个值。因此,使用者从源中“拉”数据。

Observables注册一个回调函数,当有新值传入时,observable会立即调用该回调函数。因此,源将“推”到使用者。

通过使用Subject并将其映射到异步迭代器的下一个值,可以轻松地将Observable用作消耗异步迭代器。每当您准备使用下一项时,就可以调用Subject上的next。这是一个代码示例

const pull = new Subject();
const output = pull.pipe(
  concatMap(() => from(iter.next())),
  map(val => {
    if(val.done) pull.complete();
    return val.value;
  })
);
//wherever you need this
output.pipe(

).subscribe(() => {
  //we're ready for the next item
  if(!pull.closed) pull.next();
});

09-19 13:42