创建连接及RabbitMQ结构体实例代码见 https://www.cnblogs.com/prince5460/p/11895844.html

1.创建路由模式RabbitMQ实例

func NewRabbitMQRouting(exchangeName, routingKey string) *RabbitMQ {
    //创建RabbitMQ实例
    rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
    var err error
    //获取connection
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    rabbitmq.failOnErr(err, "Failed to connect rabbitmq!")
    //获取channel
    rabbitmq.channel, err = rabbitmq.conn.Channel()
    rabbitmq.failOnErr(err, "Failed to open a channel!")
    return rabbitmq

}

2.路由模式发送消息

func (r *RabbitMQ) PublishRouting(message string) {
    //1.尝试创建交换机
    err := r.channel.ExchangeDeclare(
        r.Exchange,
        //要改成direct
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )
    r.failOnErr(err, "Failed to declare an exchange!")

    //2.发送消息
    err = r.channel.Publish(
        r.Exchange,
        //要设置
        r.key,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(message),
        })
}

3.路由模式接收消息

func (r *RabbitMQ) ReceiveRouting() {
    //1.试探性创建交换机
    err := r.channel.ExchangeDeclare(
        r.Exchange,
        //交换机类型
        "direct",
        true,
        false,
        //true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
        false,
        false,
        nil,
    )
    r.failOnErr(err, "Failed to declare an exchange!")

    //2.试探性创建队列,注意队列名称不要写
    q, err := r.channel.QueueDeclare(
        "", //随机生成队列名称
        false,
        false,
        true,
        false,
        nil,
    )
    r.failOnErr(err, "Failed to declare an exchange!")

    //3.绑定队列到exchange中
    err = r.channel.QueueBind(
        q.Name,
        //需要绑定key
        r.key,
        r.Exchange,
        false,
        nil,
    )

    //4.消费消息
    messages, err := r.channel.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )

    forever := make(chan bool)

    go func() {
        for d := range messages {
            log.Printf("Received a message :%s", d.Body)
        }
    }()

    fmt.Println("[*] Waiting for messages,To exit press CTRL+C")
    <-forever
}

4.测试代码

  • Publish
package main

import (
    "fmt"
    "go-rabbitmq/RabbitMQ"
    "strconv"
    "time"
)

func main() {
    testOne := RabbitMQ.NewRabbitMQRouting("exTest", "test_one")
    testTwo := RabbitMQ.NewRabbitMQRouting("exTest", "test_two")
    for i := 0; i <= 10; i++ {
        testOne.PublishRouting("Hello test one:" + strconv.Itoa(i))
        testTwo.PublishRouting("Hello test two:" + strconv.Itoa(i))
        time.Sleep(time.Second)
        fmt.Println("publish:",i)
    }
}
  • Receive1
package main

import "go-rabbitmq/RabbitMQ"

func main() {
    testOne := RabbitMQ.NewRabbitMQRouting("exTest","test_one")
    testOne.ReceiveRouting()
}
  • Receive2
package main

import "go-rabbitmq/RabbitMQ"

func main() {
    testTwo := RabbitMQ.NewRabbitMQRouting("exTest","test_two")
    testTwo.ReceiveRouting()
}
01-01 03:03