1. 发布消息
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class BookProducer implements Runnable{
public static final String BROKER_URL = "tcp://localhost:61616";
@Override
public void run() {
try {
//1.创建连接工厂,指定ip和端口
ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
//2.使用连接工厂创建一个连接对象
Connection connection = factory.createConnection();
//3.开启连接(JMS会话)
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用会话创建目的地
/**
* ① 点对点(Point-to-Point)。在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列(javax.jms.Queue)相关联。
* ② 发布/订阅(Publish/Subscribe)。发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,
而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题(javax.jms.Topic)关联。
*/
Destination destination = session.createQueue("book-broker")
//创建生产者/消费者
MessageProducer producer = session.createProducer(destination);
// MessageConsumer consumer = session.createConsumer(destination);
//consumer.receive();
/**
* 创建消息,支持的消息类型:
* TextMessage
* MapMessage
* ObjectMessage:对象需要实现序列化接口
* BytesMessage
* StreamMessage
*/
Message message = session.createTextMessage("我是一个香蕉.......");
//发送消息
producer.send(message);
//释放资源
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
2. 接收消息
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
public class BookConsumer implements Runnable {
@Override
public void run() {
try {
var connection = new ActiveMQConnectionFactory(BookProducer.BROKER_URL).createConnection();
connection.start();
/**
* connection.createSession(boolean transacted, int acknowledgeMode);
* transacted:是否使用事务
* acknowledgeMode:应答模式
* AUTO_ACKNOWLEDGE:自动应答
* 对于同步消费者,receive方法调用返回,且没有异常发生时,将自动对收到的消息予以确认.
* 对于异步消息,当onMessage方法返回,且没有异常发生时,即对收到的消息自动确认.
* CLIENT_ACKNOWLEDGE:客户端手动应答
* 这种方式要求客户端使用javax.jms.Message.acknowledge()方法完成确认.
* DUPS_OK_ACKNOWLEDGE:延时//批量通知
* 这种确认方式允许JMS不必急于确认收到的消息,允许在收到多个消息之后一次完成确认,
与Auto_AcKnowledge相比,这种确认方式在某些情况下可能更有效,
因为没有确认,当系统崩溃或者网络出现故障的时候,消息可以被重新传递.
* 使用事务消息确认模式:
* SESSION_TRANSACTED
*/
var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
var consumer = session.createConsumer(session.createQueue("tmall-queue"));
var message = ((TextMessage)consumer.receive()).getText();
System.out.println(message);
session.close();
connection.close();
}
}
或者设置监听器接收(消费者不用一直在线,监听到消息自动接收)
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
public class BookConsumer implements Runnable {
@Override
public void run() {
try {
var connection = new ActiveMQConnectionFactory(BookProducer.BROKER_URL).createConnection();
connection.start();
var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
var consumer = session.createConsumer(session.createQueue("tmall-queue"));
consumer.setMessageListener(message -> {
try {
System.out.println(((TextMessage) message).getText());
}catch (JMSException e){
e.printStackTrace();
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}