Python编程之异步爬虫

协程的基本原理

要实现异步机制的爬虫,自然和协程脱不了关系。

  1. 案例引入

先看一个案例网站,地址为https://www.httpbin.org/delay/5,访问这个链接需要先等5秒钟才能得到结果,这是因为服务器强制等待5秒时间才返回响应。下面来测试一下,用requests写一个遍历程序,直接遍历100次案例网站,看看效果,代码如下:

import requests
import logging
import time

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s:%(message)s')

TOTAL_NUMBER = 100
URL = 'https://www.httpbin.org/delay/5'
start_time = time.time()
for _ in range(1, TOTAL_NUMBER + 1):
    logging.info('scraping %s', URL)
    response = requests.get(URL)

end_time = time.time()
logging.info('total time %s seconds', end_time - start_time)

使用的是requests单线程,在爬取之前和爬取之后分别记录了时间,最后输出了爬取100个页面消耗的总时间。运行结果如下:

2024-03-23 18:45:12,159 - INFO:scraping https://www.httpbin.org/delay/5
2024-03-23 18:45:18,693 - INFO:scraping https://www.httpbin.org/delay/5
2024-03-23 18:45:24,865 - INFO:scraping https://www.httpbin.org/delay/5
2024-03-23 18:45:30,957 - INFO:scraping https://www.httpbin.org/delay/5
2024-03-23 18:45:37,544 - INFO:scraping https://www.httpbin.org/delay/5
….
….

2024-03-23 18:55:19,929 - INFO:scraping https://www.httpbin.org/delay/5
2024-03-23 18:55:26,069 - INFO:scraping https://www.httpbin.org/delay/5
2024-03-23 18:55:32,186 - INFO:total time 620.0276908874512 seconds

由于每个页面至少等待5秒钟,100个页面至少花费500秒,加上网站本身负载问题,总时间大约620秒,10分钟多。

  1. 基础知识

协程的基础概念

1. 阻塞和非阻塞:

  • 阻塞:当一个任务执行时,如果需要等待某个操作完成才能继续执行,这个任务就会被阻塞。在阻塞状态下,任务无法执行其他操作。
  • 非阻塞:相对于阻塞,非阻塞任务在等待某个操作完成时,可以继续执行其他操作。

2. 同步和异步:

  • 同步:指的是程序按照代码顺序依次执行,一个操作完成之后才会进行下一个操作。
  • 异步:异步编程允许程序在等待某个操作的同时继续执行其他操作,操作完成后通过回调或者事件通知来处理结果。

3. 多进程和协程:

  • 多进程:每个进程有自己独立的内存空间,系统为每个进程分配资源,进程间通信开销较大。
  • 协程:协程(coroutine)是一种轻量级的线程,可以看作是在同一个线程内部进行切换执行不同任务,共享同一个进程的资源,更高效利用 CPU 和内存。

协程的特点:

  • 轻量级: 协程不需要像线程那样创建新的进程或者线程,因此比多线程的切换开销更小。
  • 灵活性: 协程可以根据需要暂停和恢复执行,可以实现任务的合理调度。
  • 高效性: 由于不需要进行系统调用、进程/线程切换,协程可以更高效地利用计算资源。

在 Python 中,使用 asyncio 库可以实现协程。通过 asyncawait 关键字可以定义异步函数和阻塞点,在适当的时机挂起和恢复函数的执行。

