消息中間件技術 - 淺談mqtt協議及其實現【轉】

消息中間件技術 - 淺談mqtt協議及其實現

 

做者:carter(佘虎),轉載請註明出處,特別說明:本博文來自博主原博客,爲保證新博客中博文的完整性,特複製到此留存,如需轉載請註明新博客地址便可。

1.1概念

MQTT(MQ Telemetry Transport) 消息隊列遙測傳輸協議是IBM開發的一種網絡應用層的協議,提供輕量級的,支持可發佈/可訂閱的的消息推送模式,使設備對設備之間的短消息通訊變得簡單,好比如今應用普遍的低功耗傳感器,手機、嵌入式計算機、微型控制器,衛星等移動設備。javascript

1.2優勢

1.2.1很是低的通訊開銷

MQTT 的獨特之處在於,它的每消息標題能夠短至 2 個byte。MQ 和 HTTP 都擁有高得多的每消息開銷。對於 HTTP,爲每一個新請求消息從新創建 HTTP 鏈接會致使重大的開銷。MQ 和 MQTT 所使用的永久鏈接顯著減小了這一開銷。html

1.2.2低功耗,省電

您須要可以及時地將通知傳遞給客戶。爲此,必須採用某種按期輪詢或推送方法;從電池、系統負載和帶寬角度講,推送是最佳解決方案。MQTT 是專門針對低功耗目標而設計的。HTTP 的設計沒有考慮此因素,所以增長了功耗。java

1.2.3單機百萬級併發

在 HTTP 堆棧上,維護數百萬個併發鏈接,須要作許多的工做來提供支持。儘管能夠實現此支持,但大多數商業產品都爲處理這一數量級的永久鏈接而進行了優化。IBM 提供了 IBM MessageSight,這是一個單機架裝載服務器,通過測試能處理多達 100 萬個經過 MQTT 併發鏈接的設備。相反,MQ 不是爲大量併發客戶端而設計的。c++

1.2.4對網絡環境的容忍度

MQTT提供三種不一樣消息傳遞等級,讓消息能按需到達目的地,適應在不穩定工做的網絡傳輸需求。MQTT 和 MQ 可以從斷開等故障中恢復,並且沒有進一步的代碼需求。可是,HTTP 沒法原生地實現此目的,須要客戶端重試編碼,這可能增長冪等性問題。spring

1.2.5客戶端多平臺支持

支持各類流行編程語言(包括C,Java,Ruby,Python 等等)且易於使用的客戶端。macos

1.2.6發佈/訂閱模式,開發簡易

支持發佈 / 訂閱模型,簡化應用程序的開發。編程

1.2.7推送通知

企業可能須要在沒有第三方中介的狀況下發送敏感的信息。這下降了特定於操做系統的解決方案(好比 Apple iOS、Google Play 通知)做爲主要傳輸機制的價值。安全

HTTP 只容許使用一種稱爲COMET 的方法,使用持久的 HTTP 請求來執行推送。從客戶端和服務器的角度講,此方法都很昂貴。MQ 和 MQTT 都支持推送,這是它們的一個基本特性。服務器

1.2.8防火牆容錯

一些企業防火牆將出站鏈接限制到一些已定義的端口。這些端口一般被限制爲 HTTP(80 端口)、HTTPS(443 端口)等。HTTP 顯然能夠在這些狀況下運行。MQTT 可封裝在一個 WebSockets 鏈接中,顯示爲一個 HTTP 升級請求,從而容許在這些狀況下運行。MQ 不容許採用這種模式。cookie

 

1.3缺點

因爲MQTT自己的各項技術優點,愈來愈多的企業傾向於選用MQTT做爲物聯網產品通信的標準協議,也所以,工程師們漸漸發現MQTT協議要想大規模商用,也有一些有待完善的功能。好比:

1.3.1沒有齊備的SDK

