Question
I am writing a web crawler that runs parallel fetches for many different domains. I want to limit the number of requests per second made to each individual domain, but I do not care about the total number of open connections or the total requests per second across all domains. In other words, I want to maximize the number of open connections and requests per second overall, while limiting the request rate to any single domain.
All of the existing examples I can find either (1) limit the number of open connections or (2) limit the total number of requests per second made in the fetch loop. Examples include:
- aiohttp: rate limiting parallel requests
- aiohttp: set maximum number of requests per second
Neither of them does what I am asking for, which is to limit requests per second on a per-domain basis. The first question only answers how to limit requests per second overall. The second doesn't even have answers to the actual question (the OP asks about requests per second, and the answers all talk about limiting the number of connections).
Here is the code I tried. It uses a simple rate limiter I wrote for a synchronous version, which doesn't work when the DomainTimer code runs in an async event loop:
from collections import defaultdict
from datetime import datetime, timedelta
import asyncio
import async_timeout
import aiohttp
from urllib.parse import urlparse
from queue import Queue, Empty

from HTMLProcessing import processHTML
import URLFilters

SEED_URLS = ['http://www.bbc.co.uk', 'http://www.news.google.com']
url_queue = Queue()
for u in SEED_URLS:
    url_queue.put(u)

# number of pages to download per run of crawlConcurrent()
BATCH_SIZE = 100
DELAY = timedelta(seconds = 1.0) # delay between requests from single domain, in seconds

HTTP_HEADERS = {'Referer': 'http://www.google.com',
                'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:59.0) Gecko/20100101 Firefox/59.0'}

class DomainTimer():
    def __init__(self):
        self.timer = None

    def resetTimer(self):
        self.timer = datetime.now()

    def delayExceeded(self, delay):
        if not self.timer: #We haven't fetched this before
            return True
        if (datetime.now() - self.timer) >= delay:
            return True
        else:
            return False

crawl_history = defaultdict(dict) # given a URL, when is last time crawled?
domain_timers = defaultdict(DomainTimer)

async def fetch(session, url):
    domain = urlparse(url).netloc
    print('here fetching ' + url + "\n")
    dt = domain_timers[domain]

    if dt.delayExceeded(DELAY) or not dt:
        with async_timeout.timeout(10):
            try:
                dt.resetTimer() # reset domain timer
                async with session.get(url, headers=HTTP_HEADERS) as response:
                    if response.status == 200:
                        crawl_history[url] = datetime.now()
                        html = await response.text()
                        return {'url': url, 'html': html}
                    else:
                        # log HTTP response, put into crawl_history so
                        # we don't attempt to fetch again
                        print(url + " failed with response: " + str(response.status) + "\n")
                        return {'url': url, 'http_status': response.status}

            except aiohttp.ClientConnectionError as e:
                print("Connection failed " + str(e))

            except aiohttp.ClientPayloadError as e:
                print("Received bad data from server @ " + url + "\n")

    else: # Delay hasn't passed yet: skip for now & put @ end of q
        url_queue.put(url)
        return None

async def fetch_all(urls):
    """Launch requests for all web pages."""
    tasks = []
    async with aiohttp.ClientSession() as session:
        for url in urls:
            task = asyncio.ensure_future(fetch(session, url))
            tasks.append(task) # create list of tasks
        return await asyncio.gather(*tasks) # gather task responses

def batch_crawl():
    """Launch requests for all web pages."""
    start_time = datetime.now()

    # Here we build the list of URLs to crawl for this batch
    urls = []
    for i in range(BATCH_SIZE):
        try:
            next_url = url_queue.get_nowait() # get next URL from queue
            urls.append(next_url)
        except Empty:
            print("Processed all items in URL queue.\n")
            break

    loop = asyncio.get_event_loop()
    asyncio.set_event_loop(loop)
    pages = loop.run_until_complete(fetch_all(urls))
    crawl_time = (datetime.now() - start_time).seconds
    print("Crawl completed. Fetched " + str(len(pages)) + " pages in " + str(crawl_time) + " seconds.\n")
    return pages

def parse_html(pages):
    """ Parse the HTML for each page downloaded in this batch"""
    start_time = datetime.now()
    results = {}

    for p in pages:
        if not p or not p['html']:
            print("Received empty page")
            continue
        else:
            url, html = p['url'], p['html']
            results[url] = processHTML(html)

    processing_time = (datetime.now() - start_time).seconds
    print("HTML processing finished. Processed " + str(len(results)) + " pages in " + str(processing_time) + " seconds.\n")
    return results

def extract_new_links(results):
    """Extract links from """
    # later we could track where links were from here, anchor text, etc,
    # and weight queue priority based on that
    links = []
    for k in results.keys():
        new_urls = [l['href'] for l in results[k]['links']]
        for u in new_urls:
            if u not in crawl_history.keys():
                links.append(u)
    return links

def filterURLs(urls):
    urls = URLFilters.filterDuplicates(urls)
    urls = URLFilters.filterBlacklistedDomains(urls)
    return urls

def run_batch():
    pages = batch_crawl()
    results = parse_html(pages)
    links = extract_new_links(results)
    for l in filterURLs(links):
        url_queue.put(l)
    return results

No errors or exceptions are thrown, and the rate-limiting code works fine for synchronous fetches, but the DomainTimer has no apparent effect when run in the async loop. The delay of one request per second per domain is not upheld...
How would I modify this synchronous rate-limiting code to work within the async event loop? Thanks!
Recommended answer
It's hard to debug your code since it contains a lot of unrelated stuff; it's easier to show the idea with a new, simple example.
Main idea:
- write your Semaphore-like class using __aenter__, __aexit__ that accepts a url (domain)
- use a domain-specific Lock to prevent multiple simultaneous requests to the same domain
- sleep before allowing the next request, according to the domain's last request time and its RPS limit
- track the time of the last request for each domain
Code:
import asyncio
import aiohttp
from urllib.parse import urlparse
from collections import defaultdict


class Limiter:
    # domain -> req/sec:
    _limits = {
        'httpbin.org': 4,
        'eu.httpbin.org': 1,
    }

    # domain -> its lock:
    _locks = defaultdict(lambda: asyncio.Lock())

    # domain -> its last request time
    _times = defaultdict(lambda: 0)

    def __init__(self, url):
        self._host = urlparse(url).hostname

    async def __aenter__(self):
        # "await lock" is no longer supported on recent Python versions;
        # acquire the lock explicitly instead.
        await self._lock.acquire()

        to_wait = self._to_wait_before_request()

        print(f'Wait {to_wait} sec before next request to {self._host}')
        await asyncio.sleep(to_wait)

    async def __aexit__(self, *args):
        print(f'Request to {self._host} just finished')

        self._update_request_time()
        self._lock.release()

    @property
    def _lock(self):
        """Lock that prevents multiple requests to same host."""
        return self._locks[self._host]

    def _to_wait_before_request(self):
        """What time we need to wait before request to host."""
        request_time = self._times[self._host]
        request_delay = 1 / self._limits[self._host]
        now = asyncio.get_event_loop().time()
        to_wait = request_time + request_delay - now
        to_wait = max(0, to_wait)
        return to_wait

    def _update_request_time(self):
        now = asyncio.get_event_loop().time()
        self._times[self._host] = now


# request that uses Limiter instead of Semaphore:
async def get(url):
    async with Limiter(url):
        async with aiohttp.ClientSession() as session:  # TODO reuse session for different requests.
            async with session.get(url) as resp:
                return await resp.text()


# main:
async def main():
    coros = [
        get('http://httpbin.org/get'),
        get('http://httpbin.org/get'),
        get('http://httpbin.org/get'),
        get('http://httpbin.org/get'),
        get('http://httpbin.org/get'),
        get('http://eu.httpbin.org/get'),
        get('http://eu.httpbin.org/get'),
        get('http://eu.httpbin.org/get'),
        get('http://eu.httpbin.org/get'),
        get('http://eu.httpbin.org/get'),
    ]

    await asyncio.gather(*coros)


if __name__ == '__main__':
    # asyncio.run() (Python 3.7+) creates the event loop, runs main(),
    # shuts down async generators, and closes the loop.
    asyncio.run(main())
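
The same idea can be tried without any network access. The following is only a minimal sketch under stated assumptions, not the answer's code: `DomainRateLimiter`, `wait_turn`, `fake_fetch`, and the hosts `a.example` / `b.example` are hypothetical names, and `asyncio.sleep` stands in for the real HTTP request. It also differs from `Limiter` in two ways that are my own assumptions: hosts without an explicit limit fall back to a default rate instead of raising `KeyError`, and the per-host lock is released as soon as a request is allowed to start, so it spaces out request start times rather than waiting for the previous request to finish.

```python
import asyncio
from collections import defaultdict
from urllib.parse import urlparse


class DomainRateLimiter:
    """Space out request starts per host; different hosts never block each other."""

    def __init__(self, default_rps=1.0, limits=None):
        self._default_rps = default_rps          # assumed fallback for unlisted hosts
        self._limits = dict(limits or {})        # host -> requests per second
        self._locks = defaultdict(asyncio.Lock)  # host -> lock, created lazily
        self._last_start = {}                    # host -> loop time of last start

    def _interval(self, host):
        """Minimum number of seconds between two request starts for this host."""
        return 1.0 / self._limits.get(host, self._default_rps)

    async def wait_turn(self, url):
        """Return once a request to url's host is allowed to start."""
        host = urlparse(url).hostname
        async with self._locks[host]:
            loop = asyncio.get_running_loop()
            last = self._last_start.get(host)
            if last is not None:
                delay = last + self._interval(host) - loop.time()
                if delay > 0:
                    # Sleeping while holding the host's lock keeps other
                    # requests to the same host queued behind this one.
                    await asyncio.sleep(delay)
            self._last_start[host] = loop.time()


async def fake_fetch(limiter, url, starts):
    """Hypothetical stand-in for an HTTP request; records when it started."""
    await limiter.wait_turn(url)
    starts[urlparse(url).hostname].append(asyncio.get_running_loop().time())
    await asyncio.sleep(0.01)  # pretend to talk to the server


async def demo():
    # a.example is limited to 10 req/s; b.example uses the default of 20 req/s.
    limiter = DomainRateLimiter(default_rps=20, limits={"a.example": 10})
    starts = defaultdict(list)
    urls = [
        "http://a.example/1", "http://a.example/2", "http://a.example/3",
        "http://b.example/1", "http://b.example/2",
    ]
    await asyncio.gather(*(fake_fetch(limiter, u, starts) for u in urls))
    return starts


if __name__ == "__main__":
    for host, times in asyncio.run(demo()).items():
        gaps = [round(t2 - t1, 3) for t1, t2 in zip(times, times[1:])]
        print(host, "gaps between request starts:", gaps)
```

In a real crawler, `fake_fetch` would be replaced by a coroutine that calls `await limiter.wait_turn(url)` and then performs `session.get(url)` on a shared `aiohttp.ClientSession`. If requests to one host must never overlap at all, hold the lock for the whole request as the `Limiter` class above does.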