好的,为了更好地学习RxJS,决定尝试创建自定义Rx运算符。
因此,这是一个可以正常工作的简单示例:
Rx.Observable.prototype.multiply = function (input) {
const source = this;
return Rx.Observable.create(function (obs) {
return source.subscribe(function(val){
obs.next(input*val);
});
});
};
我们可以这样使用它:
const obs = Rx.Observable.interval(1000)
.multiply(4)
.forEach(function (v) {
console.log(v);
});
但是,如果我们得到了一些更复杂的信息,例如,如果我们的运算符采用函数而不是静态值,该怎么办?
Rx.Observable.prototype.handleFn = function (fn) {
const source = this;
return Rx.Observable.create(function (obs) {
return source.subscribe(function(val){
obs.next(fn.call(obs,val));
});
});
};
上面的内容很好,但是,如果我们需要处理从输入函数返回的Rx.Observable,该怎么办?
const obs = Rx.Observable.interval(1000)
.handleFn(function(){
return Rx.Observable.timer(399);
})
.forEach(function (v) {
console.log(v);
});
是否有某种Promise.resolve(),但对于Observables来说,以便我可以解析Rx.Observable.timer()的结果?将检查
Rx.Observable.prototype.flatMap
等的源代码! 最佳答案
您可以使用.mergeAll()
,如下所示:
Rx.Observable.prototype.handleFn = function (fn) {
const source = this;
return Rx.Observable.create(function (obs) {
return source.subscribe(function(val){
obs.next(fn.call(obs,val));
});
});
};
const obs = Rx.Observable.interval(1000)
.handleFn(function(){
return Rx.Observable.timer(150).mapTo(Math.random());
})
.mergeAll();
obs.subscribe(x => console.log(x));
请参见实时JSBin here。
选项2:
除了
mergeAll
,您还可以执行以下操作:Rx.Observable.prototype.handleFn = function (fn) {
const source = this;
return Rx.Observable.create(function (obs) {
return source.subscribe(function(val){
fn.call(obs,val).subscribe(x => obs.next(x));
});
});
};
附加说明:如果您想了解如何正确实施此方法,请看一下(如您已经提到的)
flatMap, switchMap, concatMap
的来源。关于javascript - Promise.resolve(),但适用于Observables(RxJS5),我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/41503963/