PEP 525 --异步生成器

简述

PEP492引入了对Python 3.5的原生协程和async/await句法的支持。本次提案添加了对异步生成器的支持进而来扩展Python的异步功能。

理论和目标

常规生成器(在PEP 255中引入)的实现,使得编写复杂数据变得更优雅,它们的行为类似于迭代器。
当时没有提供async for使用的异步生成器。 编写异步数据生成器变得非常复杂,因为必须定义一个实现__aiter__和__anext__的方法,才能在async for语句中使用它。
为了说明异步生成器的重要性,专门做了性能测试,测试结果表明使用异步生成器要比使用异步迭代器快2倍多。
下面的代码是演示了在迭代的过程中等待几秒

class Ticker:
    """Yield numbers from 0 to `to` every `delay` seconds."""

    def __init__(self, delay, to):
        self.delay = delay
        self.i = 0
        self.to = to

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= self.to:
            raise StopAsyncIteration
        self.i += 1
        if i:
            await asyncio.sleep(self.delay)
        return i

我们那可以使用下面的代码实现同样的功能:

async def ticker(delay, to):
    """Yield numbers from 0 to `to` every `delay` seconds."""
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

详细说明

异步生成器

我们直到在函数中使用一个或多个yield该函数将变成一个生成器。

def func():            # 方法
    return

def genfunc():         # 生成器方法
    yield

我们提议使用类似的功能实现下面异步生成器:

async def coro():      # 一个协程方法
    await smth()

async def asyncgen():  # 一个异步生成器方法
    await smth()
    yield 42

调用异步生成器函数的结果是异步生成器对象,它实现了PEP 492中定义的异步迭代协议。
注意:在异步生成器中使用非空return语句会引发SyntaxError错误。

对异步迭代协议的支持

该协议需要实现两种特殊方法:
__aiter__方法返回一个异步迭代器。
__anext__方法返回一个awaitable对象,它使用StopIteration异常来捕获yield的值,使用StopAsyncIteration异常来表示迭代结束。
异步生成器定义了这两种方法。 让我们实现一个一个简单的异步生成器:

import asyncio
async def genfunc():
    yield 1
    yield 2

gen = genfunc()

async def start():
    assert gen.__aiter__() is gen
    assert await gen.__anext__() == 1
    assert await gen.__anext__() == 2
    await gen.__anext__()  # This line will raise StopAsyncIteration.

if __name__ == '__main__':
    asyncio.run(start())

终止

PEP 492提到需要使用事件循环或调度程序来运行协程。 因为异步生成器是在协程使用的,所以还需要创建一个事件循环来运行。
异步生成器可以有try..finally块,也可以用async with异步上下文管理代码快。 重要的是提供一种保证,即使在部分迭代时,也可以进行垃圾收集,生成器可以安全终止。

async def square_series(con, to):
    async with con.transaction():
        cursor = con.cursor(
            'SELECT generate_series(0, $1) AS i', to)
        async for row in cursor:
            yield row['i'] ** 2

async for i in square_series(con, 1000):
    if i == 100:
        break

上面代码演示了异步生成器在async with中使用,然后使用async for对异步生成器对象进行迭代处理,同时我们也可以设置一个中断条件。
square_series()生成器将被垃圾收集,并没有异步关闭生成器的机制,Python解释器将无法执行任何操作。
为了解决这个问题,这里提出以下改进建议:
1.在异步生成器上实现一个aclose方法,返回一个特殊awaittable 对象。 当awaitable抛出GeneratorExit异常的时候,抛出到挂起的生成器中并对其进行迭代,直到发生GeneratorExit或StopAsyncIteration。这就是在常规函数中使用close方法关闭对象一样,只不过aclose需要一个事件循环去执行。
2.不要在异步生成器中使用yield语句,只能用await。
3.在sys模块中加两个方法:set_asyncgen_hooks() and get_asyncgen_hooks().
sys.set_asyncgen_hooks()背后的思想是允许事件循环拦截异步生成器的迭代和终结,这样最终用户就不需要关心终结问题了,一切正常。
sys.set_asyncgen_hooks() 可以结束两个参数
firstiter:一个可调用的,当第一次迭代异步生成器时将调用它。
finalizer:一个可调用的,当异步生成器即将被GC时将被调用。
当第一迭代异步生成器时,它会引用到当前的finalizer。
当异步生成器即将被垃圾收集时,它会调用其缓存的finalizer。假想在事件循环激活异步生成器开始迭代的时候,finalizer将调用一个aclose()方法.
例如,以下是如何修改asyncio以允许安全地完成异步生成器:

# asyncio/base_events.py

class BaseEventLoop:

    def run_forever(self):
        ...
        old_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(finalizer=self._finalize_asyncgen)
        try:
            ...
        finally:
            sys.set_asyncgen_hooks(*old_hooks)
            ...

    def _finalize_asyncgen(self, gen):
        self.create_task(gen.aclose())

