好的,为了更好地学习RxJS,决定尝试创建自定义Rx运算符。

因此,这是一个可以正常工作的简单示例:

Rx.Observable.prototype.multiply = function (input) {

    const source = this;

    return Rx.Observable.create(function (obs) {

        return source.subscribe(function(val){
            obs.next(input*val);
        });
    });

};


我们可以这样使用它:

  const obs = Rx.Observable.interval(1000)
    .multiply(4)
    .forEach(function (v) {
        console.log(v);
    });


但是,如果我们得到了一些更复杂的信息,例如,如果我们的运算符采用函数而不是静态值,该怎么办?

Rx.Observable.prototype.handleFn = function (fn) {

    const source = this;

    return Rx.Observable.create(function (obs) {

        return source.subscribe(function(val){
            obs.next(fn.call(obs,val));
        });
    });

};


上面的内容很好,但是,如果我们需要处理从输入函数返回的Rx.Observable,该怎么办?

const obs = Rx.Observable.interval(1000)
    .handleFn(function(){
        return Rx.Observable.timer(399);
    })
    .forEach(function (v) {
        console.log(v);
    });


是否有某种Promise.resolve(),但对于Observables来说,以便我可以解析Rx.Observable.timer()的结果?将检查Rx.Observable.prototype.flatMap等的源代码!

最佳答案

您可以使用.mergeAll(),如下所示:

Rx.Observable.prototype.handleFn = function (fn) {

    const source = this;

    return Rx.Observable.create(function (obs) {

        return source.subscribe(function(val){
            obs.next(fn.call(obs,val));
        });
    });

};

const obs = Rx.Observable.interval(1000)
    .handleFn(function(){
        return Rx.Observable.timer(150).mapTo(Math.random());
    })
    .mergeAll();

obs.subscribe(x => console.log(x));


请参见实时JSBin here



选项2:
除了mergeAll,您还可以执行以下操作:

Rx.Observable.prototype.handleFn = function (fn) {
    const source = this;

    return Rx.Observable.create(function (obs) {
        return source.subscribe(function(val){
            fn.call(obs,val).subscribe(x => obs.next(x));
        });
    });
};




附加说明:如果您想了解如何正确实施此方法,请看一下(如您已经提到的)flatMap, switchMap, concatMap的来源。

关于javascript - Promise.resolve(),但适用于Observables(RxJS5),我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/41503963/

10-12 19:19