1发布订阅模式
发送者
package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class PublishProduct {
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQ
factory.setHost("192.168.74.75");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
// 创建一个通道
channel = connection.createChannel();
//创建交换机
channel.exchangeDeclare("qy172-fanout-exchange", BuiltinExchangeType.FANOUT, true);
//创建队列,如果存在则不会创建
channel.queueDeclare("qy172-publish-queue01", true, false, false, null);
channel.queueDeclare("qy172-publish-queue02", true, false, false, null);
//交互机和队列绑定
channel.queueBind("qy172-publish-queue01", "qy172-fanout-exchange", "");
channel.queueBind("qy172-publish-queue02", "qy172-fanout-exchange", "");
// 创建消息内容
HashMap<String, Object> map = new HashMap<>();
map.put("name", "张三");
map.put("age", "22");
//把数据给交换机,让他分发给队列
channel.basicPublish("qy172-fanout-exchange", "", null, JSON.toJSONBytes(map));
System.out.println("发送成功");
} catch (IOException e) {
// 发生 IO 异常时抛出运行时异常
throw new RuntimeException(e);
} catch (TimeoutException e) {
// 发生超时异常时抛出运行时异常
throw new RuntimeException(e);
} finally {
if (channel != null) {
try {
// 关闭通道
channel.close();
} catch (IOException | TimeoutException e) {
// 发生 IO 或超时异常时抛出运行时异常
throw new RuntimeException(e);
}
}
if (connection != null) {
try {
// 关闭连接
connection.close();
} catch (IOException e) {
// 发生 IO 异常时抛出运行时异常
throw new RuntimeException(e);
}
}
}
}
}
2订阅个订阅者
订阅者1
package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {
public static void main(String[] args) throws Exception {
// 创建连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
factory.setHost("192.168.74.75");
Connection connection = factory.newConnection();
// 创建一个 RabbitMQ 连接
Channel channel = connection.createChannel();
// 创建一个通道,用于与 RabbitMQ 之间的通信
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
// 创建一个消费者对象,并重写其方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消费消息的处理方法
String json = new String(body);
// 将消息内容转换为字符串
Map map = JSON.parseObject(json, Map.class);
// 使用 JSON 解析成 Map 对象
System.out.println("消息内容Consumer01"+map);
// 输出消息内容
}
};
channel.basicConsume("qy172-publish-queue01",true,consumer);
}
}
订阅者2
package com.aaa;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer02 {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.74.75");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String json = new String(body);
Map map = JSON.parseObject(json, Map.class);
System.out.println("消息内容Consumer02" + map);
}
};
//订阅者2
channel.basicConsume("qy172-publish-queue02",true,consumer);
} catch (IOException | TimeoutException e) {
// 处理连接、通道创建或消费消息时可能抛出的异常
e.printStackTrace();
}
}
}
2路由模式
发送者
package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class PublishProduct {
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQ
factory.setHost("192.168.74.75");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
// 创建一个通道
channel = connection.createChannel();
//创建交换机,
channel.exchangeDeclare("qy172-router-exchange", BuiltinExchangeType.DIRECT, true);
//创建队列,如果存在则不会创建
channel.queueDeclare("qy172-router-queue01", true, false, false, null);
channel.queueDeclare("qy172-router-queue02", true, false, false, null);
//交互机和队列绑定
channel.queueBind("qy172-router-queue01", "qy172-router-exchange", "error");
channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "error");
channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "info");
channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "warning");
// 创建消息内容
HashMap<String, Object> map = new HashMap<>();
map.put("name", "张三");
map.put("age", "22");
//把数据给交换机,让他分发给队列
channel.basicPublish("qy172-router-exchange","error",null,JSON.toJSONBytes(map));
// channel.basicPublish("qy172-router-exchange","info",null,JSON.toJSONBytes(map));
System.out.println("发送成功");
} catch (IOException e) {
// 发生 IO 异常时抛出运行时异常
throw new RuntimeException(e);
} catch (TimeoutException e) {
// 发生超时异常时抛出运行时异常
throw new RuntimeException(e);
} finally {
if (channel != null) {
try {
// 关闭通道
channel.close();
} catch (IOException | TimeoutException e) {
// 发生 IO 或超时异常时抛出运行时异常
throw new RuntimeException(e);
}
}
if (connection != null) {
try {
// 关闭连接
connection.close();
} catch (IOException e) {
// 发生 IO 异常时抛出运行时异常
throw new RuntimeException(e);
}
}
}
}
}
接收者1
package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {
public static void main(String[] args) throws Exception {
// 创建连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
factory.setHost("192.168.74.75");
Connection connection = factory.newConnection();
// 创建一个 RabbitMQ 连接
Channel channel = connection.createChannel();
// 创建一个通道,用于与 RabbitMQ 之间的通信
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
// 创建一个消费者对象,并重写其方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消费消息的处理方法
String json = new String(body);
// 将消息内容转换为字符串
Map map = JSON.parseObject(json, Map.class);
// 使用 JSON 解析成 Map 对象
System.out.println("消息内容Consumer01"+map);
// 输出消息内容
}
};
channel.basicConsume("qy172-router-queue01",true,consumer);
}
}
接收者2
package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {
public static void main(String[] args) throws Exception {
// 创建连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
factory.setHost("192.168.74.75");
Connection connection = factory.newConnection();
// 创建一个 RabbitMQ 连接
Channel channel = connection.createChannel();
// 创建一个通道,用于与 RabbitMQ 之间的通信
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
// 创建一个消费者对象,并重写其方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消费消息的处理方法
String json = new String(body);
// 将消息内容转换为字符串
Map map = JSON.parseObject(json, Map.class);
// 使用 JSON 解析成 Map 对象
System.out.println("消息内容Consumer01"+map);
// 输出消息内容
}
};
channel.basicConsume("qy172-router-queue01",true,consumer);
}
}
3主题模式
发送者
package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class PublishProduct {
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQ
factory.setHost("192.168.74.75");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
// 创建一个通道
channel = connection.createChannel();
//创建交换机,
channel.exchangeDeclare("qy172-topic-exchange", BuiltinExchangeType.TOPIC, true);
//创建队列,如果存在则不会创建
channel.queueDeclare("qy172-topic-queue01", true, false, false, null);
channel.queueDeclare("qy172-topic-queue02", true, false, false, null);
//交互机和队列绑定
//主题匹配给这个
channel.queueBind("qy172-topic-queue01", "qy172-topic-exchange", "*.orange.*");
//主题,也匹配给这个
channel.queueBind("qy172-topic-queue02", "qy172-topic-exchange", "*.*.rabbit");
channel.queueBind("qy172-topic-queue02", "qy172-topic-exchange", "lazy.#");
// 创建消息内容
HashMap<String, Object> map = new HashMap<>();
map.put("name", "张三");
map.put("age", "22");
//把数据给交换机,让他分发给队列
channel.basicPublish("qy172-topic-exchange","lazy.orange.rabbit",null,JSON.toJSONBytes(map));
System.out.println("发送成功");
} catch (IOException e) {
// 发生 IO 异常时抛出运行时异常
throw new RuntimeException(e);
} catch (TimeoutException e) {
// 发生超时异常时抛出运行时异常
throw new RuntimeException(e);
} finally {
if (channel != null) {
try {
// 关闭通道
channel.close();
} catch (IOException | TimeoutException e) {
// 发生 IO 或超时异常时抛出运行时异常
throw new RuntimeException(e);
}
}
if (connection != null) {
try {
// 关闭连接
connection.close();
} catch (IOException e) {
// 发生 IO 异常时抛出运行时异常
throw new RuntimeException(e);
}
}
}
}
}
接收者1
package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {
public static void main(String[] args) throws Exception {
// 创建连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
factory.setHost("192.168.74.75");
Connection connection = factory.newConnection();
// 创建一个 RabbitMQ 连接
Channel channel = connection.createChannel();
// 创建一个通道,用于与 RabbitMQ 之间的通信
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
// 创建一个消费者对象,并重写其方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消费消息的处理方法
String json = new String(body);
// 将消息内容转换为字符串
Map map = JSON.parseObject(json, Map.class);
// 使用 JSON 解析成 Map 对象
System.out.println("消息内容Consumer01"+map);
// 输出消息内容
}
};
channel.basicConsume("qy172-topic-queue01",true,consumer);
}
}
接收者2
package com.aaa;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer02 {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.74.75");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String json = new String(body);
Map map = JSON.parseObject(json, Map.class);
System.out.println("消息内容Consumer02" + map);
}
};
//订阅者2
channel.basicConsume("qy172-topic-queue02",true,consumer);
} catch (IOException | TimeoutException e) {
// 处理连接、通道创建或消费消息时可能抛出的异常
e.printStackTrace();
}
}
}