Thingsboard的MQTT设备协议
thingsboard官网: https://thingsboard.io/
thingsboard GitHub: https://github.com/thingsboard/thingsboard
thingsboard提供的体验地址: http://demo.thingsboard.io/
MQTT基础知识
MQTT是一种轻量级的发布 - 订阅消息传递协议,可能使其最适合各种物联网设备。您可以在此处找到有关MQTT的更多信息。
ThingsBoard服务器节点充当MQTT Broker,支持QoS级别0(最多一次)和1(至少一次)以及一组预定义主题。
客户端库设置
您可以在Web上找到大量MQTT客户端库。本文中的示例将基于Mosquitto,MQTT.js和Paho,要设置其中一个工具。
键值格式
默认情况下,ThingsBoard支持JSON中的键值内容。Key始终是一个字符串,而value可以是string,boolean,double或long。也可以使用自定义二进制格式或某些序列化框架。有关详细信息,请参阅物模型。例如:
{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}
遥测上传API
为了将遥测数据发布到ThingsBoard服务器节点,请将PUBLISH消息发送到以下主题:
v1/devices/me/telemetry
最简单的支持数据格式是:
{"key1":"value1", "key2":"value2"}
要么
[{"key1":"value1"}, {"key2":"value2"}]
请注意,在这种情况下,服务器端时间戳将分配给上传的数据!
如果您的设备能够获取客户端时间戳,您可以使用以下格式:
{"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}
在上面的示例中,我们假设“1451649600512”是具有毫秒精度的unix时间戳。例如,值'1451649600512'对应于'Fri,2016年1月1日12:00:00.512 GMT'
属性API
ThingsBoard属性API允许设备
将客户端设备属性上载到服务器。
将属性更新发布到服务器
要将客户端设备属性发布到ThingsBoard服务器节点,请将PUBLISH消息发送到以下主题:
v1/devices/me/attributes
更多请看上文给出的连接。
Thingsboard的MQTT传输协议架构
Thingsboard源代码: https://github.com/thingsboard/thingsboard/tree/release-2.0/transport/mqtt
本文基于上面源代码后,剔除相关的安全验证和处理之后搭建简易的讲解项目:
https://github.com/sanshengshui/IOT-Technical-Guide/tree/master/IOT-Guide-MQTT
MQTT框架
因为Thingsboard是一个JVM技术栈的PaaS平台,所以使用的是基于Java通讯框架的Netty,如果有对Netty不太熟悉的同学,可以参考我之前搭建的Netty实践学习案例: https://github.com/sanshengshui/netty-learning-example
项目结构
.
├── IOT-Guide-MQTT.iml ├── pom.xml └── src └── main └── java └── com └── sanshengshui └── mqtt ├── adapter │ └── JsonMqttAdaptor.java // MQTT json转换器,在跟Thingsboard学习IOT-物模型有所讲解 ├── IOTMqttServer.java // MQTT服务 ├── MqttTopicMatcher.java ├── MqttTopics.java ├── MqttTransportHandler.java //MQTT处理类 └── MqttTransportServerInitializer.java
项目代码讲解
IOTMqttServer
1 private static final int PORT = 1884; 2 private static final String leakDetectorLevel = "DISABLED"; 3 private static final Integer bossGroupThreadCount = 1; 4 private static final Integer workerGroupThreadCount = 12; 5 private static final Integer maxPayloadSize = 65536; 6 7 public static void main(String[] args) throws Exception { 8 ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase())); 9 10 EventLoopGroup bossGroup = new NioEventLoopGroup(bossGroupThreadCount); 11 EventLoopGroup workerGroup = new NioEventLoopGroup(workerGroupThreadCount); 12 13 try { 14 15 ServerBootstrap b = new ServerBootstrap(); 16 b.group(bossGroup,workerGroup) 17 .channel(NioServerSocketChannel.class) 18 .handler(new LoggingHandler(LogLevel.INFO)) 19 .childHandler(new MqttTransportServerInitializer(maxPayloadSize)); 20 ChannelFuture f = b.bind(PORT); 21 f.channel().closeFuture().sync(); 22 } finally { 23 bossGroup.shutdownGracefully(); 24 workerGroup.shutdownGracefully(); 25 } 26 }
第8行,设置服务端Netty内存读写泄漏级别,缺省条件下为:DISABLED
第10行和第11行,设置boss线程组和work线程组的线程数量。默认情况下,boss线程组的线程数量为1,work线程组的数量为运行服务机器内核数量的2倍。
第15行,通过创建ServerBootstrap
对象,在第16行设置使用EventLoopGroup
。
在17和19行,设置要被实例化的NioServerSockerChannel
类,并设置最大的负载内容数量。
最后我们通过shutdowGracefully()
函数优雅的关闭bossGroup和workGroup。
MqttTransportHandler#processMqttMsg()
1 private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) { 2 address = (InetSocketAddress) ctx.channel().remoteAddress(); 3 if (msg.fixedHeader() == null) { 4 processDisconnect(ctx); 5 return; 6 } 7 8 switch (msg.fixedHeader().messageType()) { 9 case CONNECT: 10 processConnect(ctx, (MqttConnectMessage) msg); 11 break; 12 case PUBLISH: 13 processPublish(ctx, (MqttPublishMessage) msg); 14 break; 15 case SUBSCRIBE: 16 processSubscribe(ctx, (MqttSubscribeMessage) msg); 17 break; 18 case UNSUBSCRIBE: 19 processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); 20 break; 21 case PINGREQ: 22 if (checkConnected(ctx)) { 23 ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP,false,AT_MOST_ONCE, false, 0))); 24 } 25 break; 26 case DISCONNECT: 27 if (checkConnected(ctx)) { 28 processDisconnect(ctx); 29 } 30 break; 31 default: 32 break; 33 34 } 35 } 36
第3行,通过判断消息的固定头部是否为空,如果空;则通过processDisconnect(ctx)
将设备连接关闭。
private void processDisconnect(ChannelHandlerContext ctx) { ctx.close(); // 关闭socket通道 }
第8行,通过判断固定头部的MQTT消息类型,针对不同消息做相应的处理。
MqttTransportHandler#PublishDevicePublish
以下是对发布消息进行相关的解读,更多消息类型的处理类,大家请参考我上面的IOT-Guide-MQTT进行阅读。
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) { try { if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) { //如果主题为v1/devices/me/attributes JsonMqttAdaptor.convertToMsg(POST_TELEMETRY_REQUEST, mqttMsg); } else if(topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { JsonMqttAdaptor.convertToMsg(POST_ATTRIBUTES_REQUEST, mqttMsg); } else if(topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) { JsonMqttAdaptor.convertToMsg(GET_ATTRIBUTES_REQUEST, mqttMsg); } } catch (AdaptorException e) { } }
我上面的代码仅是对消息的主题进行判断,然后对主题内的内容进行物模型的解析,得到相关属性或者遥测数据的获得。
演示效果
我们通过Paho或者MQTT.js和服务进行连接,发布消息到以下主题:
简易的数据格式如下:
Paho图示:
服务器控制台打印数据:
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler channelRegistered 信息: [id: 0xf2bfb3a8] REGISTERED 七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler bind 信息: [id: 0xf2bfb3a8] BIND: 0.0.0.0/0.0.0.0:1884 七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler channelActive 信息: [id: 0xf2bfb3a8, L:/0:0:0:0:0:0:0:0:1884] ACTIVE 七月 24, 2019 1:37:22 下午 io.netty.handler.logging.LoggingHandler channelRead 信息: [id: 0xf2bfb3a8, L:/0:0:0:0:0:0:0:0:1884] RECEIVED: [id: 0xe08abd12, L:/127.0.0.1:1884 - R:/127.0.0.1:48816] key= 1563946708305 属性名=temperature 属性值=38 属性名=humidity 属性值=60
如上所示,希望大家对Thingsboard的IOT架构-MQTT设备协议这块有所了解!