协程的优点在于它们可以解决异步编程中的并发性问题,并且能够提供更好的性能和资源利用率。通过合理地使用协程,可以实现高效的并发编程,尤其在 I/O 密集型应用中表现突出。

  1. 协程的用法

    在 Python 中,可以使用 asyncio 库来实现协程。以下是协程的基本用法示例:

    1. 定义一个异步函数

    使用 async def 关键字定义一个异步函数,该函数可以包含 await 表达式来挂起执行。

    import asyncio
    
    async def greet():
        print("Hello")
        await asyncio.sleep(1)
        print("World")
    
    

    b. 运行协程任务

    使用 asyncio.run() 函数来运行协程任务,并且保证事件循环的创建和销毁。

    asyncio.run(greet())
    
    

    c. 创建并发任务

    使用 asyncio.create_task() 函数创建多个并发任务,让它们同时运行。

    async def task1():
        print("Task 1 start")
        await asyncio.sleep(2)
        print("Task 1 end")
    
    async def task2():
        print("Task 2 start")
        await asyncio.sleep(1)
        print("Task 2 end")
    
    async def main():
        taskA = asyncio.create_task(task1())
        taskB = asyncio.create_task(task2())
        await taskA
        await taskB
    
    asyncio.run(main())
    
    

    d. 并发等待多个任务完成

    使用 asyncio.gather() 函数等待多个任务完成后再继续执行。

    async def main():
        tasks = [task1(), task2()]
        await asyncio.gather(*tasks)
    
    asyncio.run(main())
    
    

    e. 异步IO操作

    在协程中可以进行异步的IO操作,例如网络请求、文件读写等操作,以提高应用程序的性能和效率。

    通过上述示例,您可以了解到如何定义、运行和管理协程,以及如何利用协程来处理并发任务和异步IO操作。在实际应用中,协程可以帮助降低资源消耗,提高程序响应性,并简化复杂的并发编程任务。

  2. 定义协程

    import asyncio
    
    async def execute(x):
        print('Number:', x)
    
    coroutine = execute(1)
    print('Coroutine:', coroutine)
    print('After calling excute')
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(coroutine)
    print('After calling loop')
    
    运行结果如下:
    Coroutine: <coroutine object execute at 0x10f5b37c0>
    After calling excute
    Number: 1
    After calling loop
    

    导入asyncio包,这样才可以使用async和await关键字。然后使用async定义一个execute方法,该方法接收一个数字参数x,执行之后会打印这个数字。

    随后直接执行execute方法,然而这个方法没有执行,而是返回了一个coroutine协程对象。之后我们使用了get_event_loop方法创建了一个事件循环loop,调用loop对象的run_until_complete方法将协程对象注册到了事件循环中,接着启动。可见,async定义的方法会变成一个无法直接执行的协程对象,必须将此对象注册到事件循环中才可以执行。

    当我们把协程对象coroutine传递给run_until_complete方法的时候,实际上它进行了一个操作,就是将coroutine封装成task对象。显示声明,代码如下:

    import asyncio
    
    async def execute(x):
        print('Number:', x)
        return x
    
    coroutine = execute(1)
    print('Coroutine:', coroutine)
    print('After calling execute')
    
    loop = asyncio.get_event_loop()
    task = loop.create_task(coroutine)
    print('Task:',task)
    loop.run_until_complete(task)
    print('Task:', task)
    print('After calling loop')
    
    运行结果如下:
    Coroutine: <coroutine object execute at 0x10faf37c0>
    After calling execute
    Task: <Task pending name='Task-1' coro=<execute() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/6章异步爬虫/协程用法4.py:3>>
    Number: 1
    Task: <Task finished name='Task-1' coro=<execute() done, defined at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/6章异步爬虫/协程用法4.py:3> result=1>
    After calling loop
    

    定义task对象还有另外一种方法,就是直接调用asyncio包的ensure_future方法,返回结果也是task对象,写法如下:

    import asyncio
    
    async def execute(x):
        print('Number:', x)
        return x
    
    coroutine = execute(1)
    print('Coroutine:', coroutine)
    print('After calling execute')
    
    task = asyncio.ensure_future(coroutine)
    print('Task:', task)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(task)
    print('Task:', task)
    print('After calling loop')
    
    运行结果如下:
    Coroutine: <coroutine object execute at 0x10c3737c0>
    After calling execute
    Task: <Task pending name='Task-1' coro=<execute() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/6章异步爬虫/协程用法5.py:3>>
    Number: 1
    Task: <Task finished name='Task-1' coro=<execute() done, defined at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/6章异步爬虫/协程用法5.py:3> result=1>
    After calling loop
    
  3. 绑定回调

    为某个task对象绑定一个回调方法,如下所示:

    import asyncio
    import requests
    
    async def request():
        url = 'https://www.baidu.com'
        status = requests.get(url)
        return status
    
    def callback(task):
        print('Status:', task.result())
    
    coroutine = request()
    task = asyncio.ensure_future(coroutine)
    task.add_done_callback(callback)
    print('Task:', task)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(task)
    print('Task:', task)
    

    定义了request方法,在这个方法里请求了百度,并获取了其状态码,随后我们定义了callback方法,这个方法接收一个参数,参数是task对象,在这个方法中调用print方法打印出task对象的结果。这样就定义好了一个协程对象和一个回调方法,我们希望达到的效果是,当协程对象执行完毕后,就去执行声明的callback方法。如何关联的呢?只要调用add_done_callback方法就行。将callback方法传递给封装好的task对象。这样当task执行完之后,就可以调用callback方法了。同时task对象还会作为参数传递给callback方法,调用task对象的result方法就可以获取返回结果了。运行结果如下:

    Task: <Task pending name='Task-1' coro=<request() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/6章异步爬虫/绑定回调.py:4> cb=[callback() at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/6章异步爬虫/绑定回调.py:9]>
    status: <Response [200]>
    task: <Task finished name='Task-1' coro=<request() done, defined at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/6章异步爬虫/绑定回调.py:4> result=<Response [200]>>
    

    实际上,即使不使用回调方法,在task运行完毕后,也可以直接调用result方法获取结果,代码如下:

    import asyncio
    import requests
    
    async def request():
        url = 'https://www.baidu.com'
        status = requests.get(url)
        return status
    
    coroutine = request()
    task = asyncio.ensure_future(coroutine)
    print('Task:', task)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(task)
    print('Task:', task)
    print('Task Result:', task.result())
    
    运行结果如下:
    Task: <Task pending name='Task-1' coro=<request() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/6章异步爬虫/绑定回调1.py:5>>
    Task: <Task finished name='Task-1' coro=<request() done, defined at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/6章异步爬虫/绑定回调1.py:5> result=<Response [200]>>
    Task Result: <Response [200]>
    
  4. 多任务协程

    如果想执行多次请求,应该怎么办?可以定义一个task列表,然后使用asyncio包中的wait方法执行,如下所示:

    import asyncio
    import requests
    
    async def request():
        url = 'https://www.baidu.com'
        status = requests.get(url)
        return status
    
    tasks = [asyncio.ensure_future(request()) for _ in range(5)]
    print('Tasks:', tasks)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    
    for task in tasks:
        print('Task Result:', task.result())
    
    运行结果如下:
    Tasks: [<Task pending name='Task-1' coro=<request() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/6章异步爬虫/多任务协程.py:5>>, <Task pending name='Task-2' coro=<request() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/6章异步爬虫/多任务协程.py:5>>, <Task pending name='Task-3' coro=<request() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/6章异步爬虫/多任务协程.py:5>>, <Task pending name='Task-4' coro=<request() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/6章异步爬虫/多任务协程.py:5>>, <Task pending name='Task-5' coro=<request() running at /Users/bruce_liu/PycharmProjects/崔庆才--爬虫/6章异步爬虫/多任务协程.py:5>>]
    Task Result: <Response [200]>
    Task Result: <Response [200]>
    Task Result: <Response [200]>
    Task Result: <Response [200]>
    Task Result: <Response [200]>
    
  5. 协程实现

    协程在解决IO密集型任务方面的优势,耗时等待一般都是IO操作,例如文件读取、网络请求等。协程在处理这种操作时是有很大优势的,当遇到需要等待的情况时,程序可以暂时挂起,转而执行其他操作,避免浪费时间。

    以https://www.httpbin.org/delay/5为例,体验一下协程的效果。示例代码如下:

    import asyncio
    import requests
    import time
    
    start = time.time()
    
    async def request():
        url = 'https://www.httpbin.org/delay/5'
        print('waiting for', url)
        response = requests.get(url)
        print('Get response from', url, 'response', response)
    
    tasks = [asyncio.ensure_future(request()) for _ in range(10)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    
    end = time.time()
    print('Cost time:', end - start)
    
    运行结果如下:
    waiting for https://www.httpbin.org/delay/5
    Get response from https://www.httpbin.org/delay/5 response <Response [200]>
    waiting for https://www.httpbin.org/delay/5
    Get response from https://www.httpbin.org/delay/5 response <Response [200]>
    waiting for https://www.httpbin.org/delay/5
    Get response from https://www.httpbin.org/delay/5 response <Response [200]>
    ...
    waiting for https://www.httpbin.org/delay/5
    Get response from https://www.httpbin.org/delay/5 response <Response [200]>
    waiting for https://www.httpbin.org/delay/5
    Get response from https://www.httpbin.org/delay/5 response <Response [200]>
    waiting for https://www.httpbin.org/delay/5
    Get response from https://www.httpbin.org/delay/5 response <Response [200]>
    Cost time: 63.61974787712097
    

    可以发现,与正常的顺序请求没有啥区别。那么异步处理的优势呢?要实现异步处理,先得有挂起操作,当一个任务需要等待IO结果的时候,可以挂起当前任务,转而执行其他任务,这样才能充分利用好资源。

  6. 使用aiohttp

    aiohttp是一个支持异步请求的库,它和asyncio配合使用,可以使我们非常方便地实现异步请求操作。

    aiohttp分为两部分:一部分是Client,一部分是Server。

    下面我们将aiohttp投入使用,将代码改成如下:

    import asyncio
    import aiohttp
    import time
    
    start = time.time()
    
    async def get(url):
        session = aiohttp.ClientSession()
        response = await session.get(url)
        await response.text()
        await session.close()
        return response
    
    async def request():
        url = 'https://www.httpbin.org/delay/5'
        print('Waiting for', url)
        response = await get(url)
        print('Get response from', url, 'response', response)
    
    tasks = [asyncio.ensure_future(request()) for _ in range(10)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    
    end = time.time()
    print('Cost time:', end - start)
    
    运行结果如下:
    Waiting for https://www.httpbin.org/delay/5
    Waiting for https://www.httpbin.org/delay/5
    Waiting for https://www.httpbin.org/delay/5
    Waiting for https://www.httpbin.org/delay/5
    Waiting for https://www.httpbin.org/delay/5
    Waiting for https://www.httpbin.org/delay/5
    ...
    Get response from https://www.httpbin.org/delay/5 response <ClientResponse(https://www.httpbin.org/delay/5) [200 OK]>
    <CIMultiDictProxy('Date': 'Sat, 23 Mar 2024 13:42:05 GMT', 'Content-Type': 'application/json', 'Content-Length': '367', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
    
    Get response from https://www.httpbin.org/delay/5 response <ClientResponse(https://www.httpbin.org/delay/5) [200 OK]>
    <CIMultiDictProxy('Date': 'Sat, 23 Mar 2024 13:42:05 GMT', 'Content-Type': 'application/json', 'Content-Length': '367', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
    ...
    Get response from https://www.httpbin.org/delay/5 response <ClientResponse(https://www.httpbin.org/delay/5) [200 OK]>
    <CIMultiDictProxy('Date': 'Sat, 23 Mar 2024 13:42:05 GMT', 'Content-Type': 'application/json', 'Content-Length': '367', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
    
    Get response from https://www.httpbin.org/delay/5 response <ClientResponse(https://www.httpbin.org/delay/5) [200 OK]>
    <CIMultiDictProxy('Date': 'Sat, 23 Mar 2024 13:42:05 GMT', 'Content-Type': 'application/json', 'Content-Length': '367', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
    
    Cost time: 6.868626832962036
    

    这里将请求库由requests改成了aiohttp,利用aiohttp库里ClientSession类的get方法进行请求。

    测试一下并发量分别为1、3、5、10、….、500时的耗时情况,代码如下:

    import asyncio
    import aiohttp
    import time
    
    def test(number):
        start = time.time()
    
        async def get(url):
            session = aiohttp.ClientSession()
            response = await session.get(url)
            await response.text()
            await session.close()
            return response
    
        async def request():
            url = 'https://www.baidu.com/'
            await get(url)
    
        tasks = [asyncio.ensure_future(request()) for _ in range(number)]
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait(tasks))
    
        end = time.time()
        print('Number:', number, 'Cost time:', end - start)
    
    for number in [1, 3, 5, 10, 15, 30, 50, 75, 100, 200, 500]:
        test(number)
        
     运行结果如下:
    Number: 1 Cost time: 0.23929095268249512
    Number: 3 Cost time: 0.19086170196533203
    Number: 5 Cost time: 0.20035600662231445
    Number: 10 Cost time: 0.21305394172668457
    Number: 15 Cost time: 0.25495195388793945
    Number: 30 Cost time: 0.769071102142334
    Number: 50 Cost time: 0.3470029830932617
    Number: 75 Cost time: 0.4492309093475342
    Number: 100 Cost time: 0.586918830871582
    Number: 200 Cost time: 1.0910720825195312
    Number: 500 Cost time: 2.4768006801605225
    
03-24 13:27