1.安装
a.安装Erlang:在官网 http://www.erlang.org/downloads 下载 OTP 22.1 Windows 64-bit Binary File,并安装。
b.配置环境变量:新增环境变量 ERLANG_HOME,值为 Erlang 的安装目录(例如 D:\Tools\erl10.5),然后在 Path 中添加 %ERLANG_HOME%\bin。
c.在CMD中执行 erl,查看是否安装成功。
d.安装 RabbitMQ:在官网 https://www.rabbitmq.com/download.html 下载 Windows installer,并安装。
e.安装 RabbitMQ-Plugins 管理插件:在CMD中进入RabbitMQ的 sbin 目录,执行命令 rabbitmq-plugins enable rabbitmq_management。
f.启动 RabbitMQ:在 sbin 目录中双击 rabbitmq-server.bat,或者在CMD中进入 sbin 目录后执行 net start RabbitMQ。
g.在浏览器中访问 localhost:15672 即可进入 RabbitMQ 的管理界面(用户名:guest,密码:guest)。
2.简单消息队列
a.创建连接工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Hands out a single cached RabbitMQ {@link Connection} to the local broker.
 *
 * <p>The connection is created lazily on the first call and re-created when
 * the cached one has been closed. No synchronization is performed here.
 */
public class ConnectionUtil {

    private static final String RABBIT_HOST = "localhost";
    private static final String RABBIT_USERNAME = "guest";
    private static final String RABBIT_PASSWORD = "guest";

    private static Connection connection = null;

