创建连接及RabbitMQ结构体实例代码见 https://www.cnblogs.com/prince5460/p/11895844.html

1.创建话题模式RabbitMQ实例

func NewRabbitMQTopic(exchangeName, routingKey string) *RabbitMQ {
    //创建RabbitMQ实例
    rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
    var err error
    //获取connection
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    rabbitmq.failOnErr(err, "Failed to connect rabbitmq!")
    //获取channel
    rabbitmq.channel, err = rabbitmq.conn.Channel()
    rabbitmq.failOnErr(err, "Failed to open a channel!")
    return rabbitmq
}

2.话题模式发送消息

func (r *RabbitMQ) PublishTopic(message string) {
    //1.尝试创建交换机
    err := r.channel.ExchangeDeclare(
        r.Exchange,
        //要改成topic
        "topic",
        true,
        false,
        false,
        false,
        nil,
    )
    r.failOnErr(err, "Failed to declare an exchange!")

    //2.发送消息
    err = r.channel.Publish(
        r.Exchange,
        //要设置
        r.key,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(message),
        })
}

3.话题模式接收消息

//要注意key规则
//其中"*"用于匹配一个单词,"#"用于匹配多个单词(可以是零个)
//匹配test.*表示匹配test.hello,但是test.hello.one需要用test.#才能匹配到
func (r *RabbitMQ) ReceiveTopic() {
    //1.试探性创建交换机
    err := r.channel.ExchangeDeclare(
        r.Exchange,
        //交换机类型
        "topic",
        true,
        false,
        //true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
        false,
        false,
        nil,
    )
    r.failOnErr(err, "Failed to declare an exchange!")

    //2.试探性创建队列,注意队列名称不要写
    q, err := r.channel.QueueDeclare(
        "", //随机生成队列名称
        false,
        false,
        true,
        false,
        nil,
    )
    r.failOnErr(err, "Failed to declare an exchange!")

    //3.绑定队列到exchange中
    err = r.channel.QueueBind(
        q.Name,
        //需要绑定key
        r.key,
        r.Exchange,
        false,
        nil,
    )

    //4.消费消息
    messages, err := r.channel.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )

    forever := make(chan bool)

    go func() {
        for d := range messages {
            log.Printf("Received a message :%s", d.Body)
        }
    }()

    fmt.Println("[*] Waiting for messages,To exit press CTRL+C")
    <-forever
}

4.测试代码

  • Publish
package main

import (
    "fmt"
    "go-rabbitmq/RabbitMQ"
    "strconv"
    "time"
)

func main() {
    testOne := RabbitMQ.NewRabbitMQTopic("exTestTopic", "test.topic.one")
    testTwo := RabbitMQ.NewRabbitMQTopic("exTestTopic", "test.topic.two")
    for i := 0; i <= 10; i++ {
        testOne.PublishTopic("Hello test topic one:" + strconv.Itoa(i))
        testTwo.PublishTopic("Hello test topic two:" + strconv.Itoa(i))
        time.Sleep(time.Second)
        fmt.Println("publish:", i)
    }
}
  • ReceiveAll
package main

import "go-rabbitmq/RabbitMQ"

func main() {
    testOne := RabbitMQ.NewRabbitMQTopic("exTestTopic","#")
    testOne.ReceiveTopic()
}
  • ReveiveOne
package main

import "go-rabbitmq/RabbitMQ"

func main() {
    testOne := RabbitMQ.NewRabbitMQTopic("exTestTopic", "test.*.one")
    testOne.ReceiveTopic()
}
12-30 15:22