OpenStack Mitaka from Scratch: Message Queue Basics

Installing the client: python-pika
python-pika.spec (for Red Hat 6):


%{!?python_sitelib: %global python_sitelib %(%{__python} -c "from distutils.sysconfig import get_python_lib; print get_python_lib()")}
%define short_name pika
Name: python-%{short_name}
Version: 0.10.0
Release: 0%{?dist}
Summary: AMQP 0-9-1 client library for Python
Group: Development/Libraries
License: MPLv1.1 or GPLv2
URL: http://github.com/%{short_name}/%{short_name}
# The tarball comes from here:
# http://github.com/%{short_name}/%{short_name}/tarball/v%{version}
# GitHub has layers of redirection and renames that make this a troublesome
# URL to include directly.
Source0: %{short_name}-%{version}.tar.gz
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
BuildArch: noarch
BuildRequires: python-setuptools
BuildRequires: python-devel
Requires: python-pyev
Requires: python-tornado
Requires: python-twisted
Requires: python >= 2.6.5

%description
Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that
tries to stay fairly independent of the underlying network support
library.

%prep
%setup -q -n %{short_name}-%{version}

%build
%{__python} setup.py build

%install
%{__rm} -rf %{buildroot}
%{__python} setup.py install -O1 --skip-build --root %{buildroot}
install -D -m 644 LICENSE %{buildroot}%{_docdir}/%{name}-%{version}
install -D -m 644 README.rst %{buildroot}%{_docdir}/%{name}-%{version}
install -D -m 644 PKG-INFO %{buildroot}%{_docdir}/%{name}-%{version}

%clean
%{__rm} -rf %{buildroot}

%files
%defattr(-,root,root,-)
%dir %{python_sitelib}/%{short_name}*
%{python_sitelib}/%{short_name}*/*
%doc README.rst
%doc LICENSE
%doc PKG-INFO

%changelog
* Sat Aug 20 2016 gcy - 0.10.0
- update version 0.10.0
* Tue Dec 13 2011 Daniel Aharon - 0.9.5-2
- Patch pika/adapters/blocking_connection.py
* Sun Apr 3 2011 Ilia Cheishvili - 0.9.5-1
- Upgrade to version 0.9.5
* Sun Mar 6 2011 Ilia Cheishvili - 0.9.4-1
- Upgrade to version 0.9.4
* Sat Feb 19 2011 Ilia Cheishvili - 0.9.3-1
- Upgrade to version 0.9.3
* Sat Oct 2 2010 Ilia Cheishvili - 0.5.2-1
- Initial Package

Installing the server: rabbitmq-server
RabbitMQ configuration (getting hold of the RPM package is left to you):
cat /etc/rabbitmq/rabbitmq-env.conf
# location of the Mnesia data files
RABBITMQ_MNESIA_BASE=/data/rabbitmq_mnesia
# listen IP and port
RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
RABBITMQ_NODE_PORT=5673
If the settings above do not take effect, read the init script to see how it loads the configuration file.


rabbitmqctl add_vhost: add a vhost
rabbitmqctl add_user: add a user
(You will usually also want rabbitmqctl set_permissions to grant the user access to the vhost.) Once that is done the account is ready to use; a quick smoke test follows.
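A quick smoke test with pika (the user, password, vhost, and port below match the examples later in this article; adjust them to your own setup):

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# smoke test: can the new account reach its vhost?
import pika

credentials = pika.PlainCredentials('phptest', 'phptest')   # the user/password added above
parameters = pika.ConnectionParameters('127.0.0.1', 5673, 'phptest', credentials)
connection = pika.BlockingConnection(parameters)
print 'connect success'
connection.close()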



Next, how to actually use the message queue.

First, get a few important concepts straight:
1. What a message queue is, and what Producer, Exchange, Queue, Consumer, Topic, Fanout, and Routing key mean
http://www.tuicool.com/articles/ruEnUf

2. no_ack
http://my.oschina.net/moooofly/blog/143883

3. Flow control
http://www.tuicool.com/articles/RVR3eu   (the "Fair dispatch" section)
http://my.oschina.net/hncscwc/blog/195560
In short: by default RabbitMQ hands the n-th message to the n-th consumer (n taken modulo the number of consumers), i.e. plain round-robin. It does not care whether a consumer still has unacked messages; it just keeps dispatching in that fixed order.
So if one consumer ends up with the heavy jobs, some consumers sit around with nothing to do while others never get a break. How does RabbitMQ handle this?
By setting prefetch_count=1 via the basic.qos method. RabbitMQ will then let each consumer handle at most one message at a time.
In other words, it will not dispatch a new message to a consumer until it has received that consumer's ack. A small sketch of points 2 and 3 follows this list.
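A minimal sketch putting points 2 and 3 together, reusing the credentials, port, and queue name from the examples below (the callback body is made up): no_ack=False means the consumer must ack explicitly, and prefetch_count=1 stops RabbitMQ from pushing a second message before the first one is acked.

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    '127.0.0.1', 5673, 'phptest', pika.PlainCredentials('phptest', 'phptest')))
channel = connection.channel()
channel.queue_declare(queue='myqueue')

def handle(ch, method, properties, body):
    print 'working on %s' % body
    ch.basic_ack(method.delivery_tag)      # manual ack, required because no_ack=False

channel.basic_qos(prefetch_count=1)        # fair dispatch: at most 1 unacked message per consumer
channel.basic_consume(handle, queue='myqueue', no_ack=False)
channel.start_consuming()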

Now for some simple code, using pika directly.


#!/usr/bin/python
# -*- coding: UTF-8 -*-

import pika
from pika import PlainCredentials
from pika import ConnectionParameters


def main():
    user = 'phptest'
    passwd = 'phptest'
    vhost = 'phptest'
    ip = '127.0.0.1'
    port = 5673
    credentials = PlainCredentials(user, passwd)
    parameters = ConnectionParameters(ip, port, vhost, credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    print 'connect success'
    # declare a direct exchange (the default type), auto-deleted once its last binding is removed
    channel.exchange_declare(exchange='gcy2', auto_delete=True)
    #channel._delivery_confirmation = 1
    print 'start send data'
    channel.basic_publish(exchange='gcy2', routing_key='a', body='wtffffff1')
    channel.basic_publish(exchange='gcy2', routing_key='a', body='wtffffff2')
    print 'end'


if __name__ == '__main__':
    main()

The sender side is very simple.
The username and password are wrapped in a PlainCredentials object.
The connection parameters are wrapped in ConnectionParameters. A RabbitMQ vhost is roughly what a database is to a MySQL instance: a namespace used to isolate permission scopes from each other.
A BlockingConnection object is one RabbitMQ connection. If you need multiple connections, pika has a companion library called pika-pool; in OpenStack, the server side uses a single connection and only the sending side uses a connection pool (see the sketch below).
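For reference, a pooled-publisher sketch in the style of the pika-pool docs. Treat the exact names here (QueuedPool, create, max_size, the .channel attribute on the acquired object) as assumptions to verify against the pika-pool version you actually install:

import pika
import pika_pool

params = pika.ConnectionParameters('127.0.0.1', 5673, 'phptest',
                                   pika.PlainCredentials('phptest', 'phptest'))

# a pool that lazily creates up to max_size BlockingConnections and reuses them
pool = pika_pool.QueuedPool(
    create=lambda: pika.BlockingConnection(parameters=params),
    max_size=10,
)

with pool.acquire() as cxn:            # borrow a connection from the pool
    cxn.channel.basic_publish(exchange='gcy2', routing_key='a', body='pooled message')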
Don't let the name fool you into thinking BlockingConnection means your process just sits blocked on the socket: it is a high-level wrapper class that internally drives the lower-level pollers (select, poll, epoll, even event-based adapters). In practice you usually use BlockingConnection directly, and that is what OpenStack does (on Linux the socket I/O ends up going through epoll; I will go into detail in a later post on OpenStack's RPC communication). A SelectConnection sketch follows for contrast.
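If you want the explicitly asynchronous style instead, pika also exposes those lower-level adapters directly, e.g. SelectConnection. A minimal callback-driven sketch, reusing the same exchange and credentials as the sender above (my reading of pika 0.10's async adapter API):

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import pika

params = pika.ConnectionParameters('127.0.0.1', 5673, 'phptest',
                                    pika.PlainCredentials('phptest', 'phptest'))

def on_channel_open(channel):
    # publish one message, then shut the connection down
    channel.basic_publish(exchange='gcy2', routing_key='a', body='hello from SelectConnection')
    connection.close()

def on_connection_open(conn):
    conn.channel(on_open_callback=on_channel_open)

def on_connection_closed(conn, reply_code, reply_text):
    conn.ioloop.stop()

connection = pika.SelectConnection(params,
                                   on_open_callback=on_connection_open,
                                   on_close_callback=on_connection_closed)
connection.ioloop.start()     # this ioloop is the select/poll/epoll loop mentioned above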
Channels are the fiddly part. I stared at this for a long time trying to understand why you don't just use the connection directly, and instead wrap a channel layer on top of it. The rough answer: channels give you isolation without having to open multiple connections.
Data is actually read off the connection and then dispatched to the individual channels (a tiny sketch follows).
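A tiny sketch of that multiplexing: two independent channels sharing one TCP connection (the exchange names here are made up for illustration):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    '127.0.0.1', 5673, 'phptest', pika.PlainCredentials('phptest', 'phptest')))

# two channels, one TCP connection; each channel keeps its own state
channel_a = connection.channel()
channel_b = connection.channel()
channel_a.exchange_declare(exchange='app_a', auto_delete=True)
channel_b.exchange_declare(exchange='app_b', auto_delete=True)
channel_a.basic_publish(exchange='app_a', routing_key='x', body='from channel a')
channel_b.basic_publish(exchange='app_b', routing_key='y', body='from channel b')
connection.close()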
After the channel is created, the code declares an exchange named gcy2; the default exchange_type is direct, i.e. point-to-point routing on an exact routing-key match.
It then publishes data to the gcy2 exchange with routing_key 'a', which is the key the receiver binds on.
That is all there is to the sender.
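One note on the commented-out `_delivery_confirmation` line in the sender: the public pika API for publisher confirms is confirm_delivery(). My understanding of pika 0.10's BlockingChannel is that, once confirms are enabled, basic_publish returns whether the broker confirmed the message; a sketch:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    '127.0.0.1', 5673, 'phptest', pika.PlainCredentials('phptest', 'phptest')))
channel = connection.channel()
channel.exchange_declare(exchange='gcy2', auto_delete=True)

channel.confirm_delivery()    # enable publisher confirms on this channel
ok = channel.basic_publish(exchange='gcy2', routing_key='a', body='confirmed?')
print 'broker acked the message' if ok else 'broker nacked/returned the message'
connection.close()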


The receiver is a little more involved.


#!/usr/bin/python
# -*- coding: UTF-8 -*-

import pika
from pika import PlainCredentials
from pika import ConnectionParameters


def main():
    user = 'phptest'
    passwd = 'phptest'
    vhost = 'phptest'
    ip = '127.0.0.1'
    port = 5673
    credentials = PlainCredentials(user, passwd)
    parameters = ConnectionParameters(ip, port, vhost, credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    print 'connect success'
    channel.queue_declare(queue='myqueue')
    channel.queue_bind(queue='myqueue', exchange='gcy2', routing_key='a')
    get_list = []

    def callback(ch, method, properties, body):
        # called by pika for every delivered message; just buffer it
        print 'get body %s ' % body,
        get_list.append([method.delivery_tag, body])
        print method.consumer_tag
        #ch.basic_ack(method.delivery_tag)
        #ch.stop_consuming()

    channel.basic_qos(prefetch_count=5)
    # two consumers on the same queue: one auto-ack, one manual-ack
    tag1 = channel.basic_consume(callback, queue='myqueue', no_ack=True)
    tag2 = channel.basic_consume(callback, queue='myqueue', no_ack=False)
    print tag1
    print tag2

    def get_data():
        # drain buffered messages in batches of up to 5
        while 1:
            last_queue_size = len(get_list)
            if last_queue_size >= 5:
                ret = get_list[:5]
                del get_list[:5]
                return ret
            else:
                connection.process_data_events()
                if last_queue_size == len(get_list):
                    ret = get_list[:5]
                    del get_list[:5]
                    return ret

    while 1:
        ret = get_data()
        if ret:
            print 'fucked ',
            print ret[0][1]
            #channel.basic_ack(ret[0][0])
    print 'end'
    connection.process_data_events()
    connection.close()


if __name__ == '__main__':
    main()

The code above does not use the start_consuming() pattern you will find in most tutorials; instead it imitates the way OpenStack's pika driver is written (the conventional pattern is sketched below for comparison).
The code as written has problems; let's walk through it step by step first and then explain the mistakes.
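For comparison, the conventional start_consuming() version of the receiver, using the same queue, exchange, and credentials as above:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    '127.0.0.1', 5673, 'phptest', pika.PlainCredentials('phptest', 'phptest')))
channel = connection.channel()
channel.queue_declare(queue='myqueue')
channel.queue_bind(queue='myqueue', exchange='gcy2', routing_key='a')

def callback(ch, method, properties, body):
    print 'get body %s' % body
    ch.basic_ack(method.delivery_tag)          # manual ack, since no_ack=False

channel.basic_consume(callback, queue='myqueue', no_ack=False)
channel.start_consuming()                      # blocks and dispatches messages to callback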


