Question
I'm confused about how to use asyncio.Queue for a particular producer-consumer pattern in which both the producer and consumer operate concurrently and independently.
First, consider this example, which closely follows the one from the docs for asyncio.Queue:
import asyncio
import random
import time


async def worker(name, queue):
    while True:
        sleep_for = await queue.get()
        await asyncio.sleep(sleep_for)
        queue.task_done()
        print(f'{name} has slept for {sleep_for:0.2f} seconds')


async def main(n):
    queue = asyncio.Queue()
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    tasks = []
    for i in range(n):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'{n} workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


if __name__ == '__main__':
    import sys
    n = 3 if len(sys.argv) == 1 else int(sys.argv[1])
    asyncio.run(main(n))
There is one finer detail about this script: the items are put into the queue synchronously, with queue.put_nowait(sleep_for) in a conventional for loop.
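For context, put_nowait is the synchronous counterpart of put: it enqueues immediately and raises asyncio.QueueFull if the queue is bounded and full, whereas await queue.put(...) suspends until a slot frees up. A tiny illustrative sketch of the difference (not part of the original script):

import asyncio

async def demo():
    queue = asyncio.Queue(maxsize=1)
    queue.put_nowait('a')       # succeeds: there is room
    try:
        queue.put_nowait('b')   # queue is full: raises instead of waiting
    except asyncio.QueueFull:
        print('full; an async producer would use: await queue.put(...)')

asyncio.run(demo())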
My goal is to create a script that uses async def worker() (or consumer()) and async def producer(). Both should be scheduled to run concurrently. No one consumer coroutine is explicitly tied to or chained from a producer.
How can I modify the program above so that the producer is its own coroutine that can be scheduled concurrently with the consumers/workers?
Answer
The example can be generalized without changing its essential logic:
- Move the insertion loop to a separate producer coroutine.
- Start the consumers in the background, letting them process the items as they are produced.
- With the consumers running, start the producers and wait for them to finish producing items, as with await producer() or await gather(*producers), etc.
- Once all producers are done, wait for consumers to process the remaining items with await queue.join().
- Cancel the consumers, all of which are now idly waiting for the queue to deliver the next item, which will never arrive as we know the producers are done.
Here is an example implementing the above:
import asyncio, random


async def rnd_sleep(t):
    # sleep for T seconds on average
    await asyncio.sleep(t * random.random() * 2)


async def producer(queue):
    while True:
        # produce a token and send it to a consumer
        token = random.random()
        print(f'produced {token}')
        if token < .05:
            break
        await queue.put(token)
        await rnd_sleep(.1)


async def consumer(queue):
    while True:
        token = await queue.get()
        # process the token received from a producer
        await rnd_sleep(.3)
        queue.task_done()
        print(f'consumed {token}')


async def main():
    queue = asyncio.Queue()

    # fire up both the producers and consumers
    producers = [asyncio.create_task(producer(queue))
                 for _ in range(3)]
    consumers = [asyncio.create_task(consumer(queue))
                 for _ in range(10)]

    # with both producers and consumers running, wait for
    # the producers to finish
    await asyncio.gather(*producers)
    print('---- done producing')

    # wait for the remaining tasks to be processed
    await queue.join()

    # cancel the consumers, which are now idle
    for c in consumers:
        c.cancel()

asyncio.run(main())
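A brief note on the shutdown step: c.cancel() raises asyncio.CancelledError at whichever await the consumer is suspended on, here await queue.get(), which unwinds the infinite loop. If a consumer held resources such as open connections, a variant like the following sketch could release them on the way out (purely illustrative, not required by the answer above):

async def consumer(queue):
    try:
        while True:
            token = await queue.get()
            await rnd_sleep(.3)
            queue.task_done()
            print(f'consumed {token}')
    except asyncio.CancelledError:
        # perform any cleanup here, then let the cancellation propagate
        print('consumer cancelled')
        raise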
Note that in real-life producers and consumers, especially those that involve network access, you probably want to catch IO-related exceptions that occur during processing. If the exception is recoverable, as most network-related exceptions are, you can simply catch the exception and log the error. You should still invoke task_done() because otherwise queue.join() will hang due to an unprocessed item. If it makes sense to re-try processing the item, you can return it into the queue prior to calling task_done(). For example:
# like the above, but handling exceptions during processing:

async def consumer(queue):
    while True:
        token = await queue.get()
        try:
            # this uses aiohttp or whatever
            await process(token)
        except aiohttp.ClientError as e:
            print(f"Error processing token {token}: {e}")
            # If it makes sense, return the token to the queue to be
            # processed again. (You can use a counter to avoid
            # processing a faulty token infinitely.)
            #await queue.put(token)
        queue.task_done()
        print(f'consumed {token}')
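The commented-out retry above mentions using a counter to avoid reprocessing a faulty token forever. One way to do that, sketched under the assumption that producers enqueue (token, attempts) pairs rather than bare tokens (MAX_ATTEMPTS and process() are illustrative names, not library APIs):

# sketch: bounded retries via (token, attempts) pairs;
# producers would call: await queue.put((token, 0))
MAX_ATTEMPTS = 3

async def consumer(queue):
    while True:
        token, attempts = await queue.get()
        try:
            await process(token)   # hypothetical processing coroutine
            print(f'consumed {token}')
        except aiohttp.ClientError as e:
            print(f"Error processing token {token}: {e}")
            if attempts + 1 < MAX_ATTEMPTS:
                # re-enqueue before task_done() so queue.join()
                # keeps waiting for the retried item
                await queue.put((token, attempts + 1))
            else:
                print(f'giving up on {token}')
        queue.task_done()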