简介
消息与消息队列
消息(Message)是指应用于应用之间传送的数据,消息的类型包括文本字符串、JSON、XML、内嵌对象等等...
所谓 消息中间件 / 消息队列(Message Queue Middleware,简称MQ)是利用高效可靠的消息传递机制进行数据交流,同时可以基于数据通信来进行分布式系统的继承,消息中间件一般有两种传递模式:点对点(Point-to-Point)模式和发布/订阅(Pub/Sub)模式,点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输成为了可能,发布订阅模式定义了如何向一个内容节点发布和订阅内容,这个内容节点叫topic,这种模式可以满足消费者发布一个消息,多个消费者同时消费同一信息的需求。
性能对比
了解什么是AMQP协议?
其中:Exchange和Message Queue存在着绑定的关系。
AMQP核心概念
- Server:又称Broker,接受客户端的连接,实现AMQP实体服务;
- Connection:连接,应用于程序与Broker的网络连接;
- Channel:网络通道,几乎所有的操作都是在channel中进行的,channel是进行消息读写的通道,客户可以建立多个channel,每个channel代表一个会话任务;
- Message:消息,服务器与应用程序之间传送的数据,有Properties和Body组成,Properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性,Body是消息体内容;
- Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个Virtrual host里面可以有若干个Exchange和Queue,用一个Virtrual host里面不能有相同名字的Exchange或Queue;
- Exchange:交换机,接收消息,很久路由键转发消息到绑定的队列;
- Exchange和Queue之间的虚拟连接,binding中可以包含touting key;
- Routing key:一个路由规则,虚拟机可用它来确定一个如何路由一个特定的消息;
- Queue:也成Message Queue,消息队列,保存消息并将他们转发给消费者;
消息中间的作用和使用场景
RabbitMQ的核心组件
Hello World
“我会Hello World!”,学习一门技术,先出hello world开始,我们来编写一个Java项目来使用RabbitMQ来实现消息的生产和消费,这样能让我们能够更好的理解RabbitMQ的作用和原理,RabbitMQ是消息代理,它负责接收并转发消息,我们可以将它视为邮局,将要发送的邮件都放在又想着,可以确保Mailperson现在活或者女生最终将邮件传递给别人,因此:RabbitMQ是一个邮箱,一个邮局和一个邮递员的实例。
- 生产意味着发送,发送消息的程序是生产者
- 队列就是RabbitMQ内部的邮箱名称,消息是存储在队列中的,尽管消息流经RabbitMQ和你的应用程序,生产者可以发送一个队列信息,许多消费者可以尝试从一个队列里接收数据
- 消费与接受者是同一个身份,一个消费者是一个程序,主要是等待接收信息
pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
我们称其为消息发布者(发送者)MessageSend和我们的消息消费者(接收者) MessageRecv。发布者将连接到RabbitMQ,发送一条消息,然后退出。
MessageSend.java
/**
* @Author 林必昭
* @Date 2019/11/29 22:12
* @descr
*/
public class MassageSend {
private final static String QUEUE_NAME = "myQueue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//连接本地计算机代理,也就是本地主机,如果我们想连到另一台计算机,这里只需要指定IP地址
factory.setHost("localhost");
try(Connection conn = factory.newConnection();
//创建一个通道,完成我们的业务逻辑
Channel channel = conn.createChannel()){
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "Hello World!";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [xxx] 发送消息: '" + message + "'");
}
}
}
MessageRecv.java
public class MessageRecv {
private final static String QUEUE_NAME = "myQueue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("消费者正在等待消息,退出请按CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [xxx] 接收到了: '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}