简介

因为最近一段时间需要研究一些openstack相关的东西,在阅读一些相关代码的时候碰到很多python特定的一些特性,比如generator, coroutine以及一些相关的类库,比如eventlet, greenlet。在openstack里引用的第三方类库非常多,这些特性和类库看起来还比较复杂。如果需要对openstack里面某些特性的实现非常熟悉的话,就需要对这些牵涉到的基础的东西有个很好的了解。这里就针对coroutine的特性和它的使用做一个总结。

coroutine的定义和使用

在前面一篇关于generator的文章里,我提到了怎么定义和使用generator。当时我们使用yield value或者和list comprehension类似的语法来定义generator。我们返回的generator其实是一个可以不断取得数据的集合,所以一般使用它们的代码一般是一个循环。在这种情况下,我们感觉像是首先定义了一组管道,然后在真正需要的时候才在代码里去提取它们。

实际上,coroutine和generator还是很有关系的,我们来看如下的代码:

def grep(pattern):
print "Looking for %s" % pattern
while True:
line = (yield)
if pattern in line:
print line,

这部分代码看起来和generator很像,不过又不同。这里有一个line = (yield) 的语句。而在generator里,我们是需要yield value来返回值的。而这里后面根据获得的值还可以打印出来了。我们使用它们的代码如下:

if __name__ == '__main__':
g = grep("python")
g.next()
g.send("Yeah, but no, but yeah, but no")
g.send("A series of tubes")
g.send("python generators rock!")

这里,我们定义了方法grep,然后调用一个send方法。代码执行的输出结果如下:

Looking for python
python generators rock!

结合前面代码的部分,看起来好像是yield部分后面会针对包含有python这个串的字符进行处理。而没有的则不会处理。看来这个yield像是有什么玄机,看起来不简单。

实际上,我们在使用line = (yield)这部分就是定义了一个coroutine。coroutine是什么呢?coroutine可以说是一种实现协作式编程的手法。它可以设置有多个入口点和恢复执行的点,可以实现一些执行流程的转移。这部分概念看起来有点难懂。我们以前面的这部分代码为例来分析一下想关的概念。

在我们代码中,定义的yield这个部分相当于等待接收数据。所以在没有数据到来的时候,它就相当于被阻塞,等在那里。而为了触发这个部分,我们在使用的代码里首先用g.next()来初始化它。然后通过g.send("")方法将数据发送给它。这样,yield返回的就是send方法里带的参数了。然后我们可以接着在循环部分来处理它。这样看来,coroutine更多的是一个数据消费者的角色。每次都是等数据过来,来了之后就通过yield部分返回,然后处理。否则就等在那里。

coroutine的几个应用

因为coroutine相当于一个数据的消费者,我们可以做一种这样的应用。首先是一个生产者将一些数据准备好,然后将数据发送给一个coroutine来处理。我们来看一个文件处理的示例:

import time
def follow(thefile, target):
thefile.seek(0,2) # Go to the end of the file
while True:
line = thefile.readline()
if not line:
time.sleep(0.1) # Sleep briefly
continue
target.send(line)

这里是一个读取文件,然后将文件内容通过target发送到target来处理的过程。所以我们这里要做的就是将target作为一个参数传入给follow方法。当然,target在这里必须是一个coroutine,它来接收和处理这些数据。target的定义如下:

@coroutine
def printer():
while True:
line = (yield)
print line,

这里用到了yield,然后就可以直接将接收到的数据打印出来了。当然,还有一个值得注意的地方是用到了一个@coroutine的decorator。因为在每次使用coroutine之前我们需要调用一次target.next()或者target.send(None)来初始化它。这样我们使用的时候很容易忘记这一步,一种办法就是定义好一个这样的decorator,然后每次将这个decorator加上就保证这一步被执行了。@coroutine decorator的定义如下:

def coroutine(func):
def start(*args,**kwargs):
cr = func(*args,**kwargs)
cr.next()
return cr
return start

对于这部分decorator理解如果有问题的话可以参考我decorator相关的这篇文章。我们最终使用前面这部分的代码如下:

if __name__ == '__main__':
f = open("access-log")
follow(f,printer())

当然,这个示例主要讲的是使用一个coroutine来处理一个传递过来的消息。如果我们要构造类似于pipeline的东西,可以将一个coroutine同时当作数据处理的部分,也可以当作数据传递的部分,比如看如下的代码:

@coroutine
def grep(pattern,target):
while True:
line = (yield) # Receive a line
if pattern in line:
target.send(line) # Send to next stage

这里我们定义的grep方法在接收到数据之后,相当于做了一个判断,如果pattern在传入的line中间,则将这个数据传递给下一个coroutine处理。这就实现了一个pipeline的雏形。当然,除了这种传输的示例,我们也可以将消息传递给多个coroutine。这些示例我们可以参考后面的一些资料去做详细的分析。

关于控制传递

在wiki上关于coroutine的介绍,引用了经典的producer-consumer问题。以前针对这个问题,更多的是针对多线程的producer-consumer问题处理。在那种场景下,我们需要通过锁或者某些互斥变量来实现对队列元素的处理。而实际上,用coroutine来解决这个问题也是一种很理想的方法。在wiki上给出的伪代码是这样的:

coroutine produce
loop
while q is not full
create some new items
add the items to q
yield to consume
coroutine consume
loop
while q is not empty
remove some items from q
use the items
yield to produce

这些代码里最有意思的地方就是,我们完全可以用python coroutine的方式来做一个实现。比如说,我们可以将queue里元素的个数作为传递的参数。作为consumer,可以采用这样的方式来写:

@coroutine
def consumer(target):
while True:
items = (yield)
if items > 0:
remove item from queue
items -= 1
target.send(items)

作为producer也可以采用类似的方法来写。这里就不再赘述了。当然,还有一个要注意的地方就是,这里虽然实现了控制的转移,但是并没有牵涉到线程的变换,这里所有的一切其实还是在同一个进程里执行的。只不过这种控制转移的方式在很多地方有比较高效率的应用,它在实现事件循环和分发、非阻塞IO访问方面有很多的应用。我也会在后续的一些文章里针对它们的一些应用做进一步的分析。

总结

coroutine是一个实现多个任务之间互相切换的手段,它相当于一种将一个当前执行的结果传递给另外一个过程。和generator的使用过程比起来,它更像是一种“推”模式。因为我们要使用一个coroutine的时候,必然是需要有其它的过程send数据过来。因为yield的过程有点类似于一个操作系统里中断的概念,它相当于将一个进程的当前执行过程暂停,然后跳转到另外一个过程。这种过程和我们传统通过栈实现的子过程调用很不一样,所以表面上理解起来还是有点困难。

参考材料

http://en.wikipedia.org/wiki/Coroutine

http://www.dabeaz.com/coroutines/

from:http://www.myexception.cn/perl-python/1673253.html

05-06 19:04