我正在尝试将现有的API转换为可与RxJS一起使用...对 Node 而言是相当新的,而对RxJs而言则是非常新的,所以请耐心等待。

我有一个现有的API(getNextMessage),当某些东西可用时,它要么阻塞(异步),要么通过 Node 样式(err,val)回调返回新项或错误。

所以看起来像这样:

getNextMessage(nodeStyleCompletionCallback);

您可以将getNextMessage视为一个http请求,该请求将来会在服务器响应时完成,但是您确实需要在收到消息后再次调用getNextMessage,以继续从服务器获取新项目。

因此,为了使其成为一个可观察的集合,我必须让RxJs继续调用我的getNextMessage函数,直到订阅者被处置为止。

基本上,我正在尝试创建自己的RxJs可观察集合。

问题是:

  • 我不知道如何使Subscriber.dispose()杀死async.forever
  • 我可能不应该首先使用async.forever
  • 我不确定我是否应该为每条消息都“完成”-不应该在序列
  • 的末尾
  • 我想最终消除使用fromNodeCallback的需要,以便拥有一流的RxJS可观察的
  • 显然我有点困惑。

  • 希望能有所帮助,谢谢!

    这是我现有的代码:
    var Rx = require('rx');
    var port = require('../lib/port');
    var async = require('async');
    
    function observableReceive(portName)
    {
        var observerCallback;
        var listenPort = new port(portName);
        var disposed = false;
    
        var asyncReceive = function(asyncCallback)
        {
            listenPort.getNextMessage(
                function(error, json)
                {
                    observerCallback(error, json);
    
                    if (!disposed)
                        setImmediate(asyncCallback);
                }
            );
        }
    
        return function(outerCallback)
        {
            observerCallback = outerCallback;
            async.forever(asyncReceive);
        }
    }
    
    var receive = Rx.Observable.fromNodeCallback(observableReceive('rxtest'));
    var source = receive();
    
    var subscription = source.forEach(
        function (json)
        {
            console.log('receive completed: ' + JSON.stringify(json));
        },
        function (error) {
            console.log("receive failed: " + error.toString());
        },
        function () {
            console.log('Completed');
            subscription.dispose();
        }
    );
    

    最佳答案

    所以这可能是我会做的。

    var Rx = require('Rx');
    
    // This is just for kicks. You have your own getNextMessage to use. ;)
    var getNextMessage = (function(){
    
      var i = 1;
    
      return function (callback) {
        setTimeout(function () {
          if (i > 10) {
            callback("lawdy lawd it's ova' ten, ya'll.");
          } else {
            callback(undefined, i++);
          }
        }, 5);
      };
    
    }());
    
    // This just makes an observable version of getNextMessage.
    var nextMessageAsObservable = Rx.Observable.create(function (o) {
      getNextMessage(function (err, val) {
        if (err) {
          o.onError(err);
        } else {
          o.onNext(val);
          o.onCompleted();
        }
      });
    });
    
    // This repeats the call to getNextMessage as many times (11) as you want.
    // "take" will cancel the subscription after receiving 11 items.
    nextMessageAsObservable
      .repeat()
      .take(11)
      .subscribe(
        function (x)   { console.log('next', x);    },
        function (err) { console.log('error', err); },
        function ()    { console.log('done');       }
      );
    

    关于node.js - 试图使我自己的RxJ可见,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/21667824/

    10-12 07:01