消息队列

消息队列”是在消息的传输过程中保存消息的容器。
消息队列最经典的用法就是消费者和生成者之间通过消息管道来传递消息,消费者和生成者是不通的进程。生产者往管道中写消息,消费者从管道中读消息。
操作系统提供了很多机制来实现进程间的通信 ,multiprocessing模块就提供了Queue和Pipe两种方法来实现。

使用multiprocessing里面的Queue来实现消息队列


from multiprocessing import Queue

q = Queue

q.put(data)

data = q.get(data)

示例代码如下:

点击(此处)折叠或打开

  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # Author :Alvin.xie
  4. # @Time :2017-12-15 9:34
  5. # @file :1.py


  6. from multiprocessing import Process, Queue
  7. import os, time, random
  8. # 写数据进程执行的代码:
  9. def write(q):
  10.     print('Process to write: %s' % os.getpid())
  11.     for value in ['A', 'B', 'C']:
  12.         print('Put %s to queue...' % value)
  13.         q.put(value)
  14.         time.sleep(random.random())
  15. # 读数据进程执行的代码:
  16. def read(q):
  17.     print('Process to read: %s' % os.getpid())
  18.     while True:
  19.         value = q.get(True)
  20.         print('Get %s from queue.' % value)

  21. # 父进程创建Queue,并传给各个子进程
  22. if __name__=='__main__':
  23.     q = Queue()
  24.     pw = Process(target=write, args=(q,))
  25.     pr = Process(target=read, args=(q,))
  26.     # 启动子进程pw,写入:
  27.     pw.start()
  28.     # 启动子进程pr,读取:
  29.     pr.start()
  30.     # 等待pw结束:
  31.     pw.join()
  32.     # pr进程里是死循环,无法等待其结束,只能强行终止:
  33.     pr.terminate()
执行结果如下:
Process to read: 7760
Process to write: 7964
Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

通过Mutiprocess里面的Pipe来实现消息队列:
1,  Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。
2,  send和receive方法分别是发送和接受消息的方法。close方法表示关闭管道,当消息接受结束以后,关闭管道。

代码示例如下:

点击(此处)折叠或打开

  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # Author :Alvin.xie
  4. # @Time :2017-12-15 10:04
  5. # @file :2.py

  6. from multiprocessing import Process, Pipe
  7. import time

  8. def proc1(pipe):
  9.     for i in xrange(1, 10):
  10.         pipe.send(i)
  11.         print("send {0} to pipe".format(i))
  12.         time.sleep(2)

  13. def proc2(pipe):
  14.     n = 9
  15.     while n>0:
  16.         result = pipe.recv()
  17.         print ("recv {0} from pipe".format(result))
  18.         n -= 1


  19. def main():
  20.     pipe = Pipe(duplex=False)
  21.     print (type(pipe))
  22.     p1 = Process(target=proc1, args=(pipe[1],))
  23.     p2 = Process(target=proc2, args=(pipe[0],))
  24.     p1.start()
  25.     p2.start()
  26.     p1.join()
  27.     p2.join()
  28.     pipe[0].close()
  29.     pipe[1].close()

  30. if __name__ == '__main__':
  31.     main()
执行结果:

send 1 to pipe
recv 1 from pipe
send 2 to pipe
recv 2 from pipe
recv 3 from pipe
send 3 to pipe
send 4 to pipe
recv 4 from pipe
send 5 to pipe
recv 5 from pipe
send 6 to pipe
recv 6 from pipe
send 7 to pipe
recv 7 from pipe
send 8 to pipe
recv 8 from pipe
send 9 to pipe
recv 9 from pipe










10-18 06:21
查看更多