<!-- TOC -->
<!-- /TOC -->
代码
#!/usr/bin/python
# -*- coding:utf-8 -*-
import pika
import hashlib
import json
def getMd5(input_str):
    """Return the hexadecimal MD5 digest of *input_str*.

    Text is encoded as UTF-8 before hashing; ``bytes``/``bytearray`` input is
    hashed as-is, so an already-encoded message body can be passed directly.

    :param input_str: the data to hash
    :type input_str: str | bytes | bytearray
    :rtype: str
    """
    if isinstance(input_str, (bytes, bytearray)):
        data = input_str
    else:
        data = input_str.encode("utf-8")
    return hashlib.md5(data).hexdigest()
class RabbitMQClient:
    """Convenience wrapper around a pika ``BlockingConnection`` and one channel.

    Defaults: exchange type is 'topic', routing key is '#', dead letter
    exchange is 'DLX' and dead letter queue is 'DLQ'.
    """

    __default_exchange_type = "topic"
    # In topic bindings "#" (hash) can substitute for zero or more words and
    # "*" (star) can substitute for exactly one word.
    __default_routing_key = "#"
    __default_DeadLetterExchange = "DLX"
    __default_DeadLetterQueue = "DLQ"

    def __init__(self, username, password, host, port=5672):
        """Open a blocking connection to the broker and create a channel.

        :param str username: broker user name
        :param str password: broker password
        :param host: broker host, converted with ``str()``
        :param port: broker port, converted with ``int()``
        """
        self.host = str(host)
        self.port = int(port)
        # heartbeat=0 deactivates heartbeats for this connection.
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=self.host,
                port=self.port,
                credentials=pika.PlainCredentials(username, password),
                heartbeat=0,
            )
        )
        self.channel = self.connection.channel()

    #
    # basic operations
    #
    def close_connection(self):
        """Close the underlying connection."""
        self.connection.close()

    def declare_exchange(self, exchange, exchange_type=__default_exchange_type):
        """Declare a durable exchange (type defaults to 'topic')."""
        self.channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=True)

    def delete_exchange(self, exchange):
        """Delete the named exchange."""
        self.channel.exchange_delete(exchange=exchange)

    def declare_queue(self, queue):
        """Declare a durable queue with no extra arguments."""
        self.channel.queue_declare(queue=queue, durable=True)

    def declare_queue_dlx(self, queue, dlx=__default_DeadLetterExchange):
        """Declare a durable queue whose rejected messages go to *dlx*.

        :param str queue: queue name
        :param str dlx: name of the dead letter *exchange* (default 'DLX')
        """
        # 'x-dead-letter-exchange' must name an exchange, so the default is
        # the dead letter exchange, not the dead letter queue.
        self.channel.queue_declare(queue=queue, durable=True, arguments={'x-dead-letter-exchange': dlx})

    def declare_queue_ttl(self, queue, ttl_seconds):
        """Declare a durable queue whose messages expire after *ttl_seconds*.

        :param str queue: queue name
        :param ttl_seconds: message time-to-live, in seconds
        """
        # RabbitMQ interprets 'x-message-ttl' in milliseconds.
        ttl_ms = int(ttl_seconds * 1000)
        self.channel.queue_declare(queue=queue, durable=True, arguments={'x-message-ttl': ttl_ms})

    def delete_queue(self, queue):
        """Delete the named queue."""
        self.channel.queue_delete(queue=queue)

    def bind_exchange_queue(self, queue, exchange, binding_key=__default_routing_key):
        """Bind *queue* to *exchange* using *binding_key* (default '#')."""
        self.channel.queue_bind(queue=queue, exchange=exchange, routing_key=binding_key)

    #
    # combined operations
    #
    def declare_dlx_dlq(self, dlx=__default_DeadLetterExchange, dlq=__default_DeadLetterQueue):
        """Declare a fanout dead letter exchange, its queue, and bind them.

        :param str dlx: dead letter exchange
        :param str dlq: dead letter queue
        """
        self.declare_exchange(exchange=dlx, exchange_type='fanout')
        self.declare_queue(queue=dlq)
        self.bind_exchange_queue(exchange=dlx, queue=dlq)

    def publish(self, message, exchange, queue, routing_key, message_id=None,
                close_connection=True):
        """Publish *message* with a message_id and the persistent delivery mode.

        *queue* is declared (durable) before publishing. When *message_id* is
        None the MD5 hex digest of the message is used instead.

        :param message: message body
        :param str exchange: exchange to publish to
        :param str queue: queue to declare before publishing
        :param str routing_key: routing key for the message
        :param message_id: optional explicit message id
        :param bool close_connection: close the connection after publishing
            (default True); pass False to keep using this client afterwards
        """
        if message_id is None:
            message_id = getMd5(input_str=message)
        self.declare_queue(queue=queue)
        self.channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2, message_id=message_id, content_type="application/json"
            ),
        )
        if close_connection:
            self.close_connection()

    def consume(self, callback, queue, dlx=__default_DeadLetterExchange, dlq=__default_DeadLetterQueue,
                exclusive=False, consumer_tag=None, **kwargs):
        """Consume *queue* with *callback* until consuming stops.

        The dead letter exchange and queue are declared first. Ctrl-C stops
        consuming. The connection is closed when consuming ends, including
        when ``start_consuming`` raises.

        :param callback: called as ``callback(channel, method, properties, body)``
        :param str queue: queue to consume from
        :param str dlx: dead letter exchange to declare
        :param str dlq: dead letter queue to declare
        :param bool exclusive: passed through to ``basic_consume``
        :param consumer_tag: passed through to ``basic_consume``
        :param kwargs: extra keyword arguments for ``basic_consume``
        """
        self.declare_dlx_dlq(dlx=dlx, dlq=dlq)
        self.channel.basic_consume(queue=queue, on_message_callback=callback, exclusive=exclusive,
                                   consumer_tag=consumer_tag, **kwargs)
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.channel.stop_consuming()
        finally:
            # Always release the connection; skip close() if the broker or an
            # earlier error already closed it, so the original error surfaces.
            if self.connection.is_open:
                self.close_connection()

    @staticmethod
    def ack_message(channel, method):
        """Acknowledge the delivery described by *method* on *channel*."""
        channel.basic_ack(delivery_tag=method.delivery_tag)

    @staticmethod
    def reject_to_dlx(channel, method):
        """Reject the delivery without requeueing it.

        The queue the message is being consumed from needs the dead letter
        exchange property for the rejected message to be routed to a DLX.
        """
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

    @staticmethod
    def transmit(channel, method, properties, message, exchange=__default_DeadLetterExchange,
                 routing_key=__default_routing_key, queue=__default_DeadLetterQueue, handler=None):
        """Republish *message* to *exchange*, then acknowledge the original.

        :param channel: channel the message was delivered on
        :param method: delivery method frame (supplies ``delivery_tag``)
        :param properties: original message properties (supplies ``message_id``)
        :param message: message body to forward
        :param str exchange: target exchange (default 'DLX')
        :param str routing_key: routing key for the forwarded message
        :param str queue: accepted for interface compatibility; not used here
        :param handler: optional callable applied to *message* before forwarding
        """
        if handler is not None:
            message = handler(message)
        message_id = properties.message_id
        if message_id is None:
            message_id = getMd5(input_str=message)
        channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2, message_id=message_id, content_type="application/json"
            ),
        )
        channel.basic_ack(delivery_tag=method.delivery_tag)
