考虑下面的Typescript / rxjs代码:
import { range } from 'rxjs';
range(1, 5).subscribe(async (num) => {
await longLastingOperation(num);
})
function longLastingOperation(num) {
console.log('starting', num);
return new Promise((resolve) => {
setTimeout(() => {
console.log('done with', num);
resolve();
}, Math.random() * 1000);
});
}
每个发射值触发一个随机持续时间长的持续操作。控制台输出是不可预测的,看起来类似于以下内容:
starting 1
starting 2
starting 3
starting 4
starting 5
done with 2
done with 5
done with 1
done with 4
done with 3
现在,我想为每个发射值“序列化”持久操作的执行。例如,我希望
longLastingOperation(2)
等待longLastingOperation(1)
在开始之前完成。我想每次看起来都像这样实现输出:
starting 1
done with 1
starting 2
done with 2
starting 3
done with 3
starting 4
done with 4
starting 5
done with 5
如何使用rxjs和observables实现这一目标?
最佳答案
考虑在每个concatMap
调用上使用longLastingOperation(num)
运算符,将响应包装在Observable中,并按顺序订阅结果:
range(1, 5)
.concatMap((num) => {
return Observable.fromPromise(longLastingOperation(num));
})
.subscribe(res => {
console.log(`End working on ${res}`) // Shall be ordered here
})
这是文档参考:
http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-concatMap
这是我在该主题上的一些其他工作:
https://www.linkedin.com/pulse/rx-map-misleading-marbles-tomasz-budzi%C5%84ski/
关于javascript - 序列化可观察的下一函数调用,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53189580/