本文介绍了rxjs 速率限制(每秒请求数)和并发的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想弄清楚如何在 RxJS 中编写速率限制器,用于访问大多数 API(Twitter、Facebook 等)。如果内置的方法不支持,我认为可以自己编写一个调度器(scheduler)。例如 highland.js 就有 ratelimit。我不想像 window、sample 等操作符那样丢弃任何数据项。

I am trying to figure out how to write a rate limiter in RxJS, to be used when accessing most APIs (Twitter, Facebook, etc.). If this is not supported by the out-of-the-box methods, I assume a custom scheduler could be written. For instance, highland.js has ratelimit. I don't want to drop any items, as happens with window, sample, etc.

// Observable that pushes every queued request to its subscriber during
// subscription and then signals completion.
var source = Rx.Observable.create(function emitQueuedRequests(observer) {
  // Forward the request queue downstream, one item per onNext call.
  _.each(requests, function forwardRequest(request) {
    observer.onNext(request);
  });
  observer.onCompleted();

  // Teardown callback; any cleanup logic would go here.
  return function onDispose() {
    console.log('disposed');
  };
})
  // what goes here, if built in (e.g. 2 requests per 2 seconds or 15 request per 15 minutes)

// SHOULD ONLY RUN
// Log every notification kind (value, error, completion) emitted by `source`.
var subscription = source.subscribe(
  function handleNext(x) {
    console.log('onNext: %s', x);
  },
  function handleError(e) {
    console.log('onError: %s', e);
  },
  function handleCompleted() {
    console.log('onCompleted');
  }
);

编辑 1:想着这样的事情,使用令牌桶算法,还是很粗糙但是...

EDIT 1: Thinking about something like this, using the token bucket algorithm. It is still really rough, but...

/**
 * Lossless token-bucket rate limiter.
 *
 * Values from the source are never dropped: a value that arrives while the
 * bucket is empty is queued and emitted, in arrival order, as soon as a token
 * has been refilled.
 *
 * @param {Object} options
 * @param {number} [options.capacity] Maximum number of tokens the bucket can
 *   hold (also its initial fill). When omitted or falsy the bucket is treated
 *   as unlimited and values pass straight through.
 * @param {number} options.tokensPerInterval Tokens added per `interval`.
 * @param {number} options.interval Refill interval in milliseconds.
 * @param {Rx.Scheduler} [scheduler=Rx.Scheduler.timeout] Scheduler used to
 *   wait for the next token.
 * @returns {Rx.Observable} Observable emitting the source values, rate
 *   limited. Source errors are forwarded immediately (queued values are
 *   discarded); completion is forwarded once the queue has drained.
 */
