asyncio

  这是python3.4引入的标准库,直接内置对异步IO的支持。asyncio的编程模型就是一个消息循环。从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。

协程

  子程序,或者称为函数。在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。子程序调用是通过栈实现的,一个线程就是执行一个子程序。子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。

协程的定义

  协程看上去是子程序,但是有很大的不同。协程,执行过程中,在内部可中断,然后转而执行别的程序,在适当的时候再返回来接着执行。在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。下面举个例子:

def A():
print('')
print('')
print('') def B():
print('x')
print('y')
print('z')

假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:

1
2
x
y
3
z

但是在A中是没有调用B的,所以不是函数的调用中断。因此,协程的调用比函数调用理解起来要难一些。

协程的优势

  协程最大的优势就是极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销。第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

  协程利用多个CPU的方法是,多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

协程的实现

  Python对协程的支持是通过generator(带有yield的函数)实现的。在generator中,我们不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回的下一个值。Python的yield不但可以返回一个值,它还可以接收调用者发出的参数。看个实例:

def consumer():
r = 'k' # 定义一个空字符串
while True: # 设定一个循环
# 如果不调用consumer的send方法传入其参数给n,n将为None
n = yield r
if not n: # 如果满足条件,表示方法外并未调用send
return # 执行return,退出方法,返回空值
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK' def produce(c):
x = c.send(None) # 启动生成器,相当于调用了next(c)
print("This is qidong:", x)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
# 生产了东西,通过c.send(n)切换到consumer执行
r = c.send(n)
# consumer通过yield拿到消息,处理,又通过yield把结果传回
print('[PRODUCER] Consumer return: %s' % r)
# produce拿到consumer处理的结果,继续生产下一条消息
# produce决定不生产了,通过c.close()关闭consumer,整个过程结束
c.close() c = consumer()
produce(c)

从实例可以看出来,整个生产者消费者的流程是无锁的,只由一个线程执行,produceconsumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。

asyncio实例

  下面是用asyncio实现Hello world:

import asyncio

# @asyncio.coroutine把一个generator标记为coroutine(协程)类型
@asyncio.coroutine
def hello():
print("Hello world!")
# 异步调用asyncio.sleep(1):
# yield from语法可以让我们方便地调用另一个generator
# asyncio.sleep()也是一个coroutine
# 线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环
# 把asyncio.sleep(1)看成是一个耗时1秒的IO操作,在此期间,
# 主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行
yield from asyncio.sleep(1)
print("Hello again!") # 从asyncio模块中直接获取EventLoop
loop = asyncio.get_event_loop()
# 执行coroutine
loop.run_until_complete(hello())
loop.close()

用Task封装两个coroutine试试:

import asyncio
import threading @asyncio.coroutine
def hello():
print('Hello world! (%s)' % threading.currentThread())
yield from asyncio.sleep(1)
print('Hello again! (%s)' % threading.currentThread()) # 从asyncio模块中直接获取EventLoop
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

运行结果是:

Hello world! (<_MainThread(MainThread, started 7116)>)
Hello world! (<_MainThread(MainThread, started 7116)>)
Hello again! (<_MainThread(MainThread, started 7116)>)
Hello again! (<_MainThread(MainThread, started 7116)>)

可以看出来,两个协程是由同一个线程并发执行的。把asyncio.sleep()换成真正的IO操作,则多个coroutine就可以由一个线程并发执行。

asyncio网络实例

  用asyncio的异步网络连接来获取sina、sohu和163的网站首页:

import asyncio

@asyncio.coroutine
def wget(host):
print('wget %s...' % host)
connect = asyncio.open_connection(host, 80)
reader, writer = yield from connect
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
# drain()在事件循环中刷新缓冲区,特别是在数据量很大的情况下,保证数据完整性
yield from writer.drain()
while True:
line = yield from reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close() loop = asyncio.get_event_loop()
tasks = [wget(host)
for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

可见3个连接由一个线程通过coroutine并发完成。

总结 

  asyncio模块提供了完善的异步IO支持。异步操作需要在coroutine(协程)中通过yield from完成。多个coroutine可以封装成一组Task然后并发执行。

  注意在python3.5之后,可以把@asyncio.coroutine替换为async,yield from替换为await。使代码更简洁易懂。

05-11 22:59