我试图控制缓慢的订户的流入。在NodeJS中尝试了以下内容

var xmlNodeStream = Rx.Observable.from([1,2,3,4,5,6,7,8,9,10,11]);

var commJson = xmlNodeStream.bufferWithCount(2).publish();

var FastSubscriber = commJson.subscribe(
      function (x) { console.log('----------\nFastSub: onNext: %s', x); },
      function (e) { console.log('FastSub: onError: %s', e); },
      function () { console.log('FastSub: onCompleted'); });

var slowSubscriber = commJson.subscribe(function (x) {
    setTimeout(function () { console.log("============\nSlowsub called: ", x); }, 5000);
});

commJson.connect();


当我run the above code时,我希望慢速的订户每次都暂停5秒,然后再接收下一个数据批。

但这没有发生。最初的5秒延迟后,所有数据将以2为批次泛洪到slowSubscriber

什么是控制流入的正确方法,以便慢速子程序可以花费时间(最好是快速子机可以等待慢速子机完成)?

最佳答案

它不会暂停,因为setTimeout不会阻止执行,它只是计划将工作安排在以后的时间(即2秒后),然后会有更多数据进入,并且计划在2秒后再加上一些微小的增量。结果是快订户和慢订户将同时完成,但是慢订户的结果要等到2秒后才能显示。

如果您的实际用例中的慢速订户确实是非阻塞的,那么您有两种方法可以控制事件的流,要么需要控制消息源的流,无论何处。或者您需要使用诸如controlled()的背压运算符之一

var xmlNodeStream = Rx.Observable.from([1,2,3,4,5,6,7,8,9,10,11]);

var controller = xmlNodeStream.bufferWithCount(2).controlled();
var commJson = controller.publish().refCount();

var FastSubscriber = commJson.subscribe(
      function (x) { console.log('----------\nFastSub: onNext: %s', x); },
      function (e) { console.log('FastSub: onError: %s', e); },
      function () { console.log('FastSub: onCompleted'); });

var slowSubscriber = commJson.subscribe(function (x) {
    setTimeout(function () {
                console.log("============\nSlowsub called: ", x);
                controller.request(1);
               }, 5000);
});

commJson.request(1);

关于javascript - RxJS bufferWithCount()不暂停超时,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/32097094/

10-09 20:27
查看更多