简介

aiohttp 需要 Python 3.5.3 及更高版本。它不但能做客户端爬虫,也能做服务器端;借助 asyncio 与协程,十分高效。

官方文档:https://docs.aiohttp.org/

采集模板

一批,一次性采集

import asyncio
import logging
import time

from aiohttp import ClientSession, ClientTimeout

logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] - %(levelname)s in %(filename)s.%(funcName)s: %(message)s')

# Default request headers
HEADERS = {
    'accept': 'text/javascript, text/html, application/xml, text/xml, */*',
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                  'Chrome/69.0.3497.100 Safari/537.36',
}

# Default total timeout for a single request, in seconds
TIMEOUT = 15


class AioCrawl:
    """One-shot batch crawler: fetches a list of URLs concurrently and
    blocks until the whole batch is done."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    async def fetch(self, url, method='GET', headers=None, timeout=TIMEOUT,
                    cookies=None, data=None):
        """Fetch a single URL.

        :param url: target URL
        :param method: 'GET' or 'POST' (anything else falls back to GET)
        :param headers: request headers dict; defaults to HEADERS
        :param timeout: total timeout in seconds; defaults to TIMEOUT
        :param cookies: optional cookies for the session
        :param data: optional POST form data dict
        :return: raw response body (bytes)
        :raises: any aiohttp/asyncio exception from the request
        """
        method = 'POST' if method.upper() == 'POST' else 'GET'
        headers = headers if headers else HEADERS
        timeout = ClientTimeout(total=timeout)
        cookies = cookies if cookies else None
        data = data if data and isinstance(data, dict) else {}
        # NOTE: the original wrapped this in `except Exception as e: raise e`,
        # which only obscures the traceback; letting exceptions propagate is
        # equivalent and cleaner.
        async with ClientSession(headers=headers, timeout=timeout,
                                 cookies=cookies) as session:
            if method == 'GET':
                async with session.get(url) as response:
                    return await response.read()
            async with session.post(url, data=data) as response:
                return await response.read()

    def prepare_fetch(self, urls):
        """Wrap each URL's fetch coroutine in a Future and return the list."""
        return [asyncio.ensure_future(self.fetch(url)) for url in urls]

    def crawl_batch_urls(self, urls):
        """Run one batch of URLs to completion.

        :param urls: list of URL strings
        :return: list of finished futures (inspect .exception()/.result())
        """
        future_list = self.prepare_fetch(urls)
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait(future_list))
        # Lazy %-style args avoid formatting when the level is filtered out.
        self.logger.info('采集完一批: %s', len(urls))
        return future_list


if __name__ == '__main__':
    a = AioCrawl()
    # Typically takes 2-4 seconds for 5 requests
    t0 = time.time()
    future_list = a.crawl_batch_urls(['https://www.sina.com.cn' for _ in range(5)])
    print(time.time() - t0)
    for future in future_list:
        if future.exception():
            print(future.exception())
        else:
            print(len(future.result()))

动态添加任务

import asyncio
import time
from threading import Thread

from aiohttp import ClientSession, ClientTimeout

# Default request headers
HEADERS = {
    'accept': 'text/javascript, text/html, application/xml, text/xml, */*',
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                  'Chrome/69.0.3497.100 Safari/537.36',
}

# Default total timeout for a single request, in seconds
TIMEOUT = 15


def start_loop(loop):
    """Drive *loop* forever in the current (background) thread."""
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def fetch(url, method='GET', headers=None, timeout=TIMEOUT,
                cookies=None, data=None):
    """Fetch a single URL.

    :param url: target URL
    :param method: 'GET' or 'POST' (anything else falls back to GET)
    :param headers: request headers dict; defaults to HEADERS
    :param timeout: total timeout in seconds; defaults to TIMEOUT
    :param cookies: optional cookies for the session
    :param data: optional POST form data dict
    :return: (status, content) tuple — HTTP status code and body bytes
    """
    print(url)
    method = 'POST' if method.upper() == 'POST' else 'GET'
    headers = headers if headers else HEADERS
    timeout = ClientTimeout(total=timeout)
    cookies = cookies if cookies else None
    data = data if data and isinstance(data, dict) else {}
    # Exceptions propagate to the future; the done-callback reports them.
    async with ClientSession(headers=headers, timeout=timeout,
                             cookies=cookies) as session:
        if method == 'GET':
            async with session.get(url) as response:
                content = await response.read()
                return response.status, content
        async with session.post(url, data=data) as response:
            content = await response.read()
            return response.status, content


def callback(future):
    """Done-callback: print the result, or the exception on failure."""
    try:
        print(future.result())
    except Exception as e:
        print(e)
    print(type(future))
    print(future)


