第一种:点对点

activemq两种实现方式-LMLPHP

#发布者
public class Producer { private static final String userName = ActiveMQXAConnectionFactory.DEFAULT_USER;
private static final String password = ActiveMQXAConnectionFactory.DEFAULT_PASSWORD;
private static final String brokerURL = "tcp://192.168.178.X:61616"; public static void main(String[] args) throws JMSException {
//1.创建连接工厂类
ConnectionFactory factory = new ActiveMQXAConnectionFactory(userName, password, brokerURL);
//2.创建连接
Connection connection = factory.createConnection();
//3.启动连接
connection.start(); //4.创建会话对象session(事务transacted为true,参数2不生效)
//acknowledgeMode:
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); //5.目的地
Queue queue = session.createQueue("mq-test-01");
//7.创建发送者
MessageProducer producer = session.createProducer(queue);
for(int i=1;i<=10;i++) {
//6.消息对象
TextMessage message = session.createTextMessage();
message.setText("消息"+i);
//8.发送消息
producer.send(message);
}
//9.会话提交
// session.commit(); //10.关闭连接
connection.close();
}
}
#消费者
public class Consumer1 { private static final String userName = ActiveMQXAConnectionFactory.DEFAULT_USER;
private static final String password = ActiveMQXAConnectionFactory.DEFAULT_PASSWORD;
private static final String brokerURL = "tcp://192.168.178.X:61616"; public static void main(String[] args) throws JMSException {
//1.创建连接工厂类
ConnectionFactory factory = new ActiveMQXAConnectionFactory(userName, password, brokerURL);
//2.创建连接
Connection connection = factory.createConnection();
//3.启动连接
connection.start(); //4.创建会话对象session(事务transacted为true,参数2不生效)
//acknowledgeMode:
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); //5.目的地
Queue queue = session.createQueue("mq-test-01"); //6.接收消息对象
MessageConsumer consumer = session.createConsumer(queue); //7.通过监听器接收消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
//获取消息
try {
String msg = textMessage.getText();
System.out.println(msg);
} catch (JMSException e) {
}
}
}); }
}

第二种: 发布者/订阅者

activemq两种实现方式-LMLPHP

启动顺序:先订阅、再发布

#订阅者
public class Subscriber1 { private static final String userName = ActiveMQXAConnectionFactory.DEFAULT_USER;
private static final String password = ActiveMQXAConnectionFactory.DEFAULT_PASSWORD;
private static final String brokerURL = "tcp://192.168.129.10:61616"; public static void main(String[] args) throws JMSException {
//1.创建连接工厂类
ConnectionFactory factory = new ActiveMQXAConnectionFactory(userName, password, brokerURL);
//2.创建连接
Connection connection = factory.createConnection();
//3.启动连接
connection.start(); //4.创建会话对象session(事务transacted为true,参数2不生效)
//acknowledgeMode:
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); //5.目的地
Topic topic = session.createTopic("mq-test-02");
//6.接收消息对象
MessageConsumer consumer = session.createConsumer(topic); //7.通过监听器接收消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
//获取消息
try {
String msg = textMessage.getText();
System.out.println(msg);
} catch (JMSException e) {
}
}
}); }
}
#发布者
public class Publisher { private static final String userName = ActiveMQXAConnectionFactory.DEFAULT_USER;
private static final String password = ActiveMQXAConnectionFactory.DEFAULT_PASSWORD;
private static final String brokerURL = "tcp://192.168.129.10:61616"; public static void main(String[] args) throws JMSException {
//1.创建连接工厂类
ConnectionFactory factory = new ActiveMQXAConnectionFactory(userName, password, brokerURL);
//2.创建连接
Connection connection = factory.createConnection();
//3.启动连接
connection.start(); //4.创建会话对象session(事务transacted为true,参数2不生效)
//acknowledgeMode:
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); //5.目的地
Topic topic = session.createTopic("mq-test-02");
//7.创建发送者
MessageProducer producer = session.createProducer(topic);
for(int i=1;i<=10;i++) {
//6.消息对象
TextMessage message = session.createTextMessage();
message.setText("消息"+i);
//8.发送消息
producer.send(message);
//设置自动持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
}
//9.会话提交
// session.commit(); //10.关闭连接
connection.close();
}
}
05-17 23:47