如何在全局范围内限制

如何在全局范围内限制

本文介绍了Python API 速率限制 - 如何在全局范围内限制 API 调用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试限制代码中的 API 调用.我已经找到了一个不错的 python 库 ratelimiter==1.0.2.post0https://pypi.python.org/pypi/ratelimiter

I'm trying to restrict the API calls in my code. I already found a nice python library ratelimiter==1.0.2.post0https://pypi.python.org/pypi/ratelimiter

然而,这个库只能限制本地范围内的速率.即)在函数和循环中

However, this library can only limit the rate in local scope. i.e) in function and loops

# Decorator
@RateLimiter(max_calls=10, period=1)
def do_something():
    pass


# Context Manager
rate_limiter = RateLimiter(max_calls=10, period=1)

for i in range(100):
    with rate_limiter:
        do_something()

因为我有几个函数在不同的地方进行 API 调用,所以我想将 API 调用限制在 全局 范围内.

Because I have several functions, which make API calls, in different places, I want to limit the API calls in global scope.

例如,假设我想将 API 调用限制为每秒一次.而且,假设我有函数 xy,其中进行了两个 API 调用.

For example, suppose I want to limit the APIs call to one time per second. And, suppose I have functions x and y in which two API calls are made.

@rate(...)
def x():
   ...

@rate(...)
def y():
   ...

通过使用 limiter 修饰函数,我可以限制两个函数的速率.

By decorating the functions with the limiter, I'm able to limit the rate against the two functions.

但是,如果我顺序执行上述两个函数,它会失去对全局范围内 API 调用次数的跟踪,因为它们彼此不知道.因此,y 将在 x 执行后立即调用,无需再等待一秒钟.而且,这将违反每秒一次限制.

However, if I execute the above two functions sequentially, it looses track of the number of API calls in global scope because they are unaware of each other. So, y will be called right after the execution of x without waiting another second. And, this will violate the one time per second restriction.

有什么方法或库可以用来在 python 中全局限制速率?

Is there any way or library that I can use to limit the rate globally in python?

推荐答案

毕竟,我实现了自己的 Throttler 类.通过将每个 API 请求代理到 request 方法,我们可以跟踪所有 API 请求.利用传递函数作为request方法参数,它还缓存了结果以减少API调用.

After all, I implemented my own Throttler class. By proxying every API request to the request method, we can keep track of all API requests. Taking advantage of passing function as the request method parameter, it also caches the result in order to reduce API calls.

class TooManyRequestsError(Exception):
    def __str__(self):
        return "More than 30 requests have been made in the last five seconds."


class Throttler(object):
    cache = {}

    def __init__(self, max_rate, window, throttle_stop=False, cache_age=1800):
        # Dict of max number of requests of the API rate limit for each source
        self.max_rate = max_rate
        # Dict of duration of the API rate limit for each source
        self.window = window
        # Whether to throw an error (when True) if the limit is reached, or wait until another request
        self.throttle_stop = throttle_stop
        # The time, in seconds, for which to cache a response
        self.cache_age = cache_age
        # Initialization
        self.next_reset_at = dict()
        self.num_requests = dict()

        now = datetime.datetime.now()
        for source in self.max_rate:
            self.next_reset_at[source] = now + datetime.timedelta(seconds=self.window.get(source))
            self.num_requests[source] = 0

    def request(self, source, method, do_cache=False):
        now = datetime.datetime.now()

        # if cache exists, no need to make api call
        key = source + method.func_name
        if do_cache and key in self.cache:
            timestamp, data = self.cache.get(key)
            logging.info('{} exists in cached @ {}'.format(key, timestamp))

            if (now - timestamp).seconds < self.cache_age:
                logging.info('retrieved cache for {}'.format(key))
                return data

        # <--- MAKE API CALLS ---> #

        # reset the count if the period passed
        if now > self.next_reset_at.get(source):
            self.num_requests[source] = 0
            self.next_reset_at[source] = now + datetime.timedelta(seconds=self.window.get(source))

        # throttle request
        def halt(wait_time):
            if self.throttle_stop:
                raise TooManyRequestsError()
            else:
                # Wait the required time, plus a bit of extra padding time.
                time.sleep(wait_time + 0.1)

        # if exceed max rate, need to wait
        if self.num_requests.get(source) >= self.max_rate.get(source):
            logging.info('back off: {} until {}'.format(source, self.next_reset_at.get(source)))
            halt((self.next_reset_at.get(source) - now).seconds)

        self.num_requests[source] += 1
        response = method()  # potential exception raise

        # cache the response
        if do_cache:
            self.cache[key] = (now, response)
            logging.info('cached instance for {}, {}'.format(source, method))

        return response

这篇关于Python API 速率限制 - 如何在全局范围内限制 API 调用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-20 16:57