MQTT(MQ Telemetry Transport) 消息隊列遙測傳輸協議是IBM開發的一種網絡應用層的協議,提供輕量級的,支持可發佈/可訂閱的的消息推送模式,使設備對設備之間的短消息通訊變得簡單,好比如今應用普遍的低功耗傳感器,手機、嵌入式計算機、微型控制器,衛星等移動設備。javascript
MQTT 的獨特之處在於,它的每消息標題能夠短至 2 個byte。MQ 和 HTTP 都擁有高得多的每消息開銷。對於 HTTP,爲每一個新請求消息從新創建 HTTP 鏈接會致使重大的開銷。MQ 和 MQTT 所使用的永久鏈接顯著減小了這一開銷。html
您須要可以及時地將通知傳遞給客戶。爲此,必須採用某種按期輪詢或推送方法;從電池、系統負載和帶寬角度講,推送是最佳解決方案。MQTT 是專門針對低功耗目標而設計的。HTTP 的設計沒有考慮此因素,所以增長了功耗。java
在 HTTP 堆棧上,維護數百萬個併發鏈接,須要作許多的工做來提供支持。儘管能夠實現此支持,但大多數商業產品都爲處理這一數量級的永久鏈接而進行了優化。IBM 提供了 IBM MessageSight,這是一個單機架裝載服務器,通過測試能處理多達 100 萬個經過 MQTT 併發鏈接的設備。相反,MQ 不是爲大量併發客戶端而設計的。c++
MQTT提供三種不一樣消息傳遞等級,讓消息能按需到達目的地,適應在不穩定工做的網絡傳輸需求。MQTT 和 MQ 可以從斷開等故障中恢復,並且沒有進一步的代碼需求。可是,HTTP 沒法原生地實現此目的,須要客戶端重試編碼,這可能增長冪等性問題。spring
支持各類流行編程語言(包括C,Java,Ruby,Python 等等)且易於使用的客戶端。macos
支持發佈 / 訂閱模型,簡化應用程序的開發。編程
企業可能須要在沒有第三方中介的狀況下發送敏感的信息。這下降了特定於操做系統的解決方案(好比 Apple iOS、Google Play 通知)做爲主要傳輸機制的價值。安全
HTTP 只容許使用一種稱爲COMET 的方法,使用持久的 HTTP 請求來執行推送。從客戶端和服務器的角度講,此方法都很昂貴。MQ 和 MQTT 都支持推送,這是它們的一個基本特性。服務器
一些企業防火牆將出站鏈接限制到一些已定義的端口。這些端口一般被限制爲 HTTP(80 端口)、HTTPS(443 端口)等。HTTP 顯然能夠在這些狀況下運行。MQTT 可封裝在一個 WebSockets 鏈接中,顯示爲一個 HTTP 升級請求,從而容許在這些狀況下運行。MQ 不容許採用這種模式。cookie
因爲MQTT自己的各項技術優點,愈來愈多的企業傾向於選用MQTT做爲物聯網產品通信的標準協議,也所以,工程師們漸漸發現MQTT協議要想大規模商用,也有一些有待完善的功能。好比:
不一樣的異構終端,須要有對應的與MQTT服務器通訊的軟件SDK包,好比MCU、Linux、Android、IOS、WEB等之間要實現互聯互通必然須要不一樣的SDK包。
有些應用場景,須要傳輸的信息可能不只僅限於指令,好比聲音信號和視頻信號,這些須要經過File和AV來實現通訊。
雖然MQTT協議優於普通的HTTP協議,可是基於傳統的HTTP協議的WEB服務器仍然佔主流市場,那麼這些服務器要實現與MQTT協議的互聯互通,以下降升級成本也尤其關鍵。
用戶在進行設備的行爲數據分析的時候,顯得尤其重要,這又是工業4.0、大數據時代的必然需求。
消息彌補設備離線之後,MQTT服務器對設備的控制信息丟失的問題。
(解決方案:https://helpcdn.aliyun.com/document_detail/59914.html)
採用標準的MQTT協議,理論上能夠經過相互訂閱的方式實現點對點通訊,可是邏輯相對複雜,而且對設備的安全性方面存在擔心。當設備B和設備C在同一主題的狀況下,設備A沒法知道是設備B仍是設備C發送的消息,也有可能消息被設備D竊聽。
實現了對羣組成員的管理,羣組成員之間能互通消息,這在一個設備被多人控制,或者多個設備被一人控制的這種場景下,尤其有用。
怎麼樣,是否是一目瞭然,很是簡單
一款實現了消息推送協議 MQTT v3.1 的開源消息代理軟件,提供輕量級的,支持可發佈/可訂閱的的消息推送模式,使設備對設備之間的短消息通訊變得簡單,好比如今應用普遍的低功耗傳感器,手機、嵌入式計算機、微型控制器等移動設備。一個典型的應用案例就是 Andy Stanford-ClarkMosquitto(MQTT協議創始人之一)在家中實現的遠程監控和自動化。並在 OggCamp 的演講上,對MQTT協議進行詳細闡述。
https://mosquitto.org/
2.3.1使用簡單
2.3.2 網絡資料豐富
2.4.1 沒有可視化管理後臺
2.4.2 商用實例很少
1.源碼包下載:http://mosquitto.org/files/source/
或者wget http://mosquitto.org/files/source/mosquitto-1.4.9.tar.gz
解壓:tar -zxvf mosquitto-1.4.tar.gz
進入目錄:cd mosquitto-1.4
2.編譯安裝
打開配置文件,去掉暫且不須要的功能:
vi config.mk
如:WITH_TLS,WITH_TLS_PSK, WITH_SRV, WITH_WEBSOCKETS, WITH_SOCKS, WITH_UUID等
保存退出:wq
安裝mosquitto
make
make install
踩過的坑:
a】編譯找不到openssl/ssl.h
安裝openssl sudo apt-get install libssl-dev
【b】編譯過程找不到ares.h
sudo apt-get install libc-ares-dev
【c】編譯過程找不到uuid/uuid.h
sudo apt-get install uuid-dev
【d】使用過程當中找不到libmosquitto.so.1
error while loading shared libraries: libmosquitto.so.1: cannot open shared object file: No such file or directory
【解決方法】——修改libmosquitto.so位置
# 建立連接
sudo ln -s /usr/local/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1
# 更新動態連接庫
sudo ldconfig
【e】make: g++:命令未找到
【解決方法】
安裝g++編譯器
sudo apt-get install g++
啓動 mosquitto broker
mosquitto -c /etc/mosquitto/mosquitto.conf.example &
-c : specify the broker config file.
-d : put the broker into the background after starting.
-h : display this help.
-p : start the broker listening on the specified port.
Not recommended in conjunction with the -c option.
-v : verbose mode - enable all logging types. This overrides
any logging options given in the config file.
訂閱消息:
./mosquitto_sub -h 127.0.0.1 -p 1883 -t "/sports/wordcup"
發佈消息:
./mosquitto_pub -h 127.0.0.1 -p 1883 -t "/sports/wordcup " -m "this is carter hello"
或者
./mosquitto_sub -h 10.129.4.12 -p 1883 -t "/sports/wordcup"
./mosquitto_pub -h 10.129.4.12 -p 1883 -t "/sports/wordcup" -m "this is carter hello 666"
外網地址不行,因此我在本機用paho代碼一直報timeOut異常,緣由是服務器防火牆未放開端口
【解決辦法】
放開防火牆端口:firewall-cmd --add-port=1883/tcp –permanent
重啓防火牆:systemctl restart firewalld
Java客戶端實現
採用eclipse.paho框架
新建maven工程,加入依賴
<!-- spring整合mqtt 開始--> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>4.1.0.RELEASE</version> <exclusions> <exclusion> <groupId>org.eclipse.paho</groupId> <artifactId>mqtt-client</artifactId> </exclusion> </exclusions> </dependency> <!-- spring整合mqtt 結束--> <!-- mqtt依賴 開始 --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>mqtt-client</artifactId> </dependency> <!-- mqtt依賴 結束 -->
public class ServerMQTT { //tcp://MQTT安裝的服務器地址:MQTT定義的端口號 public static final String HOST = "tcp://111.9.116.136:1883"; //定義一個主題 public static final String TOPIC = "pos_message_all"; //定義MQTT的ID,能夠在MQTT服務配置中指定 private static final String clientid = "server11"; private MqttClient client; private MqttTopic topic11; private String userName = "mosquitto"; //非必須 private String passWord = ""; //非必須 private MqttMessage message; /** * 構造函數 * @throws MqttException */ public ServerMQTT() throws MqttException { // MemoryPersistence設置clientid的保存形式,默認爲之內存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); connect(); } /** * 用來鏈接服務器 */ private void connect() { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(passWord.toCharArray()); // 設置超時時間 options.setConnectionTimeout(10); // 設置會話心跳時間 options.setKeepAliveInterval(20); try { client.setCallback(new PushCallBack()); client.connect(options); topic11 = client.getTopic(TOPIC); } catch (Exception e) { e.printStackTrace(); } } /** * @param topic * @param message * @throws MqttPersistenceException * @throws MqttException */ public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException, MqttException { MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); System.out.println("message is published completely! " + token.isComplete()); } /** * 啓動入口 * @param args * @throws MqttException */ public static void main(String[] args) throws MqttException { ServerMQTT server = new ServerMQTT(); server.message = new MqttMessage(); server.message.setQos(1); //保證消息能到達一次 server.message.setRetained(true); server.message.setPayload("I love this Summer 8888".getBytes()); server.publish(server.topic11 , server.message); System.out.println(server.message.isRetained() + "------ratained狀態"); } }
/** * 模擬一個客戶端接收消息 */ public class ClientMQTT { public static final String HOST = "tcp://111.9.116.136:1883"; public static final String TOPIC1 = "pos_message_all"; private static final String clientid = "client11"; private MqttClient client; private MqttConnectOptions options; private String userName = "admin"; //非必須 private String passWord = "password"; //非必須 @SuppressWarnings("unused") private ScheduledExecutorService scheduler; /** * $SYS中各主題說明以下: $SYS/broker/load/connections/+ 不一樣時間段內服務器接收到的connections包的平均數。最後的「+」但是1min,5min,15min。分別表示1分鐘,5分鐘,15分鐘的平均數。 $SYS/broker/load/bytes/received/+ 不一樣時間段內服務器接收數據的平均字節數。最後的「+」但是1min,5min,15min。 $SYS/broker/load/bytes/sent/+ 不一樣時間段內服務器發送數據的平均字節數。最後的「+」但是1min,5min,15min。 $SYS/broker/load/messages/received/+ 不一樣時間段內服務器接收到的全部類型消息的平均數。最後的「+」但是1min,5min,15min。 $SYS/broker/load/messages/sent/+ 不一樣時間段內服務器發送的全部類型的消息的平均數。最後的「+」但是1min,5min,15min。 $SYS/broker/load/publish/dropped/+ 不一樣時間段內服務器丟棄的消息的平均數,這代表了那些持久鏈接但與服務器斷開的客戶端失去消息的速率。最後的「+」但是1min,5min,15min。 $SYS/broker/load/publish/received/+ 不一樣時間段內服務器接收的發佈消息的平均數。最後的「+」但是1min,5min,15min。 $SYS/broker/load/publish/sent/+ 不一樣時間段內服務器發送的發佈消息的平均數。最後的「+」但是1min,5min,15min。 $SYS/broker/load/sockets/+ 不一樣時間段內服務器打開的socket鏈接的平均數。最後的「+」但是1min,5min,15min。 $SYS/broker/messages/inflight 等待確認的Qos>0的消息的數量。 $SYS/broker/messages/received 自服務器啓動以來接收的全部類型的消息總數。 $SYS/broker/messages/sent 自服務器啓動以來發送的全部類型的消息總數。 $SYS/broker/messages/stored 服務器存儲的消息的總數,包括保留消息和持久鏈接客戶端的消息隊列中的消息數。 $SYS/broker/publish/messages/dropped 因爲inflight/queuing限制而直接丟棄的消息的總數,相關設置請查看mosquitto.conf中max_inflight_messages 和max_queued_messages參數。 $SYS/broker/publish/messages/received 自服務器啓動以來接收的發佈消息的總數。 $SYS/broker/publish/messages/sent 自服務器啓動以來發送的發佈消息的總數。 $SYS/broker/retained messages/count 服務器保留的消息總數。 $SYS/broker/subscriptions/count 服務器訂閱主題總數。 $SYS/broker/timestamp Mosquitto軟件build的詳細時間(Static)。 $SYS/broker/uptime Mosquitto啓動時長(單位:秒)。 $SYS/broker/version Mosquitto軟件版本號(Static)。 */ public static final String TOPIC2 = "$SYS/broker/bytes/received"; //自服務器啓動以來共接收的字節數 public static final String TOPIC3 = "$SYS/broker/bytes/sent"; //自服務器啓動以來共發送的字節數 public static final String TOPIC4 = "$SYS/broker/clients/expired"; //超過有效期被斷開鏈接的客戶端數量,有效期經過persistent_client_expiration參數設置。 public static final String TOPIC5 = "$SYS/broker/clients/disconnected"; //自服務器啓動以來斷開的鏈接數 public static final String TOPIC6 = "$SYS/broker/clients/maximum"; //服務器同一時間鏈接的最大客戶端數量 public static final String TOPIC7 = "$SYS/broker/clients/total"; //有效和無效鏈接、註冊到服務器上的總數。 public static final String TOPIC8 = "$SYS/broker/connection/#"; //若是服務器設置了橋接,系統會提供一個主題來標識鏈接狀態,默認使用$SYS/broker/connection/,若是主題值爲1表示鏈接激活,若是爲0表示鏈接沒有激活。 public static final String TOPIC9 = "$SYS/broker/heap/current size"; //Mosquitto正在使用的堆內存大小。注意這個主題是否可使用取決於系統編譯時的相關參數設置。 public static final String TOPIC10 = "$SYS/broker/heap/maximum size"; //Mosquitto使用的最大堆內存。這個參數是否有效也取決於系統編譯時的相關參數設置。 private void start() { try { // host爲主機名,clientid即鏈接MQTT的客戶端ID,通常以惟一標識符表示,MemoryPersistence設置clientid的保存形式,默認爲之內存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的鏈接設置 options = new MqttConnectOptions(); // 設置是否清空session,這裏若是設置爲false表示服務器會保留客戶端的鏈接記錄,設置爲true表示每次鏈接到服務器都以新的身份鏈接 options.setCleanSession(false); // 設置鏈接的用戶名 options.setUserName(userName); // 設置鏈接的密碼 options.setPassword(passWord.toCharArray()); // 設置超時時間 單位爲秒 options.setConnectionTimeout(10); // 設置會話心跳時間 單位爲秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並無重連的機制 options.setKeepAliveInterval(20); // 設置回調 client.setCallback(new PushCallBack()); MqttTopic topic = client.getTopic(TOPIC1); //setWill方法,若是項目中須要知道客戶端是否掉線能夠調用該方法。設置最終端口的通知消息 options.setWill(topic, "close".getBytes(), 2, true);//遺囑 client.connect(options); //訂閱消息 int[] Qos = {1,0,2,1,0,2,1,0,2}; String[] topic1 = {TOPIC2,TOPIC3,TOPIC4,TOPIC5,TOPIC6,TOPIC7,TOPIC8,TOPIC9,TOPIC10}; // int[] Qos = {1}; // String[] topic1 = {TOPIC1}; client.subscribe(topic1, Qos); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws MqttException { ClientMQTT client = new ClientMQTT(); client.start(); } }
PushCallBack 必須實現 MqttCallback 接口
有三個方法:
public void connectionLost(Throwable cause) { // 鏈接丟失後,通常在這裏面進行重連 System.out.println("鏈接斷開,能夠作重連"); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete() +token.getMessageId()); } public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe後獲得的消息會執行到這裏面 System.out.println("接收消息主題 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息內容 : " + new String(message.getPayload())); }
MQTT提供三種Qos的消息傳遞質量:
最多一次(Atmost once delivery):QoS=0,協議對此等級應用信息不要求迴應確認,也沒有重發機制,這類信息可能會發生消息丟失或重複,取決於TCP/IP提供的盡最大努力交互的數據包服務。
(0:消息最多被傳遞一次,好比通常類廣告,通知)
最少一次(Atleast once delivery):QoS=1,確保信息到達,但消息重複可能發生,發送者若是在指定時間內沒有收到PUBACK控制報文,應用信息會被從新發送,且控制報文中DUP標誌位置1。
(1 :消息會被傳遞但可能會重複傳遞,好比帳戶餘額通知)
僅僅一次(Exactlyonce delivery):QoS=2,最高級別的服務質量,消息丟失和重複都是不可接受的。
(2 :消息保證傳遞且僅有一次傳遞,好比交易支付批覆通知)
EMQ 2.0 (Erlang/Enterprise/Elastic MQTT Broker) 是基於 Erlang/OTP 語言平臺開發,支持大規模鏈接和分佈式集羣,發佈訂閱模式的開源 MQTT 消息服務器。
http://www.emqtt.com/docs/v2/index.html
開放18083端口訪問管理後臺
官方的回覆是8核心32G的配置可以承載160W臺設備的連接
nzip emqttd-macosx-v2.0.zip && cd emqttd # 啓動emqttd ./bin/emqttd start # 檢查運行狀態./bin/emqttd_ctl status # 中止emqttd ./bin/emqttd stop 默認配置文件 /bin/emqenv [ "x" = "x$EMQ_NODE_NAME" ] && EMQ_NODE_NAME=emqttd@127.0.0.1 [ "x" = "x$EMQ_NODE_COOKIE" ] && EMQ_NODE_COOKIE=emqsecretcookie [ "x" = "x$EMQ_MAX_PACKET_SIZE" ] && EMQ_MAX_PACKET_SIZE=64KB [ "x" = "x$EMQ_MAX_PORTS" ] && EMQ_MAX_PORTS=65536 [ "x" = "x$EMQ_TCP_PORT" ] && EMQ_TCP_PORT=1883 [ "x" = "x$EMQ_SSL_PORT" ] && EMQ_SSL_PORT=8883 [ "x" = "x$EMQ_WS_PORT" ] && EMQ_WS_PORT=8083 [ "x" = "x$EMQ_WSS_PORT" ] && EMQ_WSS_PORT=8084 對外暴露的tcp端口依然是1883 和mosquitto同樣
在客戶端分別訂閱和發佈消息,在管理後臺列表能夠看到消息的狀態,管理後臺默認端口爲18083