本文介绍如何使用Android(JAVA)客户机连接MQTT服务器
第一步先引入MQTT
在 build.gradle 添加以下:
dependencies { implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' }
接下来演示如何连接
创建MQTT客户机类
MqttConnect.class
public class MqttConnect { private String HOST = ContentValue.TCP_URL; private final String clientId = UUID.randomUUID().toString(); private static MqttClient mqttClient; /** * 客户端connect连接mqtt服务器 * * @param username 用户名 * @param password 密码 * @param mqttCallback 回调函数 **/ public void setMqttClient(String username, String password, MqttCallback mqttCallback) throws MqttException { MqttConnectOptions options = mqttConnectOptions(username, password); mqttClient.setCallback(mqttCallback); mqttClient.connect(options); if(mqttClient.isConnected()) { System.out.println("MQTT连接成功"); } } /** * MQTT连接参数设置 */ private MqttConnectOptions mqttConnectOptions(String userName, String passWord) throws MqttException { mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(userName); options.setPassword(passWord.toCharArray()); options.setConnectionTimeout(60); options.setAutomaticReconnect(true); options.setCleanSession(false); return options; } /** * 关闭MQTT连接 */ public void close() throws MqttException { mqttClient.close(); mqttClient.disconnect(); } /** * 向某个主题发布消息 默认qos:1 */ public static void pub(String topic, String msg) throws MqttException { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(msg.getBytes()); MqttTopic mqttTopic = mqttClient.getTopic(topic); MqttDeliveryToken token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } /** * 向某个主题发布消息 * * @param topic: 发布的主题 * @param msg: 发布的消息 * @param qos: 消息质量 Qos:0、1、2 */ public static void pub(String topic, String msg, int qos) throws MqttException { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setPayload(msg.getBytes()); MqttTopic mqttTopic = mqttClient.getTopic(topic); MqttDeliveryToken token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } /** * 订阅某一个主题 ,此方法默认的的Qos等级为:1 * * @param topic 主题 */ public static void sub(String topic) throws MqttException { mqttClient.subscribe(topic); } /** * 订阅某一个主题,可携带Qos * * @param topic 所要订阅的主题 * @param qos 消息质量:0、1、2 */ public void sub(String topic, int qos) throws MqttException { mqttClient.subscribe(topic, qos); } }
创建MQTT消息回调类
MqttInitCallback.class
public class MqttInitCallback implements MqttCallback { /** * publish发布成功后会执行到这里 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { } @Override public void connectionLost(Throwable throwable) { System.out.println("mqtt close..."); } /** * subscribe订阅后得到的消息会执行到这里 */ @Override public void messageArrived(String topic, MqttMessage message) { System.out.println("topic-message:" + message.toString()); } }
到这里主要的两个函数也就完成了,分别为 MQTT客户机连接类 以及 MQTT数据回调类
接下来是调用连接以及订阅监听消息
// 用户名 private String username = ContentValue.TCP_USERNAME; // 密码 private String password = ContentValue.TCP_PASSWORD; // MQTT客户机连接类 private MqttConnect server = new MqttConnect(); // MQTT回调函数 private MqttInitCallback initCallback = new MqttInitCallback(); // MQTT连接 server.setMqttClient(username, password, initCallback); // MQTT订阅 "/test"是主题 0是qos,一共可以设置0,1,2 server.sub("/test",0)
可以使用MQTT.FX进行测试
如果需要免费版安装包,可以加我微信好友领取
配置以下连接
测试推送一条消息到主题
我们来查看一下Android的运行日志
连接成功,并收到监听主题的消息。