#
### Testing
#
def callback(ch, method, properties, body):
    """Example consumer callback: print delivery details, then ack.

    :param ch: channel the message was delivered on
    :param method: delivery method frame (consumer_tag, routing_key, delivery_tag)
    :param properties: message properties (message_id is printed)
    :param body: message body (not used by this example)
    """
    details = (method.consumer_tag, method.routing_key, properties.message_id)
    print("consumer_tag %r, consume_func %r, %r" % details)
    # Alternative to a plain ack: forward the (transformed) message instead.
    # RabbitMQClient.transmit(channel=ch, method=method, properties=properties, message=str(body, 'utf-8'), handler=handler)
    RabbitMQClient.ack_message(channel=ch, method=method)
def handler(input_str):
    """Example message handler: return *input_str* with a fixed prefix added."""
    prefix = "hadled"
    return prefix + input_str
if __name__ == "__main__":
    # Demo entry point: connect to the broker with placeholder credentials.
    # Uncomment one of the calls below to publish to or consume from the queue.
    mqc = RabbitMQClient(
        username='xxx',
        password='xxx',
        host='xxx',
        port=5672,
    )
    payload = {'a': 'aaa'}
    msg = json.dumps(payload)
    queue = "DLQ"
    # mqc.publish(message=msg, exchange='', routing_key=queue, queue=queue)
    # mqc.consume(callback=callback, queue=queue, consumer_tag='consumer-1')
    print("==done==")
参考
注意
-
Connection
- A real TCP connection to the message broker. It is designed to be long-lived.
- 设置connection属性heartbeat=0, deactivate heartbeat,这样连接就不会超时,一直保持。
-
Channel
- A virtual connection (AMQP connection) inside a Connection, designed to be transient.
- 一个Channel下设置一个consumer
- Channel instances must not be shared between threads. Channels are not generally thread-safe as it would make no sense to share them among threads. If you have another thread that needs to use the broker, a new channel is needed.
-
Exchange, Routing_key, Queue
- Exchange --- Routing_key ---> Queue
- topic形式的Exchange几乎可以模拟其他的所有模式。
-
- `*` (star) can substitute for exactly one word. `#` (hash) can substitute for zero or more words.
- When special characters "*" (star) and "#" (hash) aren't used in bindings, the topic exchange will behave just like a direct one => direct模式
- When a queue is bound with "#" (hash) binding key - it will receive all the messages, regardless of the routing key => like in fanout exchange
-
- Queue 可以设置TTL(Time To Live)属性,消息在队列中超过设定时间后会被自动移除;注意 x-message-ttl 的单位为毫秒(以秒计的时间需乘以 1000)。
queue_declare(queue=queue, durable=True, arguments={'x-message-ttl': ttl_seconds})
-
Publisher
- 发布、消费消息之前都需要声明所需的Exchange Queue。
- 推荐每个消息都设置其 message_id 属性,方便后续业务追踪、打点等。
-
Consumer
-
多消费者 Round-Robin(RR)公平调度消费
- Each Consumer runs in its own thread allocated from the consumer thread pool. If multiple Consumers are subscribed to the same Queue (in different channels), the broker uses round-robin to distribute the messages between them equally.
-
对于消费者,最好在消费消息前指定死亡信件交换器和死信队列(DLX:dead letter exchange, DLQ:dead letter queue)。在消费过程中,不能够处理或处理出现异常的消息可以转发至DLX和DLQ。在转发前也可以对消息进行特定的处理和包装。(如果声明队列的时候指定了DLX属性,如
arguments={'x-dead-letter-exchange': dlx}
, 消费者在消费时可以直接reject消息,被拒绝的消息会直接到DLX, 这样的好处是不用自己写转发逻辑,缺点是不够灵活,不能够对消息进行处理和包装。)
-
<全文完>