Problem description
I have the following situation:
- Python 3.6+
- The input data is read from a file, line by line.
- A coroutine sends the data to an API (using `aiohttp`) and saves the result of the call to Mongo (using `motor`), so there is a lot of IO going on.
The code is written using `async`/`await`, and works just fine for individual calls executed manually.
What I don't know how to do is to consume the input data en masse.
All `asyncio` examples I've seen demonstrate `asyncio.wait` by passing it a finite list. But I can't simply pass it a list of tasks, because the input file may have millions of rows.
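For reference, the finite-list pattern described above looks roughly like the sketch below. `handle` and `process_all` are hypothetical names, and `asyncio.sleep` stands in for the real API and Mongo calls. Every task is created before anything is awaited, so with millions of lines the list (and every pending task) would have to sit in memory at once.

```python
import asyncio

async def handle(line):
    # hypothetical stand-in for the real API + Mongo work
    await asyncio.sleep(0.01)
    return line.upper()

async def process_all(lines):
    # one task per input line, all created up front
    tasks = [asyncio.create_task(handle(line)) for line in lines]
    # asyncio.wait returns (done, pending) sets of tasks
    done, pending = await asyncio.wait(tasks)
    return sorted(t.result() for t in done)

if __name__ == '__main__':
    print(asyncio.run(process_all(['a', 'b', 'c'])))
```

This is fine for three lines; the trouble is only that the number of live tasks grows with the size of the input.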
My scenario is about streaming data to a consumer, as if on a conveyor belt.
What else can I do? I want the program to process the data in the file using all the resources it can muster, but without getting overwhelmed.
Recommended answer
You can create a fixed number of tasks, roughly corresponding to the capacity of your conveyor belt, and have them pop lines off a queue. For example:
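```python
import asyncio

async def consumer(queue):
    while True:
        line = await queue.get()
        # connect to API, Mongo, etc.
        ...
        # tell the queue this line is finished so queue.join() can return
        queue.task_done()

async def producer():
    N_TASKS = 10
    loop = asyncio.get_event_loop()
    # bounded queue: put() suspends the producer while N_TASKS lines are
    # already waiting, so the file is read only as fast as it is consumed
    queue = asyncio.Queue(N_TASKS)
    # a fixed pool of consumer tasks, reused for every line
    tasks = [loop.create_task(consumer(queue)) for _ in range(N_TASKS)]
    try:
        with open('input') as f:
            for line in f:
                await queue.put(line)
        # wait until every queued line has been processed
        await queue.join()
    finally:
        # the consumers loop forever, so stop them explicitly
        for t in tasks:
            t.cancel()

# Python 3.7+; on 3.6 use asyncio.get_event_loop().run_until_complete(producer())
asyncio.run(producer())
```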
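To see the pattern work end to end without a real API or database, here is a self-contained variant. It assumes the file is replaced by an in-memory generator and the per-line work is simulated with `asyncio.sleep`; the `results` list and the `max_backlog` counter are illustrative additions, not part of the answer. The backlog counter shows the bounded queue at work: `queue.put()` suspends the producer whenever `n_tasks` lines are already waiting, so the input is never read far ahead of the consumers.

```python
import asyncio

async def consumer(queue, results):
    while True:
        line = await queue.get()
        # simulated stand-in for the API + Mongo calls
        await asyncio.sleep(0.001)
        results.append(line.strip())
        queue.task_done()

async def producer(lines, n_tasks=10):
    queue = asyncio.Queue(n_tasks)
    results = []
    max_backlog = 0
    tasks = [asyncio.create_task(consumer(queue, results)) for _ in range(n_tasks)]
    try:
        for line in lines:
            # suspends here while the queue already holds n_tasks lines
            await queue.put(line)
            max_backlog = max(max_backlog, queue.qsize())
        # wait until every line has been marked done
        await queue.join()
    finally:
        for t in tasks:
            t.cancel()
    return results, max_backlog

if __name__ == '__main__':
    # a generator, consumed lazily like iterating over a file
    lines = ('line %d\n' % i for i in range(1000))
    results, backlog = asyncio.run(producer(lines))
    print(len(results), backlog)
```

Because `lines` is a generator, only a handful of lines exist in memory at any moment, no matter how long the input is.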
Since, unlike threads, tasks are lightweight and do not hog operating system resources, it is fine to err on the side of creating "too many" of them. asyncio can handle thousands of tasks without a hitch, although that is probably overkill for this task; tens will suffice.
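A quick way to check the claim about task overhead: the sketch below starts 10,000 tasks that each just sleep briefly, and they all run concurrently on a single thread. `tiny_job` and `many_tasks` are hypothetical names, and the sleep stands in for real I/O, so this says nothing about how much concurrency the API or Mongo would actually tolerate.

```python
import asyncio

async def tiny_job(i):
    # simulated I/O wait; each task is a Python object scheduled by the
    # event loop, not an OS thread
    await asyncio.sleep(0.01)
    return i

async def many_tasks(n):
    tasks = [asyncio.create_task(tiny_job(i)) for i in range(n)]
    # gather returns results in the same order as the tasks
    return await asyncio.gather(*tasks)

if __name__ == '__main__':
    results = asyncio.run(many_tasks(10000))
    print(len(results))  # → 10000
```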