Rx.Observable.prototype.tokenBucket = function(options, scheduler) {
  function time() {
    return new Date().getTime();
  }

  // The bucket is created once per operator call, so every subscription to
  // the returned observable draws from the same token budget.
  var BUCKET = {
    capacity: options.capacity || Infinity,
    left: options.capacity,
    last: time(),

    tokensPerInterval: options.tokensPerInterval,
    interval: options.interval
  };

  console.log(BUCKET);

  var source = this;
  scheduler = scheduler || Rx.Scheduler.timeout;

  return Rx.Observable.create(function(observer) {
    var pending = [];            // values waiting for a token, in arrival order
    var timer = null;            // disposable of the currently scheduled retry
    var waiting = false;         // true while a retry is scheduled
    var sourceCompleted = false; // source finished; complete once drained
    var stopped = false;         // terminated or disposed; emit nothing more

    // Credit the tokens earned since the last refill, capped at capacity.
    // Tokens added per elapsed millisecond = tokensPerInterval / interval.
    function refill() {
      var now = time();
      var deltaMS = Math.max(now - BUCKET.last, 0);
      BUCKET.last = now;
      var dripAmount = deltaMS * (BUCKET.tokensPerInterval / BUCKET.interval);
      BUCKET.left = Math.min(BUCKET.left + dripAmount, BUCKET.capacity);
    }

    // Drop queued values and cancel any scheduled retry.
    function cancelPending() {
      stopped = true;
      pending = [];
      if (timer) {
        timer.dispose();
        timer = null;
      }
    }

    // Emit as many queued values as there are whole tokens, then either
    // schedule a retry for the remainder or forward completion.
    function drain() {
      if (stopped) {
        return;
      }

      refill();

      while (pending.length > 0) {
        if (BUCKET.left < 1) {
          break;
        }
        BUCKET.left -= 1;
        console.log('calling');
        observer.onNext(pending.shift());
        if (stopped) {
          // The subscriber disposed from inside onNext.
          return;
        }
      }

      if (pending.length > 0) {
        // Time until one whole token is available:
        // missing tokens / (tokens per millisecond).
        var wait = Math.ceil(
          (1 - BUCKET.left) * BUCKET.interval / BUCKET.tokensPerInterval);
        waiting = true;
        timer = scheduler.scheduleWithRelative(wait, function() {
          waiting = false;
          drain();
        });
      } else if (sourceCompleted) {
        stopped = true;
        observer.onCompleted();
      }
    }

    var d1 = source.subscribe(
      function(mainValue) {
        if (BUCKET.capacity === Infinity) {
          // Unlimited bucket: nothing to throttle.
          observer.onNext(mainValue);
          return;
        }
        pending.push(mainValue);
        // A single scheduled retry drains the whole queue; don't stack timers.
        if (!waiting) {
          drain();
        }
      },
      function(error) {
        cancelPending();
        observer.onError(error);
      },
      function() {
        sourceCompleted = true;
        // If values are still queued, drain() completes after the last one.
        if (pending.length === 0 && !stopped) {
          stopped = true;
          observer.onCompleted();
        }
      });

    return function() {
      cancelPending();
      d1.dispose();
      console.log('disposed tokenBucket');
    };
  });
};

// Reference instant; addToDom reports elapsed seconds relative to it.
var start = moment();

// Emit the integers 1..20 through a bucket configured with capacity 2,
// 2 tokens per interval and an interval of 2000.
var source = Rx.Observable
  .range(1, 20)
  .tokenBucket({
    capacity: 2,
    tokensPerInterval: 2,
    interval: 2000
  });

// Log each notification and render every emitted value into the page.
var subscription = source.subscribe(
  function handleNext(x) {
    console.log('onNext: %s', x);
    addToDom(x);
  },
  function handleError(e) {
    console.log('onError: %s', e);
  },
  function handleCompleted() {
    console.log('onCompleted');
  }
);

/**
 * Appends a list item for an emitted value to the element with id `c`.
 *
 * The item text is the value followed by the number of whole seconds elapsed
 * since `start`, e.g. `3 - 2s ago`.
 *
 * @param {*} x Emitted value to display.
 */
function addToDom(x) {
  var ul = document.getElementById('c');
  var li = document.createElement('li');
  // The label is plain text, so assign it via textContent; innerHTML would
  // parse any markup contained in the value as HTML.
  li.textContent = x + ' - ' + moment().diff(start, 'seconds') + 's ago';
  ul.appendChild(li);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/moment.js/2.10.3/moment.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.3/rx.all.js"></script>
<ul id="c"></ul>

推荐答案

我在我的个人项目中遇到了一个非常相似的问题,并决定发布一个可重用的解决方案作为 npm 包 https://www.npmjs.com/package/rx-op-lossless-throttle

I've faced a very similar issue in my personal project and decided to publish a reusable solution as an npm package: https://www.npmjs.com/package/rx-op-lossless-throttle

不同于http://www.g9labs.com/2016/03/21/lossless-rate-limiting-with-rxjs/ 它不会强制延迟每个事件.

Unlike http://www.g9labs.com/2016/03/21/lossless-rate-limiting-with-rxjs/ it doesn't force the delay on every single event.

这篇关于rxjs 速率限制(每秒请求数)和并发的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-20 16:57