一、是什么
二、协议
1.TCP(默认)
2.NIO
- New I/O API Protocol(NIO)
- 1.NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。
它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。
- 2.适合使用NIO协议的场景:
- 可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。
- 可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。
- 3.NIO连接的URI形式:nio://hostname:port?key=value&key=value
- 协议参数文档地址:https://activemq.apache.org/components/classic/documentation/nio-transport-reference
- 默认端口:61618
NIO和TCP协议的编码是一样的,所以只需要替换URL的协议即可实现切换。
3.AMQP
- Advanced Message Queuing Protocol(AMQP)
- 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件限制。
- 默认端口:5672
注意:编码和TCP不一样
4.STOMP
- Streaming Text Orientation Message Protocol(STOMP)
- 是
流文本定向消息协议
,是一种为MOM(Message Oriented Middleware,面向消息中间件)设计的简单文本协议。
- 默认端口:61613
5.SSL
- Secure Sockets Layer Protocol(SSL)
6.MQTT
7 WS
三、NIO配置案例
1.修改activemq.xml
- 打开activemq的配置文件,在active安装目录下:conf/activemq.xml
- 将下面的内容复制到<transportConnectors>标签内
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" />
- 如果你不特别指定ActiveMQ的网络监听端口,那么这些端口都讲使用BIO网络IO模型
- 所以为了首先提高单节点的网络吞吐性能,我们需要明确指定ActiveMQ网络IO模型。
- 如上所示:URI格式头以“nio”开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型。
2.重启
./bin/active
- 去控制台查看,是否加成功了。
3.生产者/消费者
NIO和TCP协议的编码是一样的,所以只需要替换URL的协议即可实现切换。
package com.qingsi.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
// ACTIVEMQ_URL重点看
public static final String ACTIVEMQ_URL = "nio://192.168.86.128:61618";
public static final String QUEUE_NAME = "transport_nio";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的URL,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话session
//两个参数transacted=事务,acknowledgeMode=确认模式(签收)
//开启事务需要commit
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列queue还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的生产者,并设置不持久化消息
MessageProducer producer = session.createProducer(queue);
//6.通过使用消息生产者,生产三条消息,发送到MQ的队列里面
// 7.发送消息
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
producer.send(textMessage);
}
//8.关闭资源
producer.close();
session.close();
connection.close();
}
}
4.性能提升
- 问题:URI格式以"nio"开头,代表这个端口使用TCP协议为基础的NIO网络模型。但是这样的设置方式,只能使这个端口支持Openwire协议。
- 需要将所有的BIO模型,都替换成NIO模型,性能得到提升。
4.1 配置
<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50" />
./bin/activemq restart
4.2 生产者/消费者
- 都是只改动了URL,其他的代码一样。所以下面举例了生产者的URL
- 只要
package com.qingsi.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
// ACTIVEMQ_URL:下面就是 NIO协议+NIO模型
public static final String ACTIVEMQ_URL = "nio://192.168.86.128:61608";
// public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61608"; 这样就是 TCP协议+NIO模型
public static final String QUEUE_NAME = "nio_auto";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的URL,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话session
//两个参数transacted=事务,acknowledgeMode=确认模式(签收)
//开启事务需要commit
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列queue还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的生产者,并设置不持久化消息
MessageProducer producer = session.createProducer(queue);
//6.通过使用消息生产者,生产三条消息,发送到MQ的队列里面
// 7.发送消息
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
producer.send(textMessage);
}
//8.关闭资源
producer.close();
session.close();
connection.close();
}
}