物聯網時代-跟着Thingsboard學IOT架構-MQTT設備協議

演示視頻請移步: james-1258744956.cos.ap-shanghai.myqcloud.com/thingsboard…html


Thingsboard的MQTT設備協議

thingsboard官網: thingsboard.io/java

thingsboard GitHub: github.com/thingsboard…git

thingsboard提供的體驗地址: demo.thingsboard.io/github

BY Thingsboard teamnpm

如下內容是在原文基礎上演繹的譯文。除非另行註明,頁面上全部內容採用知識共享-署名(CC BY 2.5 AU)協議共享。json

原文地址: ThingsBoard API參考:MQTT設備APIapi


MQTT基礎知識

MQTT是一種輕量級的發佈 - 訂閱消息傳遞協議,可能使其最適合各類物聯網設備。您能夠在此處找到有關MQTT的更多信息。安全

ThingsBoard服務器節點充當MQTT Broker,支持QoS級別0(最多一次)和1(至少一次)以及一組預約義主題。bash


客戶端庫設置

您能夠在Web上找到大量MQTT客戶端庫。本文中的示例將基於Mosquitto,MQTT.jsPaho,要設置其中一個工具。服務器

鍵值格式

默認狀況下,ThingsBoard支持JSON中的鍵值內容。Key始終是一個字符串,而value能夠是string,boolean,double或long。也可使用自定義二進制格式或某些序列化框架。有關詳細信息,請參閱物模型。例如:

{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}
複製代碼

遙測上傳API

爲了將遙測數據發佈到ThingsBoard服務器節點,請將PUBLISH消息發送到如下主題:

v1/devices/me/telemetry
複製代碼

最簡單的支持數據格式是:

{"key1":"value1", "key2":"value2"}
複製代碼

要麼

[{"key1":"value1"}, {"key2":"value2"}]
複製代碼

請注意,在這種狀況下,服務器端時間戳將分配給上傳的數據!

若是您的設備可以獲取客戶端時間戳,您可使用如下格式:

{"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}
複製代碼

在上面的示例中,咱們假設「1451649600512」是具備毫秒精度的unix時間戳。例如,值'1451649600512'對應於'Fri,2016年1月1日12:00:00.512 GMT'

屬性API

ThingsBoard屬性API容許設備

  • 客戶端設備屬性上載到服務器。
將屬性更新發布到服務器

要將客戶端設備屬性發布到ThingsBoard服務器節點,請將PUBLISH消息發送到如下主題:

v1/devices/me/attributes
複製代碼

更多請看上文給出的鏈接。


Thingsboard的MQTT傳輸協議架構

由於Thingsboard最新release,是基於微服務架構,不利用單獨理解代碼。

Thingsboard源代碼: github.com/thingsboard…

本文基於上面源代碼後,剔除相關的安全驗證和處理以後搭建簡易的講解項目:

github.com/sanshengshu…


MQTT框架

由於Thingsboard是一個JVM技術棧的PaaS平臺,因此使用的是基於Java通信框架的Netty,若是有對Netty不太熟悉的同窗,能夠參考我以前搭建的Netty實踐學習案例: github.com/sanshengshu…

項目結構

.
├── IOT-Guide-MQTT.iml
├── pom.xml
└── src
    └── main
        └── java
            └── com
                └── sanshengshui
                    └── mqtt
                        ├── adapter
                        │   └── JsonMqttAdaptor.java // MQTT json轉換器,在跟Thingsboard學習IOT-物模型有所講解
                        ├── IOTMqttServer.java // MQTT服務
                        ├── MqttTopicMatcher.java
                        ├── MqttTopics.java 
                        ├── MqttTransportHandler.java //MQTT處理類
                        └── MqttTransportServerInitializer.java

複製代碼

項目代碼講解

IOTMqttServer

private static final int PORT = 1884;
    private static final String leakDetectorLevel = "DISABLED";
    private static final Integer bossGroupThreadCount = 1;
    private static final Integer workerGroupThreadCount = 12;
    private static final Integer maxPayloadSize = 65536;

    public static void main(String[] args) throws Exception {
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));

        EventLoopGroup bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
        EventLoopGroup workerGroup = new NioEventLoopGroup(workerGroupThreadCount);

        try {

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new MqttTransportServerInitializer(maxPayloadSize));
            ChannelFuture f = b.bind(PORT);
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
複製代碼