if __name__ == '__main__':
    # Start a dedicated event loop in a daemon thread so coroutines can be
    # submitted from the main thread at any time.
    loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=(loop,))
    t.daemon = True  # Thread.setDaemon() is deprecated; assign the attribute
    t.start()
    # run_coroutine_threadsafe schedules the coroutine on the loop's thread
    # and returns a concurrent.futures.Future.
    f = asyncio.run_coroutine_threadsafe(fetch('https://www.sina.com.cn'), loop)
    f.add_done_callback(callback)  # attach the done-callback to the future
    time.sleep(5)  # keep the main thread alive long enough to see the output

动态添加任务,封装成类

import asyncio
import logging
import time
from threading import Thread

from aiohttp import ClientSession, ClientTimeout, TCPConnector

# Default request headers
HEADERS = {
    'accept': 'text/javascript, text/html, application/xml, text/xml, */*',
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                  '(KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36',
}

# Default total timeout for a single request, in seconds
TIMEOUT = 15


def start_loop(loop):
    """Drive *loop* forever in the current (background) thread."""
    asyncio.set_event_loop(loop)
    loop.run_forever()


class AioCrawl:
    """Crawler that accepts tasks dynamically: a dedicated event loop runs in
    a daemon thread, and URLs may be submitted from the main thread at any
    time via :meth:`add_tasks`."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # Dedicated event loop driven by a background daemon thread.
        self.event_loop = asyncio.new_event_loop()
        self.t = Thread(target=start_loop, args=(self.event_loop,))
        self.t.daemon = True  # Thread.setDaemon() is deprecated; assign the attribute
        self.t.start()
        self.concurrent = 0  # number of in-flight requests

    async def fetch(self, url, method='GET', headers=None, timeout=TIMEOUT,
                    cookies=None, data=None, proxy=None):
        """Fetch a single URL.

        :param url: str
        :param method: 'GET' or 'POST' (anything else falls back to GET)
        :param headers: dict(); defaults to HEADERS
        :param timeout: int, total timeout in seconds
        :param cookies: optional cookies for the session
        :param data: dict(), POST form data
        :param proxy: str, proxy URL
        :return: (status, content)
        """
        method = 'POST' if method.upper() == 'POST' else 'GET'
        headers = headers if headers else HEADERS
        timeout = ClientTimeout(total=timeout)
        cookies = cookies if cookies else None
        data = data if data and isinstance(data, dict) else {}
        # Disable certificate verification; `verify_ssl=` is deprecated,
        # `ssl=False` is its documented replacement.
        tcp_connector = TCPConnector(ssl=False)
        async with ClientSession(headers=headers, timeout=timeout,
                                 cookies=cookies,
                                 connector=tcp_connector) as session:
            if method == 'GET':
                async with session.get(url, proxy=proxy) as response:
                    content = await response.read()
                    return response.status, content
            async with session.post(url, data=data, proxy=proxy) as response:
                content = await response.read()
                return response.status, content

    def callback(self, future):
        """Done-callback for a finished fetch future.

        Logs failures and non-200 responses, and decrements the in-flight
        counter. Runs on the event-loop thread.
        """
        msg = str(future.exception()) if future.exception() else 'success'
        code = 1 if msg == 'success' else 0
        status = future.result()[0] if code == 1 else None
        data = future.result()[1] if code == 1 else b''  # empty body on failure
        data_len = len(data) if data else 0
        if code == 0 or (status is not None and status != 200):
            # BUGFIX: a bare Future has no `.url`; add_tasks() attaches it.
            # getattr guards against futures submitted without that attribute.
            self.logger.warning('<url="{}", code={}, msg="{}", status={}, data(len):{}>'.format(
                getattr(future, 'url', None), code, msg, status, data_len))
        self.concurrent -= 1  # one request finished
        print(len(data))

    def add_tasks(self, tasks):
        """Submit crawl tasks to the background event loop.

        :param tasks: list of URL strings (not Task objects — each item is
            passed straight to :meth:`fetch` as the url argument)
        """
        for task in tasks:
            # run_coroutine_threadsafe takes a coroutine object and a loop,
            # schedules it on the loop's thread, and returns a
            # concurrent.futures.Future.
            future = asyncio.run_coroutine_threadsafe(self.fetch(task), self.event_loop)
            future.url = task  # remembered so callback() can log which URL failed
            future.add_done_callback(self.callback)
            self.concurrent += 1  # one more request in flight


if __name__ == '__main__':
    a = AioCrawl()
    for _ in range(5):
        a.add_tasks(['https://www.sina.com.cn' for _ in range(2)])  # simulate dynamic task submission
        time.sleep(1)
05-11 03:19