使用IBM MQTTv3实现相关的发布订阅功能

MQTTv3的发布消息的实现:

  1. package com.etrip.mqttv3;
  2. import com.ibm.micro.client.mqttv3.MqttClient;
  3. import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
  4. import com.ibm.micro.client.mqttv3.MqttMessage;
  5. import com.ibm.micro.client.mqttv3.MqttTopic;
  6. /**
  7. * MQTTV3的发布消息类
  8. *
  9. * @author longgangbai
  10. */
  11. public class MQTTPub {
  12. public static void doTest(){
  13. try {
  14. MqttClient client = new MqttClient("tcp://192.168.208.46:1883","mqttserver-pub");
  15. MqttTopic topic = client.getTopic("tokudu/china");
  16. MqttMessage message = new MqttMessage("Hello World. Hello IBM".getBytes());
  17. message.setQos(1);
  18. client.connect();
  19. while(true){
  20. MqttDeliveryToken token = topic.publish(message);
  21. while (!token.isComplete()){
  22. token.waitForCompletion(1000);
  23. }
  24. }
  25. } catch (Exception e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }

MQTTV3的订阅消息类

  1. package com.etrip.mqttv3;
  2. import com.ibm.micro.client.mqttv3.MqttClient;
  3. import com.ibm.micro.client.mqttv3.MqttConnectOptions;
  4. /**
  5. * MQTTV3的订阅消息类
  6. *
  7. * @author longgangbai
  8. */
  9. public class MQTTSubsribe {
  10. public static String doTest() {
  11. try {
  12. //创建MqttClient
  13. MqttClient client = new MqttClient("tcp://192.168.208.46:1883", "java_client0000000000");
  14. //回调处理类
  15. CallBack callback = new CallBack();
  16. client.setCallback(callback);
  17. //创建连接可选项信息
  18. MqttConnectOptions conOptions = new MqttConnectOptions();
  19. //
  20. conOptions.setCleanSession(false);
  21. //连接broker
  22. client.connect(conOptions);
  23. //发布相关的订阅
  24. client.subscribe("tokudu/china", 1);
  25. //client.disconnect();
  26. } catch (Exception e) {
  27. e.printStackTrace();
  28. return "failed";
  29. }
  30. return "success";
  31. }
  32. }

回调处理类处理订阅的消息类

  1. package com.etrip.mqttv3;
  2. import com.ibm.micro.client.mqttv3.MqttCallback;
  3. import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
  4. import com.ibm.micro.client.mqttv3.MqttMessage;
  5. import com.ibm.micro.client.mqttv3.MqttTopic;
  6. /**
  7. * 回调处理类
  8. * 处理订阅的消息类
  9. *
  10. * @author longgangbai
  11. */
  12. public class CallBack implements MqttCallback {
  13. public CallBack() {
  14. }
  15. /**
  16. * 接收到信息的处理
  17. */
  18. public void messageArrived(MqttTopic topic, MqttMessage message) {
  19. try {
  20. System.out.println(" MQTTSubsribe  message.toString()"+message.toString());
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. public void connectionLost(Throwable cause) {
  26. }
  27. public void deliveryComplete(MqttDeliveryToken token) {
  28. }
  29. }

测试类:

  1. package com.etrip.mqttv3;
  2. /**
  3. * MQTTV3的测试类
  4. *
  5. * @author longgangbai
  6. */
  7. public class MQTTMain {
  8. public static void main(String[] args) {
  9. //订阅消息的方法
  10. MQTTSubsribe.doTest();
  11. //发布消息的类
  12. MQTTPub.doTest();
  13. }
  14. }
05-07 15:43