Python编程之异步爬虫
协程的基本原理
要实现异步机制的爬虫,自然和协程脱不了关系。
- 案例引入
先看一个案例网站,地址为https://www.httpbin.org/delay/5,访问这个链接需要先等5秒钟才能得到结果,这是因为服务器强制等待5秒时间才返回响应。下面来测试一下,用requests写一个遍历程序,直接遍历100次案例网站,看看效果,代码如下:
import requests
import logging
import time
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s:%(message)s')
TOTAL_NUMBER = 100
URL = 'https://www.httpbin.org/delay/5'
start_time = time.time()
for _ in range(1, TOTAL_NUMBER + 1):
logging.info('scraping %s', URL)
response = requests.get(URL)
end_time = time.time()
logging.info('total time %s seconds', end_time - start_time)
使用的是requests单线程,在爬取之前和爬取之后分别记录了时间,最后输出了爬取100个页面消耗的总时间。运行结果如下:
2024-03-23 18:45:12,159 - INFO:scraping https://www.httpbin.org/delay/5
2024-03-23 18:45:18,693 - INFO:scraping https://www.httpbin.org/delay/5
2024-03-23 18:45:24,865 - INFO:scraping https://www.httpbin.org/delay/5
2024-03-23 18:45:30,957 - INFO:scraping https://www.httpbin.org/delay/5
2024-03-23 18:45:37,544 - INFO:scraping https://www.httpbin.org/delay/5
….
….
2024-03-23 18:55:19,929 - INFO:scraping https://www.httpbin.org/delay/5
2024-03-23 18:55:26,069 - INFO:scraping https://www.httpbin.org/delay/5
2024-03-23 18:55:32,186 - INFO:total time 620.0276908874512 seconds
由于每个页面至少等待5秒钟,100个页面至少花费500秒,加上网站本身负载问题,总时间大约620秒,10分钟多。
- 基础知识
协程的基础概念
1. 阻塞和非阻塞:
- 阻塞:当一个任务执行时,如果需要等待某个操作完成才能继续执行,这个任务就会被阻塞。在阻塞状态下,任务无法执行其他操作。
- 非阻塞:相对于阻塞,非阻塞任务在等待某个操作完成时,可以继续执行其他操作。
2. 同步和异步:
- 同步:指的是程序按照代码顺序依次执行,一个操作完成之后才会进行下一个操作。
- 异步:异步编程允许程序在等待某个操作的同时继续执行其他操作,操作完成后通过回调或者事件通知来处理结果。
3. 多进程和协程:
- 多进程:每个进程有自己独立的内存空间,系统为每个进程分配资源,进程间通信开销较大。
- 协程:协程(coroutine)是一种轻量级的线程,可以看作是在同一个线程内部进行切换执行不同任务,共享同一个进程的资源,更高效利用 CPU 和内存。
协程的特点:
- 轻量级: 协程不需要像线程那样创建新的进程或者线程,因此比多线程的切换开销更小。
- 灵活性: 协程可以根据需要暂停和恢复执行,可以实现任务的合理调度。
- 高效性: 由于不需要进行系统调用、进程/线程切换,协程可以更高效地利用计算资源。
在 Python 中,使用 asyncio
库可以实现协程。通过 async
和 await
关键字可以定义异步函数和阻塞点,在适当的时机挂起和恢复函数的执行。
协程的优点在于它们可以解决异步编程中的并发性问题,并且能够提供更好的性能和资源利用率。通过合理地使用协程,可以实现高效的并发编程,尤其在 I/O 密集型应用中表现突出。
-
协程的用法
在 Python 中,可以使用
asyncio
库来实现协程。以下是协程的基本用法示例:- 定义一个异步函数
使用
async def
关键字定义一个异步函数,该函数可以包含await
表达式来挂起执行。import asyncio async def greet(): print("Hello") await asyncio.sleep(1) print("World")
b. 运行协程任务
使用
asyncio.run()
函数来运行协程任务,并且保证事件循环的创建和销毁。asyncio.run(greet())
c. 创建并发任务
使用
asyncio.create_task()
函数创建多个并发任务,让它们同时运行。async def task1(): print("Task 1 start") await asyncio.sleep(2) print("Task 1 end") async def task2(): print("Task 2 start") await asyncio.sleep(1) print("Task 2 end") async def main(): taskA = asyncio.create_task(task1()) taskB = asyncio.create_task(task2()) await taskA await taskB asyncio.run(main())
d. 并发等待多个任务完成
使用
asyncio.gather()
函数等待多个任务完成后再继续执行。async def main(): tasks = [task1(), task2()] await asyncio.gather(*tasks) asyncio.run(main())
e. 异步IO操作
在协程中可以进行异步的IO操作,例如网络请求、文件读写等操作,以提高应用程序的性能和效率。
通过上述示例,您可以了解到如何定义、运行和管理协程,以及如何利用协程来处理并发任务和异步IO操作。在实际应用中,协程可以帮助降低资源消耗,提高程序响应性,并简化复杂的并发编程任务。
-
定义协程
import asyncio async def execute(x): print('Number:', x) coroutine = execute(1) print('Coroutine:', coroutine) print('After calling excute') loop = asyncio.get_event_loop() loop.run_until_complete(coroutine) print('After calling loop') 运行结果如下: Coroutine: <coroutine object execute at 0x10f5b37c0> After calling excute Number: 1 After calling loop
导入asyncio包,这样才可以使用async和await关键字。然后使用async定义一个execute方法,该方法接收一个数字参数x,执行之后会打印这个数字。
随后直接执行execute方法,然而这个方法没有执行,而是返回了一个coroutine协程对象。之后我们使用了get_event_loop方法创建了一个事件循环loop,调用loop对象的run_until_complete方法将协程对象注册到了事件循环中,接着启动。可见,async定义的方法会变成一个无法直接执行的协程对象,必须将此对象注册到事件循环中才可以执行。
当我们把协程对象coroutine传递给run_until_complete方法的时候,实际上它进行了一个操作,就是将coroutine封装成task对象。显示声明,代码如下:
import asyncio async def execute(x): print('Number:', x) return x coroutine = execute(1) print('Coroutine:', coroutine) print('After calling execute') loop = asyncio.get_event_loop() task = loop.create_task(coroutine) print('Task:',task) loop.run_until_complete(task) print('Task:', task) print('After calling loop') 运行结果如下: Coroutine: <coroutine object execute at 0x10faf37c0> After calling execute Task: <Task pending name='Task-1' coro=<execute() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/第6章异步爬虫/协程用法4.py:3>> Number: 1 Task: <Task finished name='Task-1' coro=<execute() done, defined at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/第6章异步爬虫/协程用法4.py:3> result=1> After calling loop
定义task对象还有另外一种方法,就是直接调用asyncio包的ensure_future方法,返回结果也是task对象,写法如下:
import asyncio async def execute(x): print('Number:', x) return x coroutine = execute(1) print('Coroutine:', coroutine) print('After calling execute') task = asyncio.ensure_future(coroutine) print('Task:', task) loop = asyncio.get_event_loop() loop.run_until_complete(task) print('Task:', task) print('After calling loop') 运行结果如下: Coroutine: <coroutine object execute at 0x10c3737c0> After calling execute Task: <Task pending name='Task-1' coro=<execute() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/第6章异步爬虫/协程用法5.py:3>> Number: 1 Task: <Task finished name='Task-1' coro=<execute() done, defined at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/第6章异步爬虫/协程用法5.py:3> result=1> After calling loop
-
绑定回调
为某个task对象绑定一个回调方法,如下所示:
import asyncio import requests async def request(): url = 'https://www.baidu.com' status = requests.get(url) return status def callback(task): print('Status:', task.result()) coroutine = request() task = asyncio.ensure_future(coroutine) task.add_done_callback(callback) print('Task:', task) loop = asyncio.get_event_loop() loop.run_until_complete(task) print('Task:', task)
定义了request方法,在这个方法里请求了百度,并获取了其状态码,随后我们定义了callback方法,这个方法接收一个参数,参数是task对象,在这个方法中调用print方法打印出task对象的结果。这样就定义好了一个协程对象和一个回调方法,我们希望达到的效果是,当协程对象执行完毕后,就去执行声明的callback方法。如何关联的呢?只要调用add_done_callback方法就行。将callback方法传递给封装好的task对象。这样当task执行完之后,就可以调用callback方法了。同时task对象还会作为参数传递给callback方法,调用task对象的result方法就可以获取返回结果了。运行结果如下:
Task: <Task pending name='Task-1' coro=<request() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/第6章异步爬虫/绑定回调.py:4> cb=[callback() at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/第6章异步爬虫/绑定回调.py:9]> status: <Response [200]> task: <Task finished name='Task-1' coro=<request() done, defined at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/第6章异步爬虫/绑定回调.py:4> result=<Response [200]>>
实际上,即使不使用回调方法,在task运行完毕后,也可以直接调用result方法获取结果,代码如下:
import asyncio import requests async def request(): url = 'https://www.baidu.com' status = requests.get(url) return status coroutine = request() task = asyncio.ensure_future(coroutine) print('Task:', task) loop = asyncio.get_event_loop() loop.run_until_complete(task) print('Task:', task) print('Task Result:', task.result()) 运行结果如下: Task: <Task pending name='Task-1' coro=<request() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/第6章异步爬虫/绑定回调1.py:5>> Task: <Task finished name='Task-1' coro=<request() done, defined at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/第6章异步爬虫/绑定回调1.py:5> result=<Response [200]>> Task Result: <Response [200]>
-
多任务协程
如果想执行多次请求,应该怎么办?可以定义一个task列表,然后使用asyncio包中的wait方法执行,如下所示:
import asyncio import requests async def request(): url = 'https://www.baidu.com' status = requests.get(url) return status tasks = [asyncio.ensure_future(request()) for _ in range(5)] print('Tasks:', tasks) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) for task in tasks: print('Task Result:', task.result()) 运行结果如下: Tasks: [<Task pending name='Task-1' coro=<request() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/第6章异步爬虫/多任务协程.py:5>>, <Task pending name='Task-2' coro=<request() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/第6章异步爬虫/多任务协程.py:5>>, <Task pending name='Task-3' coro=<request() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/第6章异步爬虫/多任务协程.py:5>>, <Task pending name='Task-4' coro=<request() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/第6章异步爬虫/多任务协程.py:5>>, <Task pending name='Task-5' coro=<request() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/第6章异步爬虫/多任务协程.py:5>>] Task Result: <Response [200]> Task Result: <Response [200]> Task Result: <Response [200]> Task Result: <Response [200]> Task Result: <Response [200]>
-
协程实现
协程在解决IO密集型任务方面的优势,耗时等待一般都是IO操作,例如文件读取、网络请求等。协程在处理这种操作时是有很大优势的,当遇到需要等待的情况时,程序可以暂时挂起,转而执行其他操作,避免浪费时间。
以https://www.httpbin.org/delay/5为例,体验一下协程的效果。示例代码如下:
import asyncio import requests import time start = time.time() async def request(): url = 'https://www.httpbin.org/delay/5' print('waiting for', url) response = requests.get(url) print('Get response from', url, 'response', response) tasks = [asyncio.ensure_future(request()) for _ in range(10)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print('Cost time:', end - start) 运行结果如下: waiting for https://www.httpbin.org/delay/5 Get response from https://www.httpbin.org/delay/5 response <Response [200]> waiting for https://www.httpbin.org/delay/5 Get response from https://www.httpbin.org/delay/5 response <Response [200]> waiting for https://www.httpbin.org/delay/5 Get response from https://www.httpbin.org/delay/5 response <Response [200]> ... waiting for https://www.httpbin.org/delay/5 Get response from https://www.httpbin.org/delay/5 response <Response [200]> waiting for https://www.httpbin.org/delay/5 Get response from https://www.httpbin.org/delay/5 response <Response [200]> waiting for https://www.httpbin.org/delay/5 Get response from https://www.httpbin.org/delay/5 response <Response [200]> Cost time: 63.61974787712097
可以发现,与正常的顺序请求没有啥区别。那么异步处理的优势呢?要实现异步处理,先得有挂起操作,当一个任务需要等待IO结果的时候,可以挂起当前任务,转而执行其他任务,这样才能充分利用好资源。
-
使用aiohttp
aiohttp是一个支持异步请求的库,它和asyncio配合使用,可以使我们非常方便地实现异步请求操作。
aiohttp分为两部分:一部分是Client,一部分是Server。
下面我们将aiohttp投入使用,将代码改成如下:
import asyncio import aiohttp import time start = time.time() async def get(url): session = aiohttp.ClientSession() response = await session.get(url) await response.text() await session.close() return response async def request(): url = 'https://www.httpbin.org/delay/5' print('Waiting for', url) response = await get(url) print('Get response from', url, 'response', response) tasks = [asyncio.ensure_future(request()) for _ in range(10)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print('Cost time:', end - start) 运行结果如下: Waiting for https://www.httpbin.org/delay/5 Waiting for https://www.httpbin.org/delay/5 Waiting for https://www.httpbin.org/delay/5 Waiting for https://www.httpbin.org/delay/5 Waiting for https://www.httpbin.org/delay/5 Waiting for https://www.httpbin.org/delay/5 ... Get response from https://www.httpbin.org/delay/5 response <ClientResponse(https://www.httpbin.org/delay/5) [200 OK]> <CIMultiDictProxy('Date': 'Sat, 23 Mar 2024 13:42:05 GMT', 'Content-Type': 'application/json', 'Content-Length': '367', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')> Get response from https://www.httpbin.org/delay/5 response <ClientResponse(https://www.httpbin.org/delay/5) [200 OK]> <CIMultiDictProxy('Date': 'Sat, 23 Mar 2024 13:42:05 GMT', 'Content-Type': 'application/json', 'Content-Length': '367', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')> ... Get response from https://www.httpbin.org/delay/5 response <ClientResponse(https://www.httpbin.org/delay/5) [200 OK]> <CIMultiDictProxy('Date': 'Sat, 23 Mar 2024 13:42:05 GMT', 'Content-Type': 'application/json', 'Content-Length': '367', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')> Get response from https://www.httpbin.org/delay/5 response <ClientResponse(https://www.httpbin.org/delay/5) [200 OK]> <CIMultiDictProxy('Date': 'Sat, 23 Mar 2024 13:42:05 GMT', 'Content-Type': 'application/json', 'Content-Length': '367', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')> Cost time: 6.868626832962036
这里将请求库由requests改成了aiohttp,利用aiohttp库里ClientSession类的get方法进行请求。
测试一下并发量分别为1、3、5、10、….、500时的耗时情况,代码如下:
import asyncio import aiohttp import time def test(number): start = time.time() async def get(url): session = aiohttp.ClientSession() response = await session.get(url) await response.text() await session.close() return response async def request(): url = 'https://www.baidu.com/' await get(url) tasks = [asyncio.ensure_future(request()) for _ in range(number)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print('Number:', number, 'Cost time:', end - start) for number in [1, 3, 5, 10, 15, 30, 50, 75, 100, 200, 500]: test(number) 运行结果如下: Number: 1 Cost time: 0.23929095268249512 Number: 3 Cost time: 0.19086170196533203 Number: 5 Cost time: 0.20035600662231445 Number: 10 Cost time: 0.21305394172668457 Number: 15 Cost time: 0.25495195388793945 Number: 30 Cost time: 0.769071102142334 Number: 50 Cost time: 0.3470029830932617 Number: 75 Cost time: 0.4492309093475342 Number: 100 Cost time: 0.586918830871582 Number: 200 Cost time: 1.0910720825195312 Number: 500 Cost time: 2.4768006801605225