Problem description
I need to call an external API repeatedly, with a delay between calls, to avoid hitting the 'User Rate Limit Exceeded' restriction.
The Google Maps Geocoding API is sensitive to requests per second and allows 10 req/sec. I need to geocode hundreds of my contacts, so a delay like this is required. So I need 10 async geocoding functions, each with a 1-second post-delay. I collect all contacts in an array and then loop through the array asynchronously.
In general, I need N simultaneous threads, each with a delay of D ms at the end. The loop iterates over an array of User entities, and each thread processes a single entity.
I'd like code along these lines:
const N = 10; // threads count
const D = 1000; // delay (ms) after each execution

var processUser = function(user, callback) {
    someBusinessLogicProc(user, function(err) {
        setTimeout(function() {
            return callback(err);
        }, D);
    });
};

var async = require('async');
var people = new Array(900);
// batchMethod does not exist; it is the method being asked about
async.batchMethod(people, processUser, N, finalCallback);
In this pseudocode, batchMethod is the method I'm asking for.
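For comparison, here is a minimal sketch of the kind of batchMethod the question describes, written with plain promises rather than the async library. The name batchWithPostDelay and its parameters are hypothetical. It starts up to n workers; each worker takes the next item, awaits the worker function, then sleeps delayMs before taking another. (If you stay with the async library, its mapLimit function provides the concurrency cap, with the delay placed inside the iteratee as processUser does.)

```javascript
// Hypothetical batchMethod: at most `n` workers run at once, and each worker
// waits `delayMs` after finishing an item before it takes the next one.
// Resolves with the results in input order; rejects on the first error.
function batchWithPostDelay(items, worker, n, delayMs) {
    if (n < 1) throw new RangeError('n must be at least 1');
    const results = new Array(items.length);
    let next = 0;
    const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

    async function runWorker() {
        while (next < items.length) {
            const i = next++; // claim an index; JS is single-threaded, so no race here
            results[i] = await worker(items[i]);
            await sleep(delayMs); // the post-delay from processUser
        }
    }

    const workers = [];
    for (let w = 0; w < Math.min(n, items.length); w++) {
        workers.push(runWorker());
    }
    return Promise.all(workers).then(() => results);
}
```

Because the gap between two launches by the same worker is the request time plus delayMs, this stays at or below n requests per delayMs, but it leaves capacity unused whenever requests are slow.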
Recommended answer
Putting a delay on the results is not really what you want. Instead, you want to keep track of what you've sent and when you sent it, so that as soon as you fall under the requests-per-second limit, you can send another request.
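The bookkeeping described here can be isolated into a small helper. The sketch below is a hypothetical SlidingWindowLimiter class (not part of the answer's code) that stores launch timestamps, drops the ones older than the window, and reports whether another launch is allowed or how long to wait. The current time is a parameter so the logic can be checked without real timers; it defaults to Date.now().

```javascript
// Sliding-window bookkeeping: remember when each request was launched and,
// before launching another, count the launches still inside the window.
class SlidingWindowLimiter {
    constructor(maxPerWindow, windowMs = 1000) {
        this.maxPerWindow = maxPerWindow;
        this.windowMs = windowMs;
        this.launchTimes = []; // ascending launch timestamps in ms
    }

    // Drop timestamps that have aged out of the window.
    prune(now) {
        while (this.launchTimes.length && now - this.launchTimes[0] >= this.windowMs) {
            this.launchTimes.shift();
        }
    }

    // If a launch is allowed at `now`, record it and return true; else return false.
    tryAcquire(now = Date.now()) {
        this.prune(now);
        if (this.launchTimes.length < this.maxPerWindow) {
            this.launchTimes.push(now);
            return true;
        }
        return false;
    }

    // Milliseconds until the oldest launch leaves the window (0 if a slot is free now).
    msUntilFree(now = Date.now()) {
        this.prune(now);
        if (this.launchTimes.length < this.maxPerWindow) return 0;
        return this.windowMs - (now - this.launchTimes[0]);
    }
}
```

A caller would call tryAcquire() before each request and, when it returns false, set a timer for msUntilFree() milliseconds and try again. Unlike the answer's launchTimes array, this one is pruned, so it does not grow with the input size.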
Here's a general concept for a function that rate-limits you to a fixed number of requests per second. It uses promises and requires that you supply a request function that returns a promise (if you aren't using promises now, you just need to wrap your request function in a promise).
// pass the following arguments:
//   array - array of values to iterate
//   requestsPerSec - max requests per second to send (integer)
//   maxInFlight - max number of requests in process at a time
//   fn - function to process an array value
//        function is passed array element as first argument
//        function returns a promise that is resolved/rejected when async operation is done
// Returns: promise that is resolved with an array of resolved values
//          or rejected with first error that occurs
function rateLimitMap(array, requestsPerSec, maxInFlight, fn) {
    return new Promise(function(resolve, reject) {
        var index = 0;
        var inFlightCntr = 0;
        var doneCntr = 0;
        var launchTimes = [];
        var results = new Array(array.length);

        // calculate num requests in last second
        function calcRequestsInLastSecond() {
            var now = Date.now();
            // look backwards in launchTimes to see how many were launched within the last second
            var cnt = 0;
            for (var i = launchTimes.length - 1; i >= 0; i--) {
                if (now - launchTimes[i] < 1000) {
                    ++cnt;
                } else {
                    break;
                }
            }
            return cnt;
        }

        function runMore() {
            while (index < array.length && inFlightCntr < maxInFlight && calcRequestsInLastSecond() < requestsPerSec) {
                (function(i) {
                    ++inFlightCntr;
                    launchTimes.push(Date.now());
                    fn(array[i]).then(function(val) {
                        results[i] = val;
                        --inFlightCntr;
                        ++doneCntr;
                        runMore();
                    }, reject);
                })(index);
                ++index;
            }
            // see if we're done
            if (doneCntr === array.length) {
                resolve(results);
            } else if (launchTimes.length >= requestsPerSec) {
                // calc how long we have to wait before sending more
                var delta = 1000 - (Date.now() - launchTimes[launchTimes.length - requestsPerSec]);
                if (delta >= 0) {
                    setTimeout(runMore, ++delta);
                }
            }
        }

        runMore();
    });
}
Example usage:
rateLimitMap(inputArrayToProcess, 9, 20, myRequestFunc).then(function(results) {
    // process array of results here
}, function(err) {
    // process error here
});
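rateLimitMap expects fn to return a promise, while the question's processUser takes a Node-style callback, so it needs a small wrapper. In this sketch someBusinessLogicProc is a hypothetical stub standing in for the real geocoding call, and processUserAsync is a hypothetical name. The post-delay from the question is dropped because rateLimitMap does the pacing.

```javascript
// Hypothetical stub for the question's someBusinessLogicProc: a Node-style
// callback API that reports (err, result) asynchronously.
function someBusinessLogicProc(user, callback) {
    setImmediate(() => {
        if (!user) return callback(new Error('missing user'));
        callback(null, { user, geocoded: true });
    });
}

// Promise wrapper: resolves with the result, rejects with the error.
// No setTimeout post-delay here; the rate limiter handles pacing.
function processUserAsync(user) {
    return new Promise((resolve, reject) => {
        someBusinessLogicProc(user, (err, result) => {
            if (err) reject(err);
            else resolve(result);
        });
    });
}
```

With the wrapper in place, the call becomes rateLimitMap(people, 9, 20, processUserAsync). For any function that follows the (err, value) callback convention, Node's util.promisify(someBusinessLogicProc) produces an equivalent wrapper.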
A more advanced version of this function, called rateMap(), is available on GitHub.
The general idea behind this code is this:
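- You pass in an array to iterate through.
- It returns a promise whose resolved value is an array of results (in order).
- You pass the maximum number of requests per second to ever send.
- You pass the maximum number of requests that may be in flight at the same time.
- You pass a function that will be passed an element from the array being iterated and must return a promise.
- It keeps an array of timestamps of when each request was sent.
- To see whether another request can be sent, it looks backwards in that array and counts how many requests were sent in the last second.
- If that number is below the threshold, it sends another one.
- If that number has reached the threshold, it calculates how long you have to wait before sending another one and sets a timer for that amount of time.
- When each request completes, it checks whether it can send more.
- If any request rejects its promise, the returned promise rejects immediately. Note that this does not stop the scheduler: requests already in flight still run, and further items may still be launched. If you don't want it to stop at the first error, modify the function you pass in so that it does not reject, but instead resolves with some value that you can later identify as a failed request when processing the results.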
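The last point can be handled by wrapping the function you pass in. This hypothetical neverReject helper turns every outcome into a resolved value, using the same { status, value } / { status, reason } shape that Promise.allSettled() reports, so one failed request does not reject the whole batch.

```javascript
// Hypothetical helper: wraps a promise-returning function so the promise it
// returns always resolves, reporting failures as data instead of rejecting.
function neverReject(fn) {
    return function (item) {
        return Promise.resolve()
            .then(() => fn(item)) // a synchronous throw in fn also becomes a rejection here
            .then(
                (value) => ({ status: 'fulfilled', value }),
                (reason) => ({ status: 'rejected', reason })
            );
    };
}
```

For example, rateLimitMap(inputArrayToProcess, 9, 20, neverReject(myRequestFunc)) resolves with an array in which the failures can be found with results.filter(r => r.status === 'rejected').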
Here's a working simulation: https://jsfiddle.net/jfriend00/3gr0tq7k/
Note: If the maxInFlight value you pass in is higher than the requestsPerSec value, this function will basically just send requestsPerSec requests and then, one second later, send another requestsPerSec requests, since that's the quickest way to stay under the requestsPerSec boundary. If the maxInFlight value is the same as or lower than requestsPerSec, it will send up to maxInFlight requests and then, as each request finishes, check whether it can send another one.
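The burst behaviour in the first case can be seen by replaying the scheduling rule on a virtual clock. This sketch (simulateLaunchTimes is a hypothetical name) applies the same one-second window count and the same wait calculation as runMore(), under the simplifying assumptions that every request completes instantly and maxInFlight never limits anything.

```javascript
// Replays rateLimitMap's scheduling rule on a virtual clock (ms), assuming
// every request completes instantly and maxInFlight never limits launches.
function simulateLaunchTimes(count, requestsPerSec) {
    if (requestsPerSec < 1) throw new RangeError('requestsPerSec must be at least 1');
    const launches = [];
    let now = 0;
    while (launches.length < count) {
        // same test as calcRequestsInLastSecond(): launches less than 1000 ms old
        const inLastSecond = launches.filter((t) => now - t < 1000).length;
        if (inLastSecond < requestsPerSec) {
            launches.push(now);
        } else {
            // same wait as runMore(): 1000 minus the age of the
            // requestsPerSec-th most recent launch, plus 1 ms
            const delta = 1000 - (now - launches[launches.length - requestsPerSec]);
            now += delta + 1;
        }
    }
    return launches;
}
```

simulateLaunchTimes(6, 3) returns [0, 0, 0, 1001, 1001, 1001]: three launches at once, then nothing until that first batch has aged out of the one-second window.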