    /**
     * Returns the cached connection, opening a new one when none exists yet
     * or the cached one is no longer open.
     *
     * @return an open connection, never {@code null}
     * @throws IllegalStateException if the broker cannot be reached; the
     *         underlying failure is attached as the cause
     */
    public static Connection getConnection() {
        // A connection that was closed by an earlier caller must not be reused.
        if (connection == null || !connection.isOpen()) {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(RABBIT_HOST);
            connectionFactory.setUsername(RABBIT_USERNAME);
            connectionFactory.setPassword(RABBIT_PASSWORD);
            try {
                connection = connectionFactory.newConnection();
            } catch (Exception e) {
                // Fail loudly here instead of returning null and letting the
                // caller crash later with an unrelated NullPointerException.
                throw new IllegalStateException(
                        "Unable to open RabbitMQ connection to " + RABBIT_HOST, e);
            }
        }
        return connection;
    }
}
b.创建生产者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wode.util.ConnectionUtil; public class SimpleProducer { private static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws Exception { //获取连接 Connection connection = ConnectionUtil.getConnection(); //创建通道 Channel channel = connection.createChannel(); /* * 声明(创建)队列 * 参数1:队列名称 * 参数2:为true时server重启队列不会消失 * 参数3:队列是否是独占的,如果为true只能被一个connection使用,其他连接建立时会抛出异常 * 参数4:队列不再使用时是否自动删除(没有连接,并且没有未处理的消息) * 参数5:建立队列时的其他参数 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); for (int i = 0; i < 20; i++) { String message = "Hello World!" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("生产者发送消息:" + message); } channel.close(); connection.close(); } }
c.创建消费者
import com.rabbitmq.client.*; import com.wode.util.ConnectionUtil; import java.io.IOException; public class SimpleConsumer { private static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(1); channel.queueDeclare(QUEUE_NAME,false,false,false,null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message = new String(body,"UTF-8"); System.out.println("消费者接受消息:" + message); } }; //监听队列,当b为true时,为自动提交(只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费), // 当b为false时,为手动提交(消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈, // 如果消费者一直没有反馈,那么该消息将一直处于不可用状态。 //如果选用自动确认,在消费者拿走消息执行过程中出现宕机时,消息可能就会丢失!!) //使用channel.basicAck(envelope.getDeliveryTag(),false);进行消息确认 channel.basicConsume(QUEUE_NAME, true, consumer); } }
3.Work消息队列
a.创建生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wode.util.ConnectionUtil;

import java.nio.charset.StandardCharsets;

/**
 * Work-queue producer: publishes the numbers 0..19 as text messages to
 * {@code work_queue}.
 */
public class WorkProducer {

    private final static String QUEUE_NAME = "work_queue";

    /**
     * Declares the queue, publishes the messages and closes the connection.
     *
     * @param args unused
     * @throws Exception if any broker operation fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 20; i++) {
                String message = "" + i;
                // Explicit charset so the bytes match the UTF-8 decoding
                // done by the consumers.
                channel.basicPublish("", QUEUE_NAME, null,
                        message.getBytes(StandardCharsets.UTF_8));
                System.out.println("生产者发送消息:" + message);
            }
            channel.close();
        } finally {
            // Close even when publishing failed so the JVM can exit.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
b.创建两个消费者
import com.rabbitmq.client.*; import com.wode.util.ConnectionUtil; import java.io.IOException; public class WorkConsumer1 { private final static String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.basicQos(1);//能者多劳模式 channel.queueDeclare(QUEUE_NAME,false,false,false,null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message = new String(body,"UTF-8"); try { doWork(message); }finally { channel.basicAck(envelope.getDeliveryTag(),false); } } }; //监听队列,当b为true时,为自动提交(只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费), // 当b为false时,为手动提交(消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈, // 如果消费者一直没有反馈,那么该消息将一直处于不可用状态。 //如果选用自动确认,在消费者拿走消息执行过程中出现宕机时,消息可能就会丢失!!) //使用channel.basicAck(envelope.getDeliveryTag(),false);进行消息确认 channel.basicConsume(QUEUE_NAME,false,consumer); } //模拟处理数据 private static void doWork(String message) { try { System.out.println("消费者1接受数据:" + message); Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
import com.rabbitmq.client.*; import com.wode.util.ConnectionUtil; import java.io.IOException; public class WorkConsumer2 { private final static String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.basicQos(1);//能者多劳模式 channel.queueDeclare(QUEUE_NAME,false,false,false,null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message = new String(body,"UTF-8"); try { doWork(message); }finally { channel.basicAck(envelope.getDeliveryTag(),false); } } }; //监听队列,当b为true时,为自动提交(只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费), // 当b为false时,为手动提交(消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈, // 如果消费者一直没有反馈,那么该消息将一直处于不可用状态。 //如果选用自动确认,在消费者拿走消息执行过程中出现宕机时,消息可能就会丢失!!) //使用channel.basicAck(envelope.getDeliveryTag(),false);进行消息确认 channel.basicConsume(QUEUE_NAME,false,consumer); } //模拟处理数据 private static void doWork(String message) { try { System.out.println("消费者2接受数据:" + message); Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
3.Publish/Subscribe 发布订阅消息队列
a.创建生产者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wode.util.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; public class SubProducer { //交换机名称 private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); /* 声明exchange交换机 参数1:交换机名称 参数2:交换机类型 参数3:交换机持久性,如果为true则服务器重启时不会丢失 参数4:交换机在不被使用时是否删除 参数5:交换机的其他属性 消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中 */ channel.exchangeDeclare(EXCHANGE_NAME,"fanout", true,true,null); String message = "订阅消息"; channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); System.out.println("生产者发送消息:" + message); channel.close(); connection.close(); } }
b.创建两个消费者
import com.rabbitmq.client.*; import com.wode.util.ConnectionUtil; import java.io.IOException; public class SubConsumer1 { private static final String QUEUE_NAME = "fanout_queue_exchange_1"; private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); /* 绑定队列到交换机(这个交换机名称一定要和生产者的交换机名相同) 参数1:队列名 参数2:交换机名 参数3:Routing key 路由键 */ channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); //同一时刻服务器只会发一条数据给消费者 channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message = new String(body,"UTF-8"); System.out.println("消费者1接受消息:" + message); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME,false,consumer); } }
public class SubConsumer2 { private static final String QUEUE_NAME = "fanout_queue_exchange_2"; private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); /* 绑定队列到交换机(这个交换机名称一定要和生产者的交换机名相同) 参数1:队列名 参数2:交换机名 参数3:Routing key 路由键 */ channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); //同一时刻服务器只会发一条数据给消费者 channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message = new String(body,"UTF-8"); System.out.println("消费者2接受消息:" + message); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME,false,consumer); } }
4.Routing路由消息队列
a.创建生产者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wode.util.ConnectionUtil; public class RoutingProducer { private static final String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明exchange,路由模式声明direct channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 消息内容 String message = "这是消息B"; channel.basicPublish(EXCHANGE_NAME, "B", null, message.getBytes()); System.out.println(" 生产者发送消息:" + message); message = "这是消息A"; channel.basicPublish(EXCHANGE_NAME, "A", null, message.getBytes()); System.out.println(" 生产者发送消息:" + message); channel.close(); connection.close(); } }
b.创建两个消费者
import com.rabbitmq.client.*; import com.wode.util.ConnectionUtil; import java.io.IOException; public class RoutingConsumer1 { private final static String QUEUE_NAME = "direct_queue_A"; private final static String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); /* * 绑定队列到交换机 * 参数1:队列的名称 * 参数2:交换机的名称 * 参数3:routingKey */ channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "A"); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("消费者1接受消息:" + new String(body,"UTF-8")); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
import com.rabbitmq.client.*; import com.wode.util.ConnectionUtil; import java.io.IOException; public class RoutingConsumer2 { private final static String QUEUE_NAME = "direct_queue_B"; private final static String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); /* * 绑定队列到交换机 * 参数1:队列的名称 * 参数2:交换机的名称 * 参数3:routingKey */ channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "B"); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("消费者2接受消息:" + new String(body,"UTF-8")); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
5.Topics主题通配消息队列
a.创建生产者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wode.util.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; public class TopicsProducer { private static final String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); String message = "order.update"; channel.basicPublish(EXCHANGE_NAME,"order.update",false,false,null,message.getBytes()); System.out.println("生产者发送消息:" + message); message = "order.insert"; channel.basicPublish(EXCHANGE_NAME,"order.insert",false,false,null,message.getBytes()); System.out.println("生产者发送消息:" + message); channel.close(); connection.close(); } }
b.创建消费者
import com.rabbitmq.client.*; import com.wode.util.ConnectionUtil; import java.io.IOException; public class TopicsConsumer1 { private static final String EXCHANGE_NAME = "topic_exchange"; private static final String QUEUE_NAME = "topic_queue_1"; public static void main(String[] args) throws IOException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //order.# channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"order.*"); channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("消费者1接收数据:" + new String(body,"UTF-8")); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
import com.rabbitmq.client.*; import com.wode.util.ConnectionUtil; import java.io.IOException; public class TopicsConsumer2 { private static final String EXCHANGE_NAME = "topic_exchange"; private static final String QUEUE_NAME = "topic_queue_2"; public static void main(String[] args) throws IOException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //#.insert channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.insert"); channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("消费者2接收数据:" + new String(body,"UTF-8")); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
6.RPC远程调用
a.创建服务端
import com.rabbitmq.client.*; import com.wode.util.ConnectionUtil; import java.io.IOException; public class RpcServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] args) throws IOException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); AMQP.BasicProperties properties1 = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build(); String mes = new String(body, "UTF-8"); System.out.println("服务端接收数据:" + mes); mes = mes + "1"; channel.basicPublish("", properties.getReplyTo(), properties1, mes.getBytes()); System.out.println("服务端发送数据:" + mes); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(RPC_QUEUE_NAME, false, consumer); while (true) { synchronized (consumer) { try { consumer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
b.创建客户端
import com.rabbitmq.client.*; import com.wode.util.ConnectionUtil; import java.io.IOException; import java.util.UUID; import java.util.concurrent.TimeoutException; public class RpcClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; public RpcClient() throws IOException, TimeoutException { connection = ConnectionUtil.getConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); } public void call(String message) throws IOException, InterruptedException { final String corrId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); System.out.println("客户端发送消息:" + message); channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(corrId)) { System.out.println("客户端接收消息:" + new String(body, "UTF-8")); } } }); } public void close() throws IOException { connection.close(); } public static void main(String[] args) throws Exception { RpcClient rpcClient = new RpcClient(); rpcClient.call("2"); } }
7.参考文档:
https://www.jianshu.com/p/51b686371f94
https://www.cnblogs.com/lfalex0831/p/8963247.html