本文介绍了测试从常规函数调用python协程(异步定义)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我有一些异步协程,它获取一些数据并将其返回。如下所示:

async def fetch_data(*args):
  result = await some_io()
  return result
基本上,此协程是从协程链中调用的,初始协程通过创建任务来运行。但是,如果出于测试目的,我只想在运行某个文件时以这种方式运行一个协程:

if __name__ == '__main__':
  result = await fetch_data(*args)
  print(result)
显然我不能这样做,因为我试图运行并等待来自非协程函数的协程。所以问题是:有没有一些正确的方法可以在不调用函数的情况下从协程中获取数据?我可以为result制作一些Future对象并等待它,但也许还有其他更简单、更清晰的方法?

推荐答案

您需要创建一个事件循环来运行您的协程:

import asyncio

async def async_func():
    return "hello"

loop = asyncio.get_event_loop()
result = loop.run_until_complete(async_func())
loop.close()

print(result)

或作为函数:

def run_coroutine(f, *args, **kwargs):
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(f(*args, **kwargs))
    loop.close()
    return result

如下使用:

print(run_coroutine(async_func))

或:

assert "expected" == run_coroutine(fetch_data, "param1", param2="foo")

这篇关于测试从常规函数调用python协程(异步定义)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-22 00:47