我正在尝试将现有的API转换为可与RxJS一起使用...对 Node 而言是相当新的,而对RxJs而言则是非常新的,所以请耐心等待。
我有一个现有的API(getNextMessage),当某些东西可用时,它要么阻塞(异步),要么通过 Node 样式(err,val)回调返回新项或错误。
所以看起来像这样:
getNextMessage(nodeStyleCompletionCallback);
您可以将getNextMessage视为一个http请求,该请求将来会在服务器响应时完成,但是您确实需要在收到消息后再次调用getNextMessage,以继续从服务器获取新项目。
因此,为了使其成为一个可观察的集合,我必须让RxJs继续调用我的getNextMessage函数,直到订阅者被处置为止。
基本上,我正在尝试创建自己的RxJs可观察集合。
问题是:
希望能有所帮助,谢谢!
这是我现有的代码:
var Rx = require('rx');
var port = require('../lib/port');
var async = require('async');
function observableReceive(portName)
{
var observerCallback;
var listenPort = new port(portName);
var disposed = false;
var asyncReceive = function(asyncCallback)
{
listenPort.getNextMessage(
function(error, json)
{
observerCallback(error, json);
if (!disposed)
setImmediate(asyncCallback);
}
);
}
return function(outerCallback)
{
observerCallback = outerCallback;
async.forever(asyncReceive);
}
}
var receive = Rx.Observable.fromNodeCallback(observableReceive('rxtest'));
var source = receive();
var subscription = source.forEach(
function (json)
{
console.log('receive completed: ' + JSON.stringify(json));
},
function (error) {
console.log("receive failed: " + error.toString());
},
function () {
console.log('Completed');
subscription.dispose();
}
);
最佳答案
所以这可能是我会做的。
var Rx = require('Rx');
// This is just for kicks. You have your own getNextMessage to use. ;)
var getNextMessage = (function(){
var i = 1;
return function (callback) {
setTimeout(function () {
if (i > 10) {
callback("lawdy lawd it's ova' ten, ya'll.");
} else {
callback(undefined, i++);
}
}, 5);
};
}());
// This just makes an observable version of getNextMessage.
var nextMessageAsObservable = Rx.Observable.create(function (o) {
getNextMessage(function (err, val) {
if (err) {
o.onError(err);
} else {
o.onNext(val);
o.onCompleted();
}
});
});
// This repeats the call to getNextMessage as many times (11) as you want.
// "take" will cancel the subscription after receiving 11 items.
nextMessageAsObservable
.repeat()
.take(11)
.subscribe(
function (x) { console.log('next', x); },
function (err) { console.log('error', err); },
function () { console.log('done'); }
);
关于node.js - 试图使我自己的RxJ可见,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/21667824/