第8行,設置服務端Netty內存讀寫泄漏級別,缺省條件下爲:DISABLED

第10行和第11行,設置boss線程組和work線程組的線程數量。默認狀況下,boss線程組的線程數量爲1,work線程組的數量爲運行服務機器內核數量的2倍。

第15行,經過建立ServerBootstrap對象,在第16行設置使用EventLoopGroup

在17和19行,設置要被實例化的NioServerSockerChannel類,並設置最大的負載內容數量。

最後咱們經過shutdowGracefully()函數優雅的關閉bossGroup和workGroup。


MqttTransportHandler#processMqttMsg()

private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
        address = (InetSocketAddress) ctx.channel().remoteAddress();
        if (msg.fixedHeader() == null) {
            processDisconnect(ctx);
            return;
        }

        switch (msg.fixedHeader().messageType()) {
            case CONNECT:
                processConnect(ctx, (MqttConnectMessage) msg);
                break;
            case PUBLISH:
                processPublish(ctx, (MqttPublishMessage) msg);
                break;
            case SUBSCRIBE:
                processSubscribe(ctx, (MqttSubscribeMessage) msg);
                break;
            case UNSUBSCRIBE:
                processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
                break;
            case PINGREQ:
                if (checkConnected(ctx)) {
                    ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP,false,AT_MOST_ONCE, false, 0)));
                }
                break;
            case DISCONNECT:
                if (checkConnected(ctx)) {
                    processDisconnect(ctx);
                }
                break;
            default:
                break;

        }
    }

複製代碼

第3行,經過判斷消息的固定頭部是否爲空,若是空;則經過processDisconnect(ctx)將設備鏈接關閉。

processDisconnect(channelHandlerContext ctx)

private void processDisconnect(ChannelHandlerContext ctx) {
        ctx.close();  // 關閉socket通道
    }

複製代碼

第8行,經過判斷固定頭部的MQTT消息類型,針對不一樣消息作相應的處理。


MqttTransportHandler#PublishDevicePublish

如下是對發佈消息進行相關的解讀,更多消息類型的處理類,你們請參考我上面的IOT-Guide-MQTT進行閱讀。

private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
        try {
            if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) { //若是主題爲v1/devices/me/attributes
                JsonMqttAdaptor.convertToMsg(POST_TELEMETRY_REQUEST, mqttMsg);
            } else if(topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
                JsonMqttAdaptor.convertToMsg(POST_ATTRIBUTES_REQUEST, mqttMsg);
            } else if(topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
                JsonMqttAdaptor.convertToMsg(GET_ATTRIBUTES_REQUEST, mqttMsg);
            }
        } catch (AdaptorException e) {

        }

    }

複製代碼

我上面的代碼僅是對消息的主題進行判斷,而後對主題內的內容進行物模型的解析,獲得相關屬性或者遙測數據的得到。


演示效果

咱們經過Paho或者MQTT.js和服務進行鏈接,發佈消息到如下主題:

v1/devices/me/telemetry

複製代碼

簡易的數據格式以下:

{"key1":"value1", "key2":"value2"}
複製代碼

Paho圖示:

服務器控制檯打印數據:

七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xf2bfb3a8] REGISTERED
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xf2bfb3a8] BIND: 0.0.0.0/0.0.0.0:1884
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xf2bfb3a8, L:/0:0:0:0:0:0:0:0:1884] ACTIVE
七月 24, 2019 1:37:22 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xf2bfb3a8, L:/0:0:0:0:0:0:0:0:1884] RECEIVED: [id: 0xe08abd12, L:/127.0.0.1:1884 - R:/127.0.0.1:48816]
key= 1563946708305
屬性名=temperature 屬性值=38
屬性名=humidity 屬性值=60
複製代碼

如上所示,但願你們對Thingsboard的IOT架構-MQTT設備協議這塊有所瞭解!

相關文章
相關標籤/搜索