演示視頻請移步: james-1258744956.cos.ap-shanghai.myqcloud.com/thingsboard…html
thingsboard官網: thingsboard.io/java
thingsboard GitHub: github.com/thingsboard…git
thingsboard提供的體驗地址: demo.thingsboard.io/github
BY Thingsboard teamnpm
如下內容是在原文基礎上演繹的譯文。除非另行註明,頁面上全部內容採用知識共享-署名(CC BY 2.5 AU)協議共享。json
原文地址: ThingsBoard API參考:MQTT設備APIapi
MQTT是一種輕量級的發佈 - 訂閱消息傳遞協議,可能使其最適合各類物聯網設備。您能夠在此處找到有關MQTT的更多信息。安全
ThingsBoard服務器節點充當MQTT Broker,支持QoS級別0(最多一次)和1(至少一次)以及一組預約義主題。bash
您能夠在Web上找到大量MQTT客戶端庫。本文中的示例將基於Mosquitto,MQTT.js和Paho,要設置其中一個工具。服務器
默認狀況下,ThingsBoard支持JSON中的鍵值內容。Key始終是一個字符串,而value能夠是string,boolean,double或long。也可使用自定義二進制格式或某些序列化框架。有關詳細信息,請參閱物模型。例如:
{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}
複製代碼
爲了將遙測數據發佈到ThingsBoard服務器節點,請將PUBLISH消息發送到如下主題:
v1/devices/me/telemetry
複製代碼
最簡單的支持數據格式是:
{"key1":"value1", "key2":"value2"}
複製代碼
要麼
[{"key1":"value1"}, {"key2":"value2"}]
複製代碼
請注意,在這種狀況下,服務器端時間戳將分配給上傳的數據!
若是您的設備可以獲取客戶端時間戳,您可使用如下格式:
{"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}
複製代碼
在上面的示例中,咱們假設「1451649600512」是具備毫秒精度的unix時間戳。例如,值'1451649600512'對應於'Fri,2016年1月1日12:00:00.512 GMT'
ThingsBoard屬性API容許設備
要將客戶端設備屬性發布到ThingsBoard服務器節點,請將PUBLISH消息發送到如下主題:
v1/devices/me/attributes
複製代碼
更多請看上文給出的鏈接。
由於Thingsboard最新release,是基於微服務架構,不利用單獨理解代碼。
Thingsboard源代碼: github.com/thingsboard…
本文基於上面源代碼後,剔除相關的安全驗證和處理以後搭建簡易的講解項目:
由於Thingsboard是一個JVM技術棧的PaaS平臺,因此使用的是基於Java通信框架的Netty,若是有對Netty不太熟悉的同窗,能夠參考我以前搭建的Netty實踐學習案例: github.com/sanshengshu…
.
├── IOT-Guide-MQTT.iml
├── pom.xml
└── src
└── main
└── java
└── com
└── sanshengshui
└── mqtt
├── adapter
│ └── JsonMqttAdaptor.java // MQTT json轉換器,在跟Thingsboard學習IOT-物模型有所講解
├── IOTMqttServer.java // MQTT服務
├── MqttTopicMatcher.java
├── MqttTopics.java 
├── MqttTransportHandler.java //MQTT處理類
└── MqttTransportServerInitializer.java
複製代碼
private static final int PORT = 1884;
private static final String leakDetectorLevel = "DISABLED";
private static final Integer bossGroupThreadCount = 1;
private static final Integer workerGroupThreadCount = 12;
private static final Integer maxPayloadSize = 65536;
public static void main(String[] args) throws Exception {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
EventLoopGroup bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
EventLoopGroup workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new MqttTransportServerInitializer(maxPayloadSize));
ChannelFuture f = b.bind(PORT);
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
複製代碼
第8行,設置服務端Netty內存讀寫泄漏級別,缺省條件下爲:DISABLED
第10行和第11行,設置boss線程組和work線程組的線程數量。默認狀況下,boss線程組的線程數量爲1,work線程組的數量爲運行服務機器內核數量的2倍。
第15行,經過建立ServerBootstrap
對象,在第16行設置使用EventLoopGroup
。
在17和19行,設置要被實例化的NioServerSockerChannel
類,並設置最大的負載內容數量。
最後咱們經過shutdowGracefully()
函數優雅的關閉bossGroup和workGroup。
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
address = (InetSocketAddress) ctx.channel().remoteAddress();
if (msg.fixedHeader() == null) {
processDisconnect(ctx);
return;
}
switch (msg.fixedHeader().messageType()) {
case CONNECT:
processConnect(ctx, (MqttConnectMessage) msg);
break;
case PUBLISH:
processPublish(ctx, (MqttPublishMessage) msg);
break;
case SUBSCRIBE:
processSubscribe(ctx, (MqttSubscribeMessage) msg);
break;
case UNSUBSCRIBE:
processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
break;
case PINGREQ:
if (checkConnected(ctx)) {
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP,false,AT_MOST_ONCE, false, 0)));
}
break;
case DISCONNECT:
if (checkConnected(ctx)) {
processDisconnect(ctx);
}
break;
default:
break;
}
}
複製代碼
第3行,經過判斷消息的固定頭部是否爲空,若是空;則經過processDisconnect(ctx)
將設備鏈接關閉。
processDisconnect(channelHandlerContext ctx)
private void processDisconnect(ChannelHandlerContext ctx) {
ctx.close(); // 關閉socket通道
}
複製代碼
第8行,經過判斷固定頭部的MQTT消息類型,針對不一樣消息作相應的處理。
如下是對發佈消息進行相關的解讀,更多消息類型的處理類,你們請參考我上面的IOT-Guide-MQTT進行閱讀。
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
try {
if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) { //若是主題爲v1/devices/me/attributes
JsonMqttAdaptor.convertToMsg(POST_TELEMETRY_REQUEST, mqttMsg);
} else if(topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
JsonMqttAdaptor.convertToMsg(POST_ATTRIBUTES_REQUEST, mqttMsg);
} else if(topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
JsonMqttAdaptor.convertToMsg(GET_ATTRIBUTES_REQUEST, mqttMsg);
}
} catch (AdaptorException e) {
}
}
複製代碼
我上面的代碼僅是對消息的主題進行判斷,而後對主題內的內容進行物模型的解析,獲得相關屬性或者遙測數據的得到。
咱們經過Paho或者MQTT.js和服務進行鏈接,發佈消息到如下主題:
v1/devices/me/telemetry
複製代碼
簡易的數據格式以下:
{"key1":"value1", "key2":"value2"}
複製代碼
Paho圖示:
服務器控制檯打印數據:
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xf2bfb3a8] REGISTERED
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xf2bfb3a8] BIND: 0.0.0.0/0.0.0.0:1884
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xf2bfb3a8, L:/0:0:0:0:0:0:0:0:1884] ACTIVE
七月 24, 2019 1:37:22 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xf2bfb3a8, L:/0:0:0:0:0:0:0:0:1884] RECEIVED: [id: 0xe08abd12, L:/127.0.0.1:1884 - R:/127.0.0.1:48816]
key= 1563946708305
屬性名=temperature 屬性值=38
屬性名=humidity 屬性值=60
複製代碼
如上所示,但願你們對Thingsboard的IOT架構-MQTT設備協議這塊有所瞭解!