<!-- TOC -->

<!-- /TOC -->

代码

#!/usr/bin/python
# -*- coding:utf-8 -*-
import pika
import hashlib
import json

def getMd5(input_str):
    """
    :param str input_str: Unicode-objects must be encoded before hashing
    :rtype: str
    """
    hash_obj = hashlib.md5(input_str.encode("utf-8"))
    return hash_obj.hexdigest()

class RabbitMQClient:
    """RabbitMQClient using pika library

    default: exchange type is 'topic', routing key is '#', dead letter exchange is 'DLX' and dead letter queue is 'DLQ'.
    """
    __default_exchange_type = "topic"
    # (hash) can substitute for zero or more words, * (star) can substitute for exactly one word.
    __default_routing_key = "#"
    __default_DeadLetterExchange = "DLX"
    __default_DeadLetterQueue = "DLQ"

    def __init__(self, username, password, host, port=5672):
        self.host = str(host)
        self.port = int(port)
        # set heartbeat=0, deactivate heartbeat default
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host,
			port=self.port, credentials=pika.PlainCredentials(username,password), heartbeat=0))
        self.channel = self.connection.channel()

    #
    # basic operations
    #

    def close_connection(self):
        self.connection.close()

    def declare_exchange(self, exchange, exchange_type=__default_exchange_type):
        self.channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=True)

    def delete_exchange(self, exchange):
        self.channel.exchange_delete(exchange=exchange)

    def declare_queue(self, queue):
        self.channel.queue_declare(queue=queue, durable=True)

    def declare_queue_dlx(self, queue, dlx=__default_DeadLetterQueue):
        self.channel.queue_declare(queue=queue, durable=True, arguments={'x-dead-letter-exchange': dlx})

    def declare_queue_ttl(self, queue, ttl_seconds):
        self.channel.queue_declare(queue=queue, durable=True, arguments={'x-message-ttl': ttl_seconds})

    def delete_queue(self, queue):
        self.channel.queue_delete(queue=queue)

    def bind_exchange_queue(self, queue, exchange, binding_key=__default_routing_key):
        self.channel.queue_bind(queue=queue, exchange=exchange, routing_key=binding_key)

    #
    # combined operations
    #

    def declare_dlx_dlq(self, dlx=__default_DeadLetterExchange, dlq=__default_DeadLetterQueue):
        """
        :param str dlx: dead letter exchange
        :param str dlq: dead letter queue
        """

        self.declare_exchange(exchange=dlx, exchange_type='fanout')
        self.declare_queue(queue=dlq)
        self.bind_exchange_queue(exchange=dlx, queue=dlq)

    def publish(self, message, exchange, queue, routing_key, message_id=None,
        close_connection=True):
        """
        publish messages with message_id, disk persistency property
        """

        if message_id is None:
            message_id = getMd5(input_str=message)
        self.declare_queue(queue=queue)
        self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message,
            properties=pika.BasicProperties(delivery_mode=2,message_id=message_id,content_type="application/json"))
        if close_connection:
            self.close_connection()

    def consume(self, callback, queue, dlx=__default_DeadLetterExchange, dlq=__default_DeadLetterQueue,
        exclusive=False, consumer_tag=None,**kwargs):
        self.declare_dlx_dlq(dlx=dlx, dlq=dlq)
        self.channel.basic_consume(queue=queue, on_message_callback=callback, exclusive=exclusive,
            consumer_tag=consumer_tag,**kwargs)
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.channel.stop_consuming()
            self.close_connection()


    @staticmethod
    def ack_message(channel, method):
        channel.basic_ack(delivery_tag=method.delivery_tag)

    @staticmethod
    def reject_to_dlx(channel, method):
        """
        need the queue from which message is consuming has dead letter exchage property
        """
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

    @staticmethod
    def transmit(channel, method, properties, message, exchange=__default_DeadLetterExchange,
        routing_key=__default_routing_key, queue=__default_DeadLetterQueue,handler=None):
        if handler is not None:
            message = handler(message)
        message_id = properties.message_id
        if message_id is None:
            message_id = getMd5(input_str=message)
        channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message,
            properties=pika.BasicProperties(delivery_mode=2,message_id=message_id,content_type="application/json"))
        channel.basic_ack(delivery_tag=method.delivery_tag)
#
### Testing
#
def callback(ch, method, properties, body):
    print("consumer_tag %r, consume_func %r, %r" % (method.consumer_tag, method.routing_key, properties.message_id))
    # RabbitMQClient.transmit(channel=ch, method=method, properties=properties, message=str(body, 'utf-8'), handler=handler)
    RabbitMQClient.ack_message(channel=ch, method=method)

def handler(input_str):
    return "hadled"+input_str

if __name__ == "__main__":
    mqc = RabbitMQClient(username='xxx',password='xxx',host='xxx',port=5672)
    msg = json.dumps({'a':'aaa'})
    queue = "DLQ"
    # mqc.publish(message=msg, exchange='', routing_key=queue, queue=queue)
    # mqc.consume(callback=callback, queue=queue, consumer_tag='consumer-1')
    print("==done==")

参考

注意

  • Connection

    • a real TCP connection to the message broker,It is designed to be long-lived。
    • 设置connection属性heartbeat=0, deactivate heartbeat,这样连接就不会超时,一直保持。
  • Channel

    • a virtual connection (AMPQ connection) inside a Connection,designed to be transient。
    • 一个Channel下设置一个consumer
      • Channel instances must not be shared between threads. Channels are not generally thread-safe as it would make no sense to share them among threads. If you have another thread that needs to use the broker, a new channel is needed.
  • Exchange, Routing_key, Queue

    • Exchange --- Routing_key ---> Queue
    • topic形式的Exchange几乎可以模拟其他的所有模式。
        • (star) can substitute for exactly one word. # (hash) can substitute for zero or more words.
      • When special characters "*" (star) and "#" (hash) aren't used in bindings, the topic exchange will behave just like a direct one => direct模式
      • When a queue is bound with "#" (hash) binding key - it will receive all the messages, regardless of the routing key => like in fanout exchange
    • Queue 可以设置TTL(Time To Live)属性,设置一定时间后,消息自动移除队列,单位为妙。
      • queue_declare(queue=queue, durable=True, arguments={'x-message-ttl': ttl_seconds})
  • Publisher

    • 发布、消费消息之前都需要声明所需的Exchange Queue。
    • 推荐每个消息都设置其 message_id 属性,方便后续业务追踪、打点等。
  • Consumer

    • 多消费者 Round-Robin(RR)公平调度消费

      • Each Consumer runs in its own thread allocated from the consumer thread pool. If multiple Consumers are subscribed to the same Queue(In different channnels), the broker uses round-robin to distribute the messages between them equally.
    • 对应消费者,最好在消费消息前指定死亡信件交换器和死信队列(DLX:dead letter exchange, DLQ:dead letter queue)。在消费过程中,不能够处理或处理出现异常的消息可以转发至DLX和DLQ。在转发前也可以对消息进行特定的处理和包装。(如果声明队列的时候指定了DLX属性,如arguments={'x-dead-letter-exchange': dlx}, 消费者在消费时可以直接reject消息,被拒绝的消息会直接到DLX, 这样的好处是不用自己写转发逻辑,缺点是不够灵活,不能够对消息进行处理和包装。)

<全文完>

03-13 21:42