最近,我一直在挂这个话题。看起来AsyncIterables和Observables都具有类似流的质量,尽管它们的使用方式有所不同。
您可以像这样消耗异步迭代
const myAsyncIterable = async function*() { yield 1; yield 2; yield 3; }
const main = async () => {
for await (const number of myAsyncIterable()) {
console.log(number)
}
}
main()
你可以像这样消耗一个可观察物
const Observable = rxjs
const { map } = rxjs.operators
Observable.of(1, 2, 3).subscribe(x => console.log(x))
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>
我的首要问题基于此RxJS pr
在我看来,异步迭代器本来就不存在背压问题,那么在Observable上实现
Symbol.asyncIterator
(@@ asyncIterator)并默认为背压策略是否正确?根据AsyncIterables甚至需要Observables吗?理想情况下,您可以通过代码示例向我展示AsyncIterables和Observables之间的实际区别。
最佳答案
主要区别在于哪一方决定何时进行迭代。
对于异步迭代器,客户端通过调用await iterator.next()
来决定。消息源决定何时兑现 promise ,但客户必须首先要求下一个值。因此,使用者从源中“拉”数据。
Observables注册一个回调函数,当有新值传入时,observable会立即调用该回调函数。因此,源将“推”到使用者。
通过使用Subject并将其映射到异步迭代器的下一个值,可以轻松地将Observable用作消耗异步迭代器。每当您准备使用下一项时,就可以调用Subject上的next。这是一个代码示例
const pull = new Subject();
const output = pull.pipe(
concatMap(() => from(iter.next())),
map(val => {
if(val.done) pull.complete();
return val.value;
})
);
//wherever you need this
output.pipe(
).subscribe(() => {
//we're ready for the next item
if(!pull.closed) pull.next();
});