Problem description
Currently, I have an inefficient synchronous generator that makes many HTTP requests in sequence and yields the results. I'd like to use asyncio and aiohttp to parallelise the requests and thereby speed up this generator, but I want to keep it as an ordinary generator (not a PEP 525 async generator) so that the non-async code that calls it doesn't need to be modified. How can I create such a generator?
Recommended answer
asyncio.as_completed() takes an iterable of coroutines or futures and returns an iterable of futures in the order that the input futures complete. Normally, you'd loop over its result and await the members from inside an async function...
import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

async def main():
    for future in asyncio.as_completed([first(), second(), third()]):
        print(await future)

# Prints 'second', then 'third', then 'first'
asyncio.run(main())
... but for the purpose of this question, what we want is to be able to yield these results from an ordinary generator, so that normal synchronous code can consume them without ever knowing that async functions are being used under the hood. We can do that by calling loop.run_until_complete() on the futures yielded by our as_completed call...
import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

def ordinary_generator():
    # Relies on get_event_loop() implicitly creating a loop on the main
    # thread, which newer Python versions deprecate.
    loop = asyncio.get_event_loop()
    for future in asyncio.as_completed([first(), second(), third()]):
        # Block the synchronous caller until the next job finishes.
        yield loop.run_until_complete(future)

# Prints 'second', then 'third', then 'first'
for element in ordinary_generator():
    print(element)
In this way, we've exposed our async code to non-async-land in a manner that doesn't require callers to define any functions as async, or even to know that ordinary_generator is using asyncio under the hood.
As an alternative implementation of ordinary_generator() that offers more flexibility in some circumstances, we can repeatedly call asyncio.wait() with the FIRST_COMPLETED flag instead of looping over as_completed():
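import asyncio

def ordinary_generator():
    loop = asyncio.get_event_loop()
    # asyncio.wait() needs tasks or futures; bare coroutines are rejected
    # on Python 3.11+, so wrap each one in a task bound to this loop.
    pending = [loop.create_task(coro) for coro in (first(), second(), third())]
    while pending:
        done, pending = loop.run_until_complete(
            asyncio.wait(
                pending,
                return_when=asyncio.FIRST_COMPLETED
            )
        )
        for job in done:
            yield job.result()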
This approach, maintaining a list of pending jobs, has the advantage that we can adapt it to add jobs to the pending list on the fly. This is useful in cases where our async jobs can add an unpredictable number of further jobs to the queue, like a web spider that follows all links on each page that it visits.
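To make the web spider idea concrete, here is a minimal sketch of adding jobs to the pending set on the fly. It is only an illustration under stated assumptions: LINKS, visit() and crawl() are hypothetical names that are not part of the answer above, the "pages" are an in-memory dict rather than real HTTP requests, and the generator creates and closes its own private event loop instead of calling get_event_loop().

```python
import asyncio

# Hypothetical link graph standing in for real web pages.
LINKS = {
    'a': ['b', 'c'],
    'b': ['d'],
    'c': ['d', 'a'],
    'd': [],
}


async def visit(page):
    """Pretend to fetch a page; return it together with its outgoing links."""
    await asyncio.sleep(0.01)  # stands in for network latency
    return page, LINKS[page]


def crawl(start):
    """Ordinary generator yielding each reachable page once, as visits finish."""
    # Assumption: a private loop, created and closed here.
    loop = asyncio.new_event_loop()
    seen = {start}
    pending = {loop.create_task(visit(start))}
    try:
        while pending:
            done, pending = loop.run_until_complete(
                asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            )
            for task in done:
                page, links = task.result()
                # Schedule newly discovered pages before yielding to the caller.
                for link in links:
                    if link not in seen:
                        seen.add(link)
                        pending.add(loop.create_task(visit(link)))
                yield page
    finally:
        # If the caller stops early or a visit fails, cancel what is left
        # and let the cancellations settle before closing the loop.
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(asyncio.wait(pending))
        loop.close()


if __name__ == '__main__':
    for page in crawl('a'):
        print(page)
```

The caller still sees a plain generator. The start page always comes out first, while the order of the remaining pages depends on which visits happen to finish first.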
One caveat: the approaches above assume we're calling the synchronous code from the main thread, in which case get_event_loop is guaranteed to give us a loop and we have no need to .close it. (Newer Python versions deprecate this implicit loop creation, so check the behaviour of the version you target.) If we want ordinary_generator to be usable from a non-main thread, especially one that may have previously had an event loop created, then life gets harder, because we can't rely on get_event_loop (it raises a RuntimeError on any non-main thread that doesn't have an event loop yet). In that case the simplest thing I can think to do is to spin off a new thread to run our asyncio code, and communicate with it via a queue:
import asyncio
from queue import Queue
from threading import Thread

def ordinary_generator():
    sentinel = object()
    queue = Queue()

    def thread_entry_point():
        # This thread gets its own event loop, independent of the caller's.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        for future in asyncio.as_completed([first(), second(), third()]):
            try:
                queue.put(loop.run_until_complete(future))
            except Exception as e:
                # Hand the exception to the consuming thread, then stop.
                queue.put((sentinel, e))
                break
        loop.close()
        queue.put(sentinel)

    Thread(target=thread_entry_point).start()
    while True:
        val = queue.get()
        if val is sentinel:
            return
        if isinstance(val, tuple) and len(val) == 2 and val[0] is sentinel:
            raise val[1]
        yield val
(Combining the use of run_until_complete from the penultimate example with the use of an extra thread in the final example is left as an exercise for any reader who needs to do so.)
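For readers who do want to attempt that exercise, here is one possible sketch rather than a definitive solution. It assumes Python 3.7+ so that asyncio.run() can create and tear down the loop inside the worker thread, it runs the asyncio.wait() loop inside a coroutine instead of calling run_until_complete repeatedly, and it tags queue items as (ok, value) pairs, which is my own choice rather than anything from the examples above. The sleep durations are shortened so the example finishes quickly.

```python
import asyncio
from queue import Queue
from threading import Thread


async def first():
    await asyncio.sleep(0.3)
    return 'first'


async def second():
    await asyncio.sleep(0.1)
    return 'second'


async def third():
    await asyncio.sleep(0.2)
    return 'third'


def ordinary_generator():
    sentinel = object()
    queue = Queue()

    async def produce():
        # Inside a running loop, so create_task() is safe to call here.
        pending = {asyncio.create_task(c) for c in (first(), second(), third())}
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for job in done:
                # The queue is unbounded, so put() does not block the loop.
                queue.put((True, job.result()))

    def thread_entry_point():
        try:
            # asyncio.run() creates a fresh loop for this thread and closes it.
            asyncio.run(produce())
        except Exception as e:
            queue.put((False, e))
        finally:
            queue.put(sentinel)

    Thread(target=thread_entry_point).start()
    while True:
        item = queue.get()
        if item is sentinel:
            return
        ok, value = item
        if not ok:
            raise value
        yield value


if __name__ == '__main__':
    for element in ordinary_generator():
        print(element)
```

Because new jobs could be added to pending inside produce(), this keeps the flexibility of the asyncio.wait() version while staying usable from any thread.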