第二个参数firstiter,允许事件循环维护在其控制下实例化的弱异步生成器集。这使得可以实现“shutdown”机制,来安全地打开的生成器并关闭事件循环。
sys.set_asyncgen_hooks()是特定线程,因此在多个事件循环并行的时候是安全的。
sys.get_asyncgen_hooks()返回一个带有firstiter和finalizer字段的类似于类的结构。

asyncio

asyncio事件循环将使用sys.set_asyncgen_hooks()API来维护所有被调度的弱异步生成器,并在生成器被垃圾回收时侯调度它们的aclose()方法。
为了确保asyncio程序可以可靠地完成所有被调度的异步生成器,我们建议添加一个新的事件循环协程方法loop.shutdown_asyncgens()。 该方法将使用aclose()调用关闭所有当前打开的异步生成器。
在调用loop.shutdown_asyncgens()方法之后,首次迭代新的异步生成器,事件循环就会发出警告。 我们的想法是,在请求关闭所有异步生成器之后,程序不应该执行迭代新异步生成器的代码。
下面是一个关于如何使用Ashutdown_asyncgens的例子:

try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())#关闭所有异步迭代器
    loop.close()

异步生成器对象

该对象以标准Python生成器对象为模型。 本质上异步生成器的行为复制了同步生成器的行为,唯一的区别在于API是异步的。
定义了以下方法和属性:
1.agen.__aiter__(): 返回agen.
2.agen.__anext__(): 返回一个awaitable对象, 调用一次异步生成器的元素。
3.agen.asend(val): 返回一个awaitable对象,它在agen生成器中推送val对象。 当agen还没迭代时,val必须为None。
上面的方法类似同步生成器的使用。
代码例子:

import asyncio


async def gen():
    await asyncio.sleep(0.1)
    v = yield 42
    print(v)
    await asyncio.sleep(0.2)



async def start():
    g = gen()

    await g.asend(None)  # Will return 42 after sleeping
    # for 0.1 seconds.

    await g.asend('hello')  # Will print 'hello' and
    # raise StopAsyncIteration
    # (after sleeping for 0.2 seconds.)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

4.agen.athrow(typ, [val, [tb]]): 返回一个awaitable对象, 这会向agen生成器抛出一个异常。
代码如下:

import asyncio


async def gen():
    try:
        await asyncio.sleep(0.1)
        yield 'hello'
    except IndexError:
        await asyncio.sleep(0.2)
        yield 'world'


async def start():
    g = gen()
    v = await g.asend(None)
    print(v)  # Will print 'hello' after
    # sleeping for 0.1 seconds.

    v = await g.athrow(IndexError)
    print(v)  # Will print 'world' after
    # $ sleeping 0.2 seconds.


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

5.agen.aclose(): 返回一个awaitable对象, 调用该方法会抛出一个异常给生成器。

import asyncio


async def gen():
    try:
        await asyncio.sleep(0.1)
        v = yield 42
        print(v)
        await asyncio.sleep(0.2)
    except:
         print("运行结束")


async def start():
    g = gen()
    v=await g.asend(None)
    print(v)
    await g.aclose() #不做异常处理会报错


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

6.agen.__name__ and agen.__qualname__:可以返回异步生成器函数的名字。

async def gen():
    try:
        await asyncio.sleep(0.1)
        v = yield 42
        print(v)
        await asyncio.sleep(0.2)
    except:
         print("运行结束")

async def start():
    g = gen()
    print(g.__aiter__())#输出async_generator对象
    print(g.__name__)#输出gen
    print(g.__qualname__)#输出gen

其他的方法

agen.ag_await: 正等待的对象(None). 类似当前可用的 gi_yieldfrom for generators and cr_await for coroutines.
agen.ag_frame, agen.ag_running, and agen.ag_code: 同生成器一样

StopIteration and StopAsyncIteration 被替换为 RuntimeError,并且不上抛。

源码实现细节

异步生成器对象(PyAsyncGenObject)与PyGenObject共享结构布局。 除此之外,参考实现还引入了三个新对象:
PyAsyncGenASend:实现__anext__和asend()方法的等待对象。
PyAsyncGenAThrow:实现athrow()和aclose()方法的等待对象。
_PyAsyncGenWrappedValue:来自异步生成器的每个直接生成的对象都隐式地装入此结构中。 这就是生成器实现如何使用常规迭代协议从使用异步迭代协议生成的对象中分离出的对象。
PyAsyncGenASend和PyAsyncGenAThrow是awaitable对象(它们有__await__方法返回self)类似于coroutine的对象(实现__iter,__ next,send()和throw()方法)。 本质上,它们控制异步生成器的迭代方式

PyAsyncGenASend and PyAsyncGenAThrow

