Python 大任务切分小任务

今天来说说,Python中的任务切分。以爬虫为例,从一个存 url 的 txt 文件中,读取其内容,我们会获取一个 url 列表。我们把这一个 url 列表称为大任务。

列表切分

在不考虑内存占用的情况下,我们对上面的大任务进行一个切分。比如我们将大任务切分成的小任务是每秒最多只访问5个URL。

import os
import time CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) def read_file():
file_path = os.path.join(CURRENT_DIR, "url_list.txt")
with open(file_path, "r", encoding="utf-8") as fs:
result = [i.strip() for i in fs.readlines()]
return result def fetch(url):
print(url) def run():
max_count = 5
url_list = read_file()
for index in range(0, len(url_list), max_count):
start = time.time()
fetch(url_list[index:index + max_count])
end = time.time() - start
if end < 1:
time.sleep(1 - end) if __name__ == '__main__':
run()

关键代码都在for循环里,首先我们通过声明range的第三个参数,该参数指定迭代的步长为5,这样每次index增加都是以5为基数,即0,5,10。。。

然后我们对url_list做切片,每次取其五个元素,这五个元素会随着index的增加不断的在改变,如果最后不够五个了,按照切片的特性这个时候就会有多少取多少了,不会造成索引超下标的问题。

随着url列表的增加,我们会发现内存的占用也在提高了。这个时候我们就需要对代码进行修改了,我们知道生成器是比较节省内存的空间的,修改之后代码变成,下面的这样。

生成器切分

# -*- coding: utf-8 -*-
# @时间 : 2019-11-23 23:47
# @作者 : 陈祥安
# @文件名 : g.py
# @公众号: Python学习开发
import os
import time
from itertools import islice CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) def read_file():
file_path = os.path.join(CURRENT_DIR, "url_list.txt")
with open(file_path, "r", encoding="utf-8") as fs:
for i in fs:
yield i.strip() def fetch(url):
print(url) def run():
max_count = 5
url_gen = read_file()
while True:
url_list = list(islice(url_gen, 0, max_count))
if not url_list:
break
start = time.time()
fetch(url_list)
end = time.time() - start
if end < 1:
time.sleep(1 - end) if __name__ == '__main__':
run()

首先,我们修改了文件读取的方式,把原来读列表的形式,改为了生成器的形式。这样我们在调用该文件读取方法的时候大大节省了内存。

然后就是对上面for循环进行改造,因为生成器的特性,这里不适合使用for进行迭代,因为每一次的迭代都会消耗生成器的元素,通过使用itertools的islice对url_gen进行切分,islice是生成器的切片,这里我们每次切分出含有5个元素的生成器,因为生成器没有__len__方法所以,我们将其转为列表,然后判断列表是否为空,就可以知道迭代是否该结束了。

修改之后的代码,不管是性能还是节省内存上都大大的提高。读取千万级的文件不是问题。

除此之外,在使用异步爬虫的时候,也许会用到异步生成器切片。下面就和大家讨论,异步生成器切分的问题

异步生成器切分

首先先来看一个简单的异步生成器。

我们知道调用下面的代码会得到一个生成器

def foo():
for i in range(20):
yield i

如果在def前面加一个async,那么在调用的时候它就是个异步生成器了。

完整示例代码如下

import asyncio
async def foo():
for i in range(20):
yield i async def run():
async_gen = foo()
async for i in async_gen:
print(i) if __name__ == '__main__':
asyncio.run(run())

关于async for的切分有点复杂,这里推荐使用aiostream模块,使用之后代码改为下面这样

import asyncio
from aiostream import stream async def foo():
for i in range(22):
yield i async def run():
index = 0
limit = 5 while True:
#测试发现foo应该是个callable类型,并且运行之后是个async_generator。
xs = stream.iterate(foo())
ys = xs[index:index + limit]
t = await stream.list(ys)
if not t:
break
print(t)
index += limit
await asyncio.sleep(0.01) if __name__ == '__main__':
asyncio.run(run())
05-13 04:46