Coroutines in Python

Let me explain Python coroutines as I understand them. On Linux, threads are lighter than processes and cheaper to switch between; coroutines are lighter still, so their switching cost is even lower. Python coroutines were originally built on generators, i.e. the yield statement; later the yield from syntax was added for delegating to sub-coroutines. Every generator is an iterator, but not every iterator is a generator: a generator can both produce values and receive them, and it can suspend and resume. Python 3.5 then introduced the async/await syntax; nothing changed in substance, the new keywords just prevent syntactic confusion with ordinary generators. Coroutine switching can be implicit or explicit, with many coroutines taking turns inside a single thread; asyncio switches between coroutines explicitly.
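To make the generator half of that story concrete, here is a minimal sketch (the names are mine, not from any library) of a generator used as a coroutine: yield both emits a value and receives one, and the generator stays suspended at each yield until it is resumed with send.

```python
def averager():
    # A generator-based "coroutine": each yield emits the running
    # average and then suspends until a new value is sent in.
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average   # suspend here; resume on send()
        total += value
        count += 1
        average = total / count

avg = averager()
next(avg)            # prime it: run to the first yield
print(avg.send(10))  # 10.0
print(avg.send(30))  # 20.0
```

The async/await syntax replaces this priming/send choreography, but the underlying suspend/resume mechanism is the same.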


The asyncio framework

The asyncio framework is built on top of epoll, poll, select, and similar mechanisms; below I will just say epoll, though which mechanism is actually used depends on the operating system. Most of what you do with asyncio is run tasks, and running a task does not itself go through epoll: epoll requires registering file descriptors, whereas tasks run as coroutines. How coroutines and epoll fit together is detailed below. Roughly speaking, epoll is what makes asynchronous I/O (e.g. async web frameworks) possible, while coroutines are what run the user's tasks.
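The OS dependence is visible in the standard selectors module, which asyncio's default event loop is built on: DefaultSelector picks EpollSelector on Linux, KqueueSelector on BSD/macOS, and so on. A minimal sketch, with a socketpair standing in for real network I/O:

```python
import selectors
import socket

# DefaultSelector chooses the best mechanism the OS offers
# (epoll on Linux, kqueue on BSD/macOS, ...).
sel = selectors.DefaultSelector()

r, w = socket.socketpair()
sel.register(r, selectors.EVENT_READ, data="readable")

w.send(b"x")                    # make r readable
events = sel.select(timeout=1)  # analogous to loop._selector.select()
for key, mask in events:
    print(key.data)             # readable

sel.unregister(r)
r.close()
w.close()
```

asyncio's selector event loop wraps exactly this select() call inside its _run_once iteration, as shown below.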


How asyncio runs

Let's write a trivial async task. It is deliberately simple, because this article is about how asyncio works, not how to use it.

import asyncio

async def print_hello_after3second():
    await asyncio.sleep(3)
    print("hello")

asyncio.run(print_hello_after3second())

The entry point used here is run. There are several ways to run async tasks with asyncio; run feels to me like a command-line front end: the interface looks simple from the outside, but it does a lot of work internally. To keep the article short and readable, each code snippet below only shows the important parts; the rest is elided.

asyncio/runners.py

# run's first argument must be a coroutine
def run(main, *, debug=False):
    ....
    loop = events.new_event_loop()
    # think of the loop as a wrapper around epoll
    events.set_event_loop(loop)
    # the key line is here
    return loop.run_until_complete(main)
    ....

asyncio/base_events.py

    def run_until_complete(self, future):
        ....
        # asyncio wraps the object we pass in into a Task; you can also
        # call it a future, since Task is a subclass of Future
        future = tasks.ensure_future(future, loop=self)
        # run_forever calls _run_once in a loop to drive the event loop
        self.run_forever()
        ....

asyncio/tasks.py

# ensure_future is another interface for handing a task over
def ensure_future(coro_or_future, *, loop=None):
        ....
        # this calls Task.__init__, which also registers Task's _step
        # method as a callback on the event loop
        task = loop.create_task(coro_or_future)
        ....
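As a quick check of the claim that asyncio wraps a bare coroutine into a Task, and that a Task is also a Future, here is a small runnable sketch; work and main are my own illustrative names:

```python
import asyncio

async def work():
    await asyncio.sleep(0)
    return 42

async def main():
    # ensure_future wraps the bare coroutine into a Task and
    # schedules its first step on the running event loop.
    task = asyncio.ensure_future(work())
    assert isinstance(task, asyncio.Task)
    # Task subclasses Future, so a Task *is* a Future.
    assert isinstance(task, asyncio.Future)
    return await task

print(asyncio.run(main()))  # 42
```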

asyncio/base_events.py

    # This method is important, so it is listed in full: it embodies
    # asyncio's scheduling model, driving both tasks and epoll events
    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """
        print("run once")
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []

            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

        if self._debug and timeout != 0:
            t0 = self.time()
            # this is where epoll events are polled for
            event_list = self._selector.select(timeout)
            dt = self.time() - t0
            if dt >= 1.0:
                level = logging.INFO
            else:
                level = logging.DEBUG
            nevent = len(event_list)
            if timeout is None:
                logger.log(level, 'poll took %.3f ms: %s events',
                           dt * 1e3, nevent)
            elif nevent:
                logger.log(level,
                           'poll %.3f ms took %.3f ms: %s events',
                           timeout * 1e3, dt * 1e3, nevent)
            elif dt >= 1.0:
                logger.log(level,
                           'poll %.3f ms took %.3f ms: timeout',
                           timeout * 1e3, dt * 1e3)
        else:
            event_list = self._selector.select(timeout)
        # The readable/writable events fetched from epoll get their callbacks
        # appended to self._ready below. The same list also holds the user's
        # async tasks — so when do those get added? See further down.
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        # Timer callbacks let a task run at some future point in time;
        # self._scheduled is a heap-based priority queue ordered by deadline
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    # our async task's callback has been wrapped in a Handle instance
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                print("handle %s" % handle)
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

To summarize: asyncio puts both the user's async tasks and the readable/writable callbacks collected from epoll into the self._ready list and runs them uniformly. So when do the async tasks get put into self._ready?
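The unified-queue idea can be modeled in a few lines. This toy loop (entirely my own, far simpler than asyncio's) has the same shape as _run_once: due timers migrate from a heap into the ready deque, and then everything that is ready in this iteration runs exactly once.

```python
import heapq
import time
from collections import deque

class MiniLoop:
    def __init__(self):
        self._ready = deque()
        self._scheduled = []           # heap of (when, seq, callback)
        self._seq = 0                  # tie-breaker so callbacks never compare

    def call_soon(self, cb):
        self._ready.append(cb)

    def call_later(self, delay, cb):
        self._seq += 1
        heapq.heappush(self._scheduled,
                       (time.monotonic() + delay, self._seq, cb))

    def run_once(self):
        # Move due timers into the ready queue, then run everything that
        # is ready *now*; callbacks queued meanwhile wait for the next turn.
        now = time.monotonic()
        while self._scheduled and self._scheduled[0][0] <= now:
            _, _, cb = heapq.heappop(self._scheduled)
            self._ready.append(cb)
        for _ in range(len(self._ready)):
            self._ready.popleft()()

out = []
loop = MiniLoop()
loop.call_soon(lambda: out.append("soon"))
loop.call_later(0, lambda: out.append("timer"))
loop.run_once()
print(out)  # ['soon', 'timer']
```

The real _run_once adds the selector poll between the timer migration and the callback drain; the callbacks themselves come from call_soon, shown next.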
asyncio/base_events.py

    # Who calls this function? As said above, our async tasks are wrapped
    # into asyncio's Task class, and Task.__init__ calls call_soon. What
    # about call_at-style callbacks scheduled for the future? They end up
    # in self._ready too — _run_once moves them over once their deadline passes.
    def call_soon(self, callback, *args, context=None):
        """Arrange for a callback to be called as soon as possible.

        This operates as a FIFO queue: callbacks are called in the
        order in which they are registered.  Each callback will be
        called exactly once.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_soon')
        handle = self._call_soon(callback, args, context)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        return handle
    # and yes, this is where the callback is wrapped in a Handle
    # and appended to self._ready
    def _call_soon(self, callback, args, context):
        print("register")
        handle = events.Handle(callback, args, self, context)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        self._ready.append(handle)
        return handle

What we have covered so far is how asyncio runs user-side tasks alongside async I/O events: the user's tasks are, in effect, hidden behind the epoll machinery, which is one of asyncio's cleverest moves.
So far we have only seen how asyncio schedules tasks and executes the epoll-event callbacks. But the task's own callback matters just as much: it is the _step method mentioned above, defined on the Task class. It is also the key to how the async libraries under aio-libs are built, and to how you would adapt or write a Python async library yourself, which is discussed below.
asyncio/tasks.py

    # This method is important, so nothing is trimmed
    def __step(self, exc=None):
        if self.done():
            raise futures.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        # coro is the async task the user handed in
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                # drive the async task one step forward
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        # a coroutine raises StopIteration when it finishes
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().set_exception(futures.CancelledError())
            else:
                # StopIteration means the async task finished; its return value
                # is set on the future. This matters: if the result were never
                # set, the async task would never be marked done
                super().set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            super().set_exception(exc)
        except BaseException as exc:
            super().set_exception(exc)
            raise
        else:
            # result is a Future instance (when the coroutine awaited one)
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # register __wakeup as the future's done-callback;
                        # __wakeup is what later extracts the result from the future
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                # apart from the "result is None" branch below, every
                # remaining branch is an error case
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    print("call soon")
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

_run_once does the scheduling, driving both tasks and epoll events; _step is what handles an await something() expression. _step is not easy to follow, so here is a summary of what it does, abstract as the description may be.
The coroutine function the user writes is wrapped by asyncio into a Task; the coroutine itself is held as an attribute that Task's _step method drives, and _step in turn is wrapped into a Handle and invoked as an event callback. Each awaited coroutine has a future and a wakeup bound to it: the coroutine's result is set on the future, wakeup is registered as the future's callback (invoked, ultimately, by the event loop), and once the result is set, wakeup resumes the awaiting coroutine so it can extract that result.

async def a():
    await b()  # coroutine b has a future and a wakeup bound to it
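The future/wakeup dance can be reproduced by hand. This sketch (my own MiniFuture, not asyncio's) drives a coroutine with send the way Task's _step does, and resumes it from a done-callback the way Task's __wakeup does:

```python
class MiniFuture:
    def __init__(self):
        self._result = None
        self._callbacks = []

    def __await__(self):
        yield self               # suspend: hand ourselves to the driver
        return self._result      # resumed: hand the result back

    def add_done_callback(self, cb):
        self._callbacks.append(cb)

    def set_result(self, value):
        self._result = value
        for cb in self._callbacks:
            cb(self)

results = []

async def user_task(fut):
    value = await fut            # suspends until fut gets a result
    results.append(value)

fut = MiniFuture()
coro = user_task(fut)

# "_step": run the coroutine until it yields a pending future
pending = coro.send(None)
assert pending is fut

# "__wakeup": resume the coroutine once the future is done
def wakeup(f):
    try:
        coro.send(None)
    except StopIteration:
        pass                     # the task finished

pending.add_done_callback(wakeup)
fut.set_result("hello")          # triggers wakeup -> coroutine resumes
print(results)                   # ['hello']
```

In asyncio the same choreography runs through the event loop: set_result schedules the wakeup via call_soon instead of calling it directly.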

asyncio is hard, both to use and to understand, unlike Go, where coroutines are supported natively; Python's coroutines took a long and winding road to get here, and using them without understanding what is underneath causes all kinds of problems. The asyncio ecosystem is also still maturing, and sometimes you have to do the async adaptation yourself. That is why understanding how asyncio works matters: only once you know the mechanics can you adapt existing code or build tools that fit into asyncio.

References

深入理解asyncio(二)
