我试图控制缓慢的订户的流入。在NodeJS中尝试了以下内容
var xmlNodeStream = Rx.Observable.from([1,2,3,4,5,6,7,8,9,10,11]);
var commJson = xmlNodeStream.bufferWithCount(2).publish();
var FastSubscriber = commJson.subscribe(
function (x) { console.log('----------\nFastSub: onNext: %s', x); },
function (e) { console.log('FastSub: onError: %s', e); },
function () { console.log('FastSub: onCompleted'); });
var slowSubscriber = commJson.subscribe(function (x) {
setTimeout(function () { console.log("============\nSlowsub called: ", x); }, 5000);
});
commJson.connect();
当我run the above code时,我希望慢速的订户每次都暂停5秒,然后再接收下一个数据批。
但这没有发生。最初的5秒延迟后,所有数据将以2为批次泛洪到
slowSubscriber
。什么是控制流入的正确方法,以便慢速子程序可以花费时间(最好是快速子机可以等待慢速子机完成)?
最佳答案
它不会暂停,因为setTimeout
不会阻止执行,它只是计划将工作安排在以后的时间(即2秒后),然后会有更多数据进入,并且计划在2秒后再加上一些微小的增量。结果是快订户和慢订户将同时完成,但是慢订户的结果要等到2秒后才能显示。
如果您的实际用例中的慢速订户确实是非阻塞的,那么您有两种方法可以控制事件的流,要么需要控制消息源的流,无论何处。或者您需要使用诸如controlled()
的背压运算符之一
var xmlNodeStream = Rx.Observable.from([1,2,3,4,5,6,7,8,9,10,11]);
var controller = xmlNodeStream.bufferWithCount(2).controlled();
var commJson = controller.publish().refCount();
var FastSubscriber = commJson.subscribe(
function (x) { console.log('----------\nFastSub: onNext: %s', x); },
function (e) { console.log('FastSub: onError: %s', e); },
function () { console.log('FastSub: onCompleted'); });
var slowSubscriber = commJson.subscribe(function (x) {
setTimeout(function () {
console.log("============\nSlowsub called: ", x);
controller.request(1);
}, 5000);
});
commJson.request(1);
关于javascript - RxJS bufferWithCount()不暂停超时,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/32097094/