我对如何使用Python3.4中的asyncio
模块感到困惑。我有一个搜索引擎的searching
API,并且希望每个搜索请求都可以并行或异步运行,这样我就不必等待一个搜索完成后再启动另一个搜索。
下面是我的高级搜索API,用于用原始搜索结果构建一些对象。搜索引擎本身正在使用某种异步机制,所以我不必为此费心。
# No asyncio module used here now
class search(object):
...
self.s = some_search_engine()
...
def searching(self, *args, **kwargs):
ret = {}
# do some raw searching according to args and kwargs and build the wrapped results
...
return ret
为了尝试异步请求,我编写了以下测试用例来测试如何将我的东西与
asyncio
模块交互。# Here is my testing script
@asyncio.coroutine
def handle(f, *args, **kwargs):
r = yield from f(*args, **kwargs)
return r
s = search()
loop = asyncio.get_event_loop()
loop.run_until_complete(handle(s.searching, arg1, arg2, ...))
loop.close()
通过使用pytest运行,当它到达行时,它将返回一个
RuntimeError: Task got bad yield : {results from searching...}
。我也尝试了另一种方法。
# same handle as above
def handle(..):
....
s = search()
loop = asyncio.get_event_loop()
tasks = [
asyncio.async(handle(s.searching, arg11, arg12, ...)),
asyncio.async(handle(s.searching, arg21, arg22, ...)),
...
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
通过通过Pytest运行这个测试用例,它可以通过,但是搜索引擎会出现一些奇怪的异常。上面写着。
我想问的问题:
对于我的第一次尝试,通过返回函数调用的实际结果,这是正确的使用方法吗?
我想我需要在我的第二个测试用例中添加一些睡眠来等待任务完成,但是我该怎么做呢?在我的第二个测试用例中,如何让我的函数调用返回?
通过创建一个异步处理程序来处理请求,这是用现有模块实现异步IO的好方法吗?
如果问题2的答案是“否”,那么是否每个客户机对类的调用都需要包含
r = yield from ...
这类异步请求的内容? 最佳答案
问题是,不能像调用asyncio.coroutine
并获得异步行为那样调用现有的同步代码。当您调用yield from searching(...)
时,只有当searching
本身实际上是一个asyncio.coroutine
或者至少返回一个asyncio.Future
时,您才会获得异步行为。现在,searching
只是一个常规的同步函数,所以调用yield from searching(...)
只会抛出一个错误,因为它不会返回Future
或协程。
要获得您想要的行为,除了searching
版本之外,您还需要一个异步版本的synchronous
(或者,如果不需要同步版本,只需完全删除它)。您有几个选项可以同时支持这两种功能:
将searching
重写为使用兼容的调用来进行I/O,而不是阻止I/O的asyncio.coroutine
。这将使其在asyncio
上下文中工作,但意味着您将无法再在同步上下文中直接调用它。相反,您还需要提供一个可选的同步asyncio
方法,启动一个searching
事件循环并调用asyncio
。有关更多详细信息,请参见this question。
保持对return loop.run_until_complete(self.searching(...))
的同步实现,并提供另一个异步API,该API使用searching
在后台线程中运行您的BaseEventLoop.run_in_executor
方法:
class search(object):
...
self.s = some_search_engine()
...
def searching(self, *args, **kwargs):
ret = {}
...
return ret
@asyncio.coroutine
def searching_async(self, *args, **kwargs):
loop = kwargs.get('loop', asyncio.get_event_loop())
try:
del kwargs['loop'] # assuming searching doesn't take loop as an arg
except KeyError:
pass
r = yield from loop.run_in_executor(None, self.searching, *args) # Passing None tells asyncio to use the default ThreadPoolExecutor
return r
测试脚本:
s = search()
loop = asyncio.get_event_loop()
loop.run_until_complete(s.searching_async(arg1, arg2, ...))
loop.close()
通过这种方式,您可以保持同步代码的原样,并且至少提供可以在
searching
代码中使用的方法,而不会阻塞事件循环。如果您在代码中实际使用了异步I/O,那么解决方案就没有它那么干净了,但它总比什么都没有好。提供两个完全独立的版本的
asyncio
,一个使用阻塞I/O,另一个与searching
兼容。这为两种上下文提供了理想的实现,但需要两倍的工作量。关于python - Python asyncio任务 yield 很低,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/30172821/