我对如何使用Python3.4中的asyncio模块感到困惑。我有一个搜索引擎的searchingAPI,并且希望每个搜索请求都可以并行或异步运行,这样我就不必等待一个搜索完成后再启动另一个搜索。
下面是我的高级搜索API,用于用原始搜索结果构建一些对象。搜索引擎本身正在使用某种异步机制,所以我不必为此费心。

# No asyncio module used here now
class search(object):
  ...
  self.s = some_search_engine()
  ...
  def searching(self, *args, **kwargs):
    ret = {}
    # do some raw searching according to args and kwargs and build the wrapped results
    ...
    return ret

为了尝试异步请求,我编写了以下测试用例来测试如何将我的东西与asyncio模块交互。
# Here is my testing script
@asyncio.coroutine
def handle(f, *args, **kwargs):
  r = yield from f(*args, **kwargs)
  return r

s = search()
loop = asyncio.get_event_loop()
loop.run_until_complete(handle(s.searching, arg1, arg2, ...))
loop.close()

通过使用pytest运行,当它到达行时,它将返回一个RuntimeError: Task got bad yield : {results from searching...}
我也尝试了另一种方法。
# same handle as above
def handle(..):
  ....
s = search()
loop = asyncio.get_event_loop()
tasks = [
        asyncio.async(handle(s.searching, arg11, arg12, ...)),
        asyncio.async(handle(s.searching, arg21, arg22, ...)),
        ...
        ]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

通过通过Pytest运行这个测试用例,它可以通过,但是搜索引擎会出现一些奇怪的异常。上面写着。
我想问的问题:
对于我的第一次尝试,通过返回函数调用的实际结果,这是正确的使用方法吗?
我想我需要在我的第二个测试用例中添加一些睡眠来等待任务完成,但是我该怎么做呢?在我的第二个测试用例中,如何让我的函数调用返回?
通过创建一个异步处理程序来处理请求,这是用现有模块实现异步IO的好方法吗?
如果问题2的答案是“否”,那么是否每个客户机对类的调用都需要包含r = yield from ...这类异步请求的内容?

最佳答案

问题是,不能像调用asyncio.coroutine并获得异步行为那样调用现有的同步代码。当您调用yield from searching(...)时,只有当searching本身实际上是一个asyncio.coroutine或者至少返回一个asyncio.Future时,您才会获得异步行为。现在,searching只是一个常规的同步函数,所以调用yield from searching(...)只会抛出一个错误,因为它不会返回Future或协程。
要获得您想要的行为,除了searching版本之外,您还需要一个异步版本的synchronous(或者,如果不需要同步版本,只需完全删除它)。您有几个选项可以同时支持这两种功能:
searching重写为使用兼容的调用来进行I/O,而不是阻止I/O的asyncio.coroutine。这将使其在asyncio上下文中工作,但意味着您将无法再在同步上下文中直接调用它。相反,您还需要提供一个可选的同步asyncio方法,启动一个searching事件循环并调用asyncio。有关更多详细信息,请参见this question
保持对return loop.run_until_complete(self.searching(...))的同步实现,并提供另一个异步API,该API使用searching在后台线程中运行您的BaseEventLoop.run_in_executor方法:

class search(object):
  ...
  self.s = some_search_engine()
  ...
  def searching(self, *args, **kwargs):
    ret = {}
    ...
    return ret

   @asyncio.coroutine
   def searching_async(self, *args, **kwargs):
      loop = kwargs.get('loop', asyncio.get_event_loop())
      try:
          del kwargs['loop']  # assuming searching doesn't take loop as an arg
      except KeyError:
          pass
      r = yield from loop.run_in_executor(None, self.searching, *args)  # Passing None tells asyncio to use the default ThreadPoolExecutor
      return r

测试脚本:
s = search()
loop = asyncio.get_event_loop()
loop.run_until_complete(s.searching_async(arg1, arg2, ...))
loop.close()

通过这种方式,您可以保持同步代码的原样,并且至少提供可以在searching代码中使用的方法,而不会阻塞事件循环。如果您在代码中实际使用了异步I/O,那么解决方案就没有它那么干净了,但它总比什么都没有好。
提供两个完全独立的版本的asyncio,一个使用阻塞I/O,另一个与searching兼容。这为两种上下文提供了理想的实现,但需要两倍的工作量。

关于python - Python asyncio任务 yield 很低,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/30172821/

10-12 22:24