PyAsyncGenASend类似生成器对象驱动__anext__ and asend() 方法,实装了异步迭代协议。
agen.asend(val) 和agen.__anext__() 返回一个PyAsyncGenASend对象的一个引用。 (它将引用保存回父类agen对象。)
数据流定义如下:
1.首次调用PyAsyncGenASend.send(val)时, val将推入到父类agen对象 (PyGenObject利用现有对象。)
对PyAsyncGenASend对象进行后续迭代,将None推送到agen。
2.首次调用_PyAsyncGenWrappedValue对象时,它将被拆箱,并且以未被装饰的值作为参数会引发StopIteration异常。
3.异步生成器中的return语句引发StopAsyncIteration异常,该异常通过PyAsyncGenASend.send()和PyAsyncGenASend.throw()方法传播。
4.PyAsyncGenAThrow与PyAsyncGenASend非常相似。 唯一的区别是PyAsyncGenAThrow.send()在第一次调用时会向父类agen对象抛出异常(而不是将值推入其中。)

新的标准库方法和Types

1.types.AsyncGeneratorType -- 判断是否是异步生成器对象
2.sys.set_asyncgen_hooks()和 sys.get_asyncgen_hooks()--
在事件循环中设置异步生成器终结器和迭代拦截器。
3.inspect.isasyncgen()和 inspect.isasyncgenfunction() :方法内省。
4.asyncio加入新方法:loop.shutdown_asyncgens().
5.collections.abc.AsyncGenerator:抽象基类的添加。

是否支持向后兼容

该提案完全支持向后兼容
在python3.5,async def里使用yield会报错,因此在python3.6引入了安全的异步生成器

性能展示

常规生成器

import time


def gen():
    i = 0
    while i < 100000000:
        yield i
        i += 1


if __name__ == '__main__':
    start = time.time()
    list(gen())
    end = time.time()
    print("totals time", end - start)

输出

totals time 14.837260007858276

15s左右

异步迭代器的改进

import time
import asyncio

N = 10 ** 7
class AIter:
    def __init__(self):
        self.i = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= N:
            raise StopAsyncIteration
        self.i += 1
        return i

async def start():
    [_ async for _ in AIter()]
if __name__ == '__main__':
    s=time.time()
    loop=asyncio.get_event_loop()
    try:
     loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
    e=time.time()
    print("total time",e-s)

输出

total time 5.441649913787842

很明显迭代异步生成器的速度比迭代普通生成器不只是快了两倍。
我们可以做一个更简单的异步生成器

import time
import asyncio

async def ticker(delay, to):
    """Yield numbers from 0 to `to` every `delay` seconds."""
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

async def start():
    async for item in ticker(0.000001,100):
        print(item)
if __name__ == '__main__':
    s=time.time()
    loop=asyncio.get_event_loop()
    try:
     loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
    e=time.time()
    print("total time",e-s)

设计中要注意的事项

内建函数:aiter() and anext()
最初,PEP 492将__aiter__定义为应返回等待对象的方法,从而产生异步迭代器。
但是,在CPython 3.5.2中,重新定义了__aiter__可以直接返回异步迭代器。
为了避免破坏向后兼容性,决定Python 3.6将支持两种方式:__aiter__仍然可以在发出DeprecationWarning时返回等待状态。由于Python 3.6中__aiter__的这种双重性质,我们无法添加内置的aiter()的同步实现。 因此,建议等到Python 3.7。

异步list/dict/set 推导式

将放在单独的pep中也就是后来的pep530.

异步yield from

对于异步生成器,yield from也不那么重要,因为不需要提供在协程之上实现另一个协同程序协议的机制。为了组合异步生成器,可以使用async for简化这个过程:

async def g1():
    yield 1
    yield 2

async def g2():
    async for v in g1():
        yield v

为了asend()和athrow()是必须的

它们可以使用异步生成器实现类似于contextlib.contextmanager的概念。 例如,可以实现以下模式:

@async_context_manager
async def ctx():
    await open()
    try:
        yield
    finally:
        await close()

async with ctx():
    await ...

另一个原因是从__anext__对象返回的对象来推送数据并将异常抛出到异步生成器中,很难正确地执行此操作。 添加显式的asend()和athrow()更获取异常后的数据。
在实现方面,asend()是__anext__更通用的版本,而athrow()与aclose()非常相似。 因此,为异步生成器定义这些方法不会增加任何额外的复杂性。

代码示例

async def ticker(delay, to):
    for i in range(to):
        yield i
        await asyncio.sleep(delay)


async def run():
    async for i in ticker(1, 10):
        print(i)


import asyncio
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(run())
finally:
    loop.close()

这代码将打出0-9,每个数字之间的间隔为1s。

提议者

Guido, 2016年9月6日

参考资料


[1] https://github.com/1st1/cpython/tree/async_gen
[2] https://mail.python.org/pipermail/python-dev/2016-September/146267.html
[3] http://bugs.python.org/issue28003

本文章翻译整理自,pep525
Source: https://github.com/python/peps/blob/master/pep-0525.txt
本文由CXA翻译自:2019年1月25日,翻译能力有限还请大家多多指教。

01-25 20:50