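I wrote a test program to try out create_task(), and I need to wait until the created task has finished.
I tried waiting for it with loop.run_until_complete(), but that fails with the following traceback.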

/Users/jason/.virtualenvs/xxx/bin/python3.5 /Users/jason/asyncio/examples/hello_coroutine.py
    Traceback (most recent call last):
    Test
      File "/Users/jason/asyncio/examples/hello_coroutine.py", line 42, in <module>
    Hello World, is a task
        loop.run_until_complete(test.greet_every_two_seconds())
      File "/Users/jason/asyncio/asyncio/base_events.py", line 373, in run_until_complete
        return future.result()
      File "/Users/jason/asyncio/asyncio/futures.py", line 274, in result
        raise self._exception
      File "/Users/jason/asyncio/asyncio/tasks.py", line 240, in _step
        result = coro.send(None)
      File "/Users/jason/asyncio/examples/hello_coroutine.py", line 33, in greet_every_two_seconds
        self.a()
      File "/Users/jason/asyncio/examples/hello_coroutine.py", line 26, in a
        t = loop.run_until_complete(self.greet_every_one_seconds(self.db_presell))
      File "/Users/jason/asyncio/asyncio/base_events.py", line 361, in run_until_complete
        self.run_forever()
      File "/Users/jason/asyncio/asyncio/base_events.py", line 326, in run_forever
        raise RuntimeError('Event loop is running.')
    RuntimeError: Event loop is running.

The test code is below. The function a() must not be a coroutine.
How can I wait for the task to complete?

import asyncio

class Test(object):

    def __init__(self):
        print(self.__class__.__name__)
        pass
    @asyncio.coroutine
    def greet_every_one_seconds(self, value):
            print('Hello World, one second.')
            fut = asyncio.sleep(1,result=value)
            a = yield from fut
            print(a)

    def a(self):

        loop = asyncio.get_event_loop()

        task=loop.run_until_complete(self.greet_every_one_seconds(4))
        # How can I wait for the task to complete?

    @asyncio.coroutine
    def greet_every_two_seconds(self):
        while True:
            self.a()
            print('Hello World, two seconds.')
            yield from asyncio.sleep(2)


if __name__ == '__main__':
    test = Test()
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(test.greet_every_two_seconds())
    finally:
        loop.close()

Best answer

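How can I wait for the task to complete?
Call loop.run_until_complete(task) in an ordinary function, or use await task in a coroutine.
Here is a complete code example that demonstrates both cases: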

#!/usr/bin/env python3
import asyncio

async def some_coroutine(loop):
    task = loop.create_task(asyncio.sleep(1))  # just some task
    await task # wait for it (inside a coroutine)

loop = asyncio.get_event_loop()
task = loop.create_task(asyncio.sleep(1)) # just some task
loop.run_until_complete(task) # wait for it (outside of a coroutine)
loop.run_until_complete(some_coroutine(loop))
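
The RuntimeError in the question comes from calling loop.run_until_complete() inside a() while the same loop is already running greet_every_two_seconds(). One possible way around it, if a() has to stay an ordinary function, is to let a() only schedule the coroutine and return the task, so that the calling coroutine does the waiting. The following is a minimal sketch of that idea, not the asker's final code. It assumes Python 3.7 or newer (asyncio.run() and asyncio.get_running_loop() do not exist in 3.5), shortens the delays, and replaces the endless loop with a hypothetical greet_twice() so that the script terminates.

```python
import asyncio


class Test:
    async def greet_every_one_second(self, value):
        print('Hello World, one second.')
        # asyncio.sleep() returns `result` once the delay has elapsed.
        # The delay is shortened here so the sketch finishes quickly.
        return await asyncio.sleep(0.01, result=value)

    def a(self):
        # Ordinary function: it cannot await, so it only schedules the
        # coroutine on the loop that is already running and returns the task.
        loop = asyncio.get_running_loop()
        return loop.create_task(self.greet_every_one_second(4))

    async def greet_twice(self):
        # Hypothetical bounded stand-in for greet_every_two_seconds().
        results = []
        for _ in range(2):
            task = self.a()
            # The waiting happens here, inside a coroutine.
            results.append(await task)
            print('Hello World, two seconds.')
            await asyncio.sleep(0.02)
        return results


results = asyncio.run(Test().greet_twice())
print(results)
```

On Python 3.5, as used in the question, asyncio.get_event_loop() and loop.run_until_complete() would take the place of asyncio.get_running_loop() and asyncio.run().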