不一樣的異構終端,須要有對應的與MQTT服務器通訊的軟件SDK包,好比MCU、Linux、Android、IOS、WEB等之間要實現互聯互通必然須要不一樣的SDK包。

1.3.2不支持File和AV

有些應用場景,須要傳輸的信息可能不只僅限於指令,好比聲音信號和視頻信號,這些須要經過File和AV來實現通訊。

1.3.3不支持與第三方的HTTP集成

雖然MQTT協議優於普通的HTTP協議,可是基於傳統的HTTP協議的WEB服務器仍然佔主流市場,那麼這些服務器要實現與MQTT協議的互聯互通,以下降升級成本也尤其關鍵。

1.3.4不支持用戶管理接口

用戶在進行設備的行爲數據分析的時候,顯得尤其重要,這又是工業4.0、大數據時代的必然需求。

1.3.5原生不支持離線

消息彌補設備離線之後,MQTT服務器對設備的控制信息丟失的問題。

(解決方案:https://helpcdn.aliyun.com/document_detail/59914.html)

1.3.6不支持點對點通訊

採用標準的MQTT協議,理論上能夠經過相互訂閱的方式實現點對點通訊,可是邏輯相對複雜,而且對設備的安全性方面存在擔心。當設備B和設備C在同一主題的狀況下,設備A沒法知道是設備B仍是設備C發送的消息,也有可能消息被設備D竊聽。

1.3.7不支持羣通訊和羣管理

實現了對羣組成員的管理,羣組成員之間能互通消息,這在一個設備被多人控制,或者多個設備被一人控制的這種場景下,尤其有用。

 

1.4實現

1.4.1 交互圖

 

怎麼樣,是否是一目瞭然,很是簡單

1.4.2 MQTT代理

1.4.2.1 mosquitto

1.4.2.2 EMQ

1.4.2.3 HiveMQ

1.4.2.4 ActiveMQ

1.4.2.5 mosca

1.4.3 MQTT客戶端

1.4.3.1 eclipse Paho      (支持C,C++, JavaJavaScriptPython, Go, C#)

1.4.3.2 M2MQTT     (C#)

1.4.3.3 Fusesource MQTTClient     (Java)

1.4.3.4 MQTT.js    (javascript)

1.4.3.5 Libmosquitto    (c/c++)

1.4.3.6 Twisted

 

1.  Mosquito

2.1 簡介

一款實現了消息推送協議 MQTT v3.1 的開源消息代理軟件,提供輕量級的,支持可發佈/可訂閱的的消息推送模式,使設備對設備之間的短消息通訊變得簡單,好比如今應用普遍的低功耗傳感器,手機、嵌入式計算機、微型控制器等移動設備。一個典型的應用案例就是 Andy Stanford-ClarkMosquitto(MQTT協議創始人之一)在家中實現的遠程監控和自動化。並在 OggCamp 的演講上,對MQTT協議進行詳細闡述。

1.2         官網

https://mosquitto.org/

1.3         優勢

2.3.1使用簡單

2.3.2 網絡資料豐富

 

1.4         缺點

2.4.1 沒有可視化管理後臺

2.4.2 商用實例很少

2.5 安裝及踩坑

1.源碼包下載:http://mosquitto.org/files/source/

或者wget http://mosquitto.org/files/source/mosquitto-1.4.9.tar.gz

版本:mosquitto-1.4.tar.gz

解壓:tar -zxvf mosquitto-1.4.tar.gz

進入目錄:cd mosquitto-1.4

2.編譯安裝

打開配置文件,去掉暫且不須要的功能:

vi config.mk

如:WITH_TLS,WITH_TLS_PSK, WITH_SRV, WITH_WEBSOCKETS, WITH_SOCKS, WITH_UUID等

保存退出:wq

安裝mosquitto

make

make install

踩過的坑:

a】編譯找不到openssl/ssl.h

  安裝openssl     sudo apt-get install libssl-dev

【b】編譯過程找不到ares.h

sudo apt-get install libc-ares-dev

【c】編譯過程找不到uuid/uuid.h

sudo apt-get install uuid-dev

【d】使用過程當中找不到libmosquitto.so.1

error while loading shared libraries: libmosquitto.so.1: cannot open shared object file: No such file or directory

    【解決方法】——修改libmosquitto.so位置

# 建立連接

sudo ln -s /usr/local/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1

# 更新動態連接庫

sudo ldconfig

【e】make: g++:命令未找到  

    【解決方法】

    安裝g++編譯器

sudo apt-get install g++

 

啓動 mosquitto broker

mosquitto -c /etc/mosquitto/mosquitto.conf.example &

-c : specify the broker config file.

 -d : put the broker into the background after starting.

 -h : display this help.

 -p : start the broker listening on the specified port.

      Not recommended in conjunction with the -c option.

 -v : verbose mode - enable all logging types. This overrides

      any logging options given in the config file.

訂閱消息:

./mosquitto_sub  -h 127.0.0.1 -p 1883 -t "/sports/wordcup"

 

發佈消息:

./mosquitto_pub -h 127.0.0.1 -p 1883 -t "/sports/wordcup " -m "this is carter hello"

 

或者

./mosquitto_sub  -h 10.129.4.12 -p 1883 -t "/sports/wordcup"

./mosquitto_pub -h 10.129.4.12 -p 1883 -t "/sports/wordcup" -m "this is carter hello 666"

 

外網地址不行,因此我在本機用paho代碼一直報timeOut異常,緣由是服務器防火牆未放開端口

【解決辦法】

放開防火牆端口:firewall-cmd --add-port=1883/tcp –permanent

重啓防火牆:systemctl restart firewalld

2.6         java客戶端實現(eclipse.paho)

2.6.1 添加依賴

Java客戶端實現

採用eclipse.paho框架

新建maven工程,加入依賴

 
複製代碼
<!-- spring整合mqtt 開始-->

<dependency>

   <groupId>org.springframework.integration</groupId>

   <artifactId>spring-integration-mqtt</artifactId>

   <version>4.1.0.RELEASE</version>

   <exclusions>

      <exclusion>

         <groupId>org.eclipse.paho</groupId>

         <artifactId>mqtt-client</artifactId>

      </exclusion>

   </exclusions>

</dependency>

<!-- spring整合mqtt 結束-->

<!-- mqtt依賴 開始 -->

<dependency>

   <groupId>org.eclipse.paho</groupId>

   <artifactId>org.eclipse.paho.client.mqttv3</artifactId>

   <version>1.2.0</version>

</dependency>

<dependency>

   <groupId>org.eclipse.paho</groupId>

   <artifactId>mqtt-client</artifactId>

</dependency>

<!-- mqtt依賴 結束 -->
複製代碼

2.6.2 發佈消息

複製代碼
public class ServerMQTT {

    //tcp://MQTT安裝的服務器地址:MQTT定義的端口號

    public static final String HOST = "tcp://111.9.116.136:1883";

    //定義一個主題

    public static final String TOPIC = "pos_message_all";

    //定義MQTT的ID,能夠在MQTT服務配置中指定

    private static final String clientid = "server11";


    private MqttClient client;

    private MqttTopic topic11;

    private String userName = "mosquitto";  //非必須

    private String passWord = "";  //非必須

    private MqttMessage message;

    /**

     * 構造函數

     * @throws MqttException

     */

    public ServerMQTT() throws MqttException {

        // MemoryPersistence設置clientid的保存形式,默認爲之內存保存

        client = new MqttClient(HOST, clientid, new MemoryPersistence());

        connect();

    }

    /**

     *  用來鏈接服務器

     */

    private void connect() {

        MqttConnectOptions options = new MqttConnectOptions();

        options.setCleanSession(false);

        options.setUserName(userName);

        options.setPassword(passWord.toCharArray());

        // 設置超時時間

        options.setConnectionTimeout(10);

        // 設置會話心跳時間

        options.setKeepAliveInterval(20);

        try {

            client.setCallback(new PushCallBack());

            client.connect(options);

            topic11 = client.getTopic(TOPIC);

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

    /**

     * @param topic

     * @param message

     * @throws MqttPersistenceException

     * @throws MqttException

     */

    public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,

            MqttException {

        MqttDeliveryToken token = topic.publish(message);

        token.waitForCompletion();

        System.out.println("message is published completely! "

                + token.isComplete());

    }

    /**

     *  啓動入口

     * @param args

     * @throws MqttException

     */

    public static void main(String[] args) throws MqttException {

        ServerMQTT server = new ServerMQTT();

        server.message = new MqttMessage();

        server.message.setQos(1);  //保證消息能到達一次

        server.message.setRetained(true);

        server.message.setPayload("I love this Summer 8888".getBytes());

        server.publish(server.topic11 , server.message);

        System.out.println(server.message.isRetained() + "------ratained狀態");

    }

}
複製代碼

 

publish –> pushCallBack.deliveryComplete()

2.6.3 訂閱消息

複製代碼
/**

 * 模擬一個客戶端接收消息

 */

public class ClientMQTT {

    public static final String HOST = "tcp://111.9.116.136:1883";

    public static final String TOPIC1 = "pos_message_all";

    private static final String clientid = "client11";

    private MqttClient client;

    private MqttConnectOptions options;

    private String userName = "admin";    //非必須

    private String passWord = "password";  //非必須

    @SuppressWarnings("unused")

    private ScheduledExecutorService scheduler;

    /**

     * $SYS中各主題說明以下:

     $SYS/broker/load/connections/+     不一樣時間段內服務器接收到的connections包的平均數。最後的「+」但是1min,5min,15min。分別表示1分鐘,5分鐘,15分鐘的平均數。

     $SYS/broker/load/bytes/received/+     不一樣時間段內服務器接收數據的平均字節數。最後的「+」但是1min,5min,15min。

     $SYS/broker/load/bytes/sent/+     不一樣時間段內服務器發送數據的平均字節數。最後的「+」但是1min,5min,15min。

     $SYS/broker/load/messages/received/+     不一樣時間段內服務器接收到的全部類型消息的平均數。最後的「+」但是1min,5min,15min。

     $SYS/broker/load/messages/sent/+     不一樣時間段內服務器發送的全部類型的消息的平均數。最後的「+」但是1min,5min,15min。

     $SYS/broker/load/publish/dropped/+     不一樣時間段內服務器丟棄的消息的平均數,這代表了那些持久鏈接但與服務器斷開的客戶端失去消息的速率。最後的「+」但是1min,5min,15min。

     $SYS/broker/load/publish/received/+     不一樣時間段內服務器接收的發佈消息的平均數。最後的「+」但是1min,5min,15min。

     $SYS/broker/load/publish/sent/+     不一樣時間段內服務器發送的發佈消息的平均數。最後的「+」但是1min,5min,15min。

     $SYS/broker/load/sockets/+     不一樣時間段內服務器打開的socket鏈接的平均數。最後的「+」但是1min,5min,15min。

     $SYS/broker/messages/inflight     等待確認的Qos>0的消息的數量。

     $SYS/broker/messages/received     自服務器啓動以來接收的全部類型的消息總數。

     $SYS/broker/messages/sent     自服務器啓動以來發送的全部類型的消息總數。

     $SYS/broker/messages/stored     服務器存儲的消息的總數,包括保留消息和持久鏈接客戶端的消息隊列中的消息數。

     $SYS/broker/publish/messages/dropped     因爲inflight/queuing限制而直接丟棄的消息的總數,相關設置請查看mosquitto.conf中max_inflight_messages 和max_queued_messages參數。

     $SYS/broker/publish/messages/received     自服務器啓動以來接收的發佈消息的總數。

     $SYS/broker/publish/messages/sent     自服務器啓動以來發送的發佈消息的總數。

     $SYS/broker/retained messages/count     服務器保留的消息總數。

     $SYS/broker/subscriptions/count     服務器訂閱主題總數。

     $SYS/broker/timestamp     Mosquitto軟件build的詳細時間(Static)。

     $SYS/broker/uptime     Mosquitto啓動時長(單位:秒)。

     $SYS/broker/version     Mosquitto軟件版本號(Static)。

     */

    public static final String TOPIC2 = "$SYS/broker/bytes/received";  //自服務器啓動以來共接收的字節數

    public static final String TOPIC3 = "$SYS/broker/bytes/sent";  //自服務器啓動以來共發送的字節數

    public static final String TOPIC4 = "$SYS/broker/clients/expired";  //超過有效期被斷開鏈接的客戶端數量,有效期經過persistent_client_expiration參數設置。

    public static final String TOPIC5 = "$SYS/broker/clients/disconnected";  //自服務器啓動以來斷開的鏈接數

    public static final String TOPIC6 = "$SYS/broker/clients/maximum";  //服務器同一時間鏈接的最大客戶端數量

    public static final String TOPIC7 = "$SYS/broker/clients/total";  //有效和無效鏈接、註冊到服務器上的總數。

    public static final String TOPIC8 = "$SYS/broker/connection/#";  //若是服務器設置了橋接,系統會提供一個主題來標識鏈接狀態,默認使用$SYS/broker/connection/,若是主題值爲1表示鏈接激活,若是爲0表示鏈接沒有激活。

    public static final String TOPIC9 = "$SYS/broker/heap/current size";  //Mosquitto正在使用的堆內存大小。注意這個主題是否可使用取決於系統編譯時的相關參數設置。

    public static final String TOPIC10 = "$SYS/broker/heap/maximum size";  //Mosquitto使用的最大堆內存。這個參數是否有效也取決於系統編譯時的相關參數設置。

    private void start() {

        try {

            // host爲主機名,clientid即鏈接MQTT的客戶端ID,通常以惟一標識符表示,MemoryPersistence設置clientid的保存形式,默認爲之內存保存

            client = new MqttClient(HOST, clientid, new MemoryPersistence());

            // MQTT的鏈接設置

            options = new MqttConnectOptions();

            // 設置是否清空session,這裏若是設置爲false表示服務器會保留客戶端的鏈接記錄,設置爲true表示每次鏈接到服務器都以新的身份鏈接

            options.setCleanSession(false);

            // 設置鏈接的用戶名

            options.setUserName(userName);

            // 設置鏈接的密碼

            options.setPassword(passWord.toCharArray());

            // 設置超時時間 單位爲秒

            options.setConnectionTimeout(10);

            // 設置會話心跳時間 單位爲秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並無重連的機制

            options.setKeepAliveInterval(20);

            // 設置回調

            client.setCallback(new PushCallBack());

            MqttTopic topic = client.getTopic(TOPIC1);

            //setWill方法,若是項目中須要知道客戶端是否掉線能夠調用該方法。設置最終端口的通知消息

            options.setWill(topic, "close".getBytes(), 2, true);//遺囑

            client.connect(options);

            //訂閱消息

            int[] Qos  = {1,0,2,1,0,2,1,0,2};

            String[] topic1 = {TOPIC2,TOPIC3,TOPIC4,TOPIC5,TOPIC6,TOPIC7,TOPIC8,TOPIC9,TOPIC10};

//            int[] Qos  = {1};

//            String[] topic1 = {TOPIC1};

            client.subscribe(topic1, Qos);

        } catch (Exception e) {

            e.printStackTrace();
        }

    }

    public static void main(String[] args) throws MqttException {

        ClientMQTT client = new ClientMQTT();

        client.start();

    }

}
複製代碼

Client.start()->pushCallBack.messageArrived()

2.6.4回調函數實現mqttCallBack接口

PushCallBack 必須實現 MqttCallback 接口
有三個方法:
複製代碼
public void connectionLost(Throwable cause) {

    // 鏈接丟失後,通常在這裏面進行重連

    System.out.println("鏈接斷開,能夠作重連");

}

public void deliveryComplete(IMqttDeliveryToken token) {

    System.out.println("deliveryComplete---------" + token.isComplete() +token.getMessageId());

}


public void messageArrived(String topic, MqttMessage message) throws Exception {

    // subscribe後獲得的消息會執行到這裏面

    System.out.println("接收消息主題 : " + topic);

    System.out.println("接收消息Qos : " + message.getQos());

    System.out.println("接收消息內容 : " + new String(message.getPayload()));

}
複製代碼

2.6.5  服務質量等級說明(Qos)

MQTT提供三種Qos的消息傳遞質量:

最多一次(Atmost once delivery):QoS=0,協議對此等級應用信息不要求迴應確認,也沒有重發機制,這類信息可能會發生消息丟失或重複,取決於TCP/IP提供的盡最大努力交互的數據包服務。

(0:消息最多被傳遞一次,好比通常類廣告,通知)

最少一次(Atleast once delivery):QoS=1,確保信息到達,但消息重複可能發生,發送者若是在指定時間內沒有收到PUBACK控制報文,應用信息會被從新發送,且控制報文中DUP標誌位置1。

(1 :消息會被傳遞但可能會重複傳遞,好比帳戶餘額通知)

僅僅一次(Exactlyonce delivery):QoS=2,最高級別的服務質量,消息丟失和重複都是不可接受的。

(2 :消息保證傳遞且僅有一次傳遞,好比交易支付批覆通知)

2.6.6 斷開連接

pushCallBack.connectionLost()

3    EMQ

3.1 簡介

EMQ 2.0 (Erlang/Enterprise/Elastic MQTT Broker) 是基於 Erlang/OTP 語言平臺開發,支持大規模鏈接和分佈式集羣,發佈訂閱模式的開源 MQTT 消息服務器。

3.1.1官網

http://www.emqtt.com/docs/v2/index.html

3.1.2優勢

3.3.1 商用比較普及,有完備的運營團隊支撐

3.3.2 可視化的管理後臺

開放18083端口訪問管理後臺

3.3.3 能抗高併發

官方的回覆是8核心32G的配置可以承載160W臺設備的連接

3.1.3 缺點

3.1.4安裝及發現的問題

複製代碼
nzip emqttd-macosx-v2.0.zip && cd emqttd
# 啓動emqttd  ./bin/emqttd start
# 檢查運行狀態./bin/emqttd_ctl status
# 中止emqttd ./bin/emqttd stop
默認配置文件 /bin/emqenv
[ "x" = "x$EMQ_NODE_NAME" ] && EMQ_NODE_NAME=emqttd@127.0.0.1
[ "x" = "x$EMQ_NODE_COOKIE" ] && EMQ_NODE_COOKIE=emqsecretcookie
[ "x" = "x$EMQ_MAX_PACKET_SIZE" ] && EMQ_MAX_PACKET_SIZE=64KB
[ "x" = "x$EMQ_MAX_PORTS" ] && EMQ_MAX_PORTS=65536
[ "x" = "x$EMQ_TCP_PORT" ] && EMQ_TCP_PORT=1883
[ "x" = "x$EMQ_SSL_PORT" ] && EMQ_SSL_PORT=8883
[ "x" = "x$EMQ_WS_PORT" ] && EMQ_WS_PORT=8083
[ "x" = "x$EMQ_WSS_PORT" ] && EMQ_WSS_PORT=8084
對外暴露的tcp端口依然是1883  和mosquitto同樣
複製代碼

 

3.1.5 操做

在客戶端分別訂閱和發佈消息,在管理後臺列表能夠看到消息的狀態,管理後臺默認端口爲18083

相關文章
相關標籤/搜索