基於RabbitMQ的MQTT協議及應用

MQTT的開源代碼地址先貼在這裏:https://github.com/mqtt/mqtt.github.io/wiki/serversphp

MQTT定義:html

  MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是IBM開發的一個即時通信協議,有可能成爲物聯網的重要組成部分。該協議支持全部平臺,幾乎能夠把全部聯網物品和外部鏈接起來,被用來當作傳感器和制動器(好比經過Twitter讓房屋聯網)的通訊協議。java

MQTT特色:git

  MQTT協議是爲大量計算能力有限,且工做在低帶寬、不可靠的網絡的遠程傳感器和控制設備通信而設計的協議,它具備如下主要的幾項特性:
一、使用發佈/訂閱消息模式,提供一對多的消息發佈,解除應用程序耦合;
二、對負載內容屏蔽的消息傳輸;
三、使用 TCP/IP 提供網絡鏈接;
四、有三種消息發佈服務質量:
    • 「至多一次」,消息發佈徹底依賴底層 TCP/IP 網絡。會發生消息丟失或重複。這一級別可用於以下狀況,環境傳感器數據,丟失一次讀記錄無所謂,由於不久後還會有第二次發送。
    • 「至少一次」,確保消息到達,但消息重複可能會發生。
    • 「只有一次」,確保消息到達一次。這一級別可用於以下狀況,在計費系統中,消息重複或丟失會致使不正確的結果。
五、小型傳輸,開銷很小(固定長度的頭部是 2 字節),協議交換最小化,以下降網絡流量;
六、使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制;  
 
MQTT協議數據包結構:

在MQTT協議中,一個MQTT數據包由:固定頭(Fixed header)、可變頭(Variable header)、消息體(payload)三部分構成。MQTT數據包結構以下:github

(1)固定頭(Fixed header)。存在於全部MQTT數據包中,表示數據包類型及數據包的分組類標識。web

(2)可變頭(Variable header)。存在於部分MQTT數據包中,數據包類型決定了可變頭是否存在及其具體內容。api

(3)消息體(Payload)。存在於部分MQTT數據包中,表示客戶端收到的具體內容。服務器

固定頭部

固定頭部,使用兩個字節,共16位:微信

bit 7 6 5 4 3 2 1 0
byte 1 Message Type DUP flag QoS level RETAIN
byte 2 Remaining Length

第一個字節(byte 1)

消息類型(4-7),使用4位二進制表示,可表明16種消息類型:網絡

Mnemonic Enumeration Description
Reserved 0 Reserved
CONNECT 1 Client request to connect to Server
CONNACK 2 Connect Acknowledgment
PUBLISH 3 Publish message
PUBACK 4 Publish Acknowledgment
PUBREC 5 Publish Received (assured delivery part 1)
PUBREL 6 Publish Release (assured delivery part 2)
PUBCOMP 7 Publish Complete (assured delivery part 3)
SUBSCRIBE 8 Client Subscribe request
SUBACK 9 Subscribe Acknowledgment
UNSUBSCRIBE 10 Client Unsubscribe request
UNSUBACK 11 Unsubscribe Acknowledgment
PINGREQ 12 PING Request
PINGRESP 13 PING Response
DISCONNECT 14 Client is Disconnecting
Reserved 15 Reserved

除去0和15位置屬於保留待用,共14種消息事件類型。

DUP flag(打開標誌)

發佈消息的副本。用來在保證消息的可靠傳輸,若是設置爲1,則在下面的變長中增長MessageId,而且須要回覆確認,以保證消息傳輸完成,但不能用於檢測消息重複發送。

保證消息可靠傳輸,默認爲0,只佔用一個字節,表示第一次發送。不能用於檢測消息重複發送等。只適用於客戶端或服務器端嘗試重發PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意須要知足如下條件:

當QoS > 0
 消息須要回覆確認

此時,在可變頭部須要包含消息ID。當值爲1時,表示當前消息先前已經被傳送過。

QoS(Quality of Service,服務質量)

使用兩個二進制表示PUBLISH類型消息:

QoS value bit 2 bit 1 Description
0 0 0 至多一次 發完即丟棄 <=1
1 0 1 至少一次 須要確認回覆 >=1
2 1 0 只有一次 須要確認回覆 =1
3 1 1 待用,保留位置

RETAIN(保持)

僅針對PUBLISH消息。不一樣值,不一樣含義:

1:表示發送的消息須要一直持久保存(不受服務器重啓影響),不但要發送給當前的訂閱者,而且之後新來的訂閱了此Topic name的訂閱者會立刻獲得推送。

備註:新來乍到的訂閱者,只會取出最新的一個RETAIN flag = 1的消息推送。

0:僅僅爲當前訂閱者推送此消息。

假如服務器收到一個空消息體(zero-length payload)、RETAIN = 一、已存在Topic name的PUBLISH消息,服務器能夠刪除掉對應的已被持久化的PUBLISH消息。

Remaining Length(剩餘長度)

固定頭的第二字節用來保存變長頭部和消息體的總大小的,但不是直接保存的。這一字節是能夠擴展,其保存機制,前7位用於保存長度,後一部用作標識。當最後一位爲1時,表示長度不足,須要使用二個字節繼續保存。

在當前消息中剩餘的byte(字節)數,包含可變頭部和負荷(內容)。

單個字節最大值:01111111,16進制:0x7F,10進製爲127。

MQTT協議規定,第八位(最高位)若爲1,則表示還有後續字節存在。

MQTT協議最多容許4個字節表示剩餘長度。最大長度爲:0xFF,0xFF,0xFF,0x7F,二進制表示爲:11111111,11111111,11111111,01111111,十進制:268435455 byte=261120KB=256MB=0.25GB 四個字節之間值的範圍:

Digits From To
1字節 0 (0x00) 127 (0x7F)
2字節 128 (0x80, 0x01) 16 383 (0xFF, 0x7F)
3字節 16 384 (0x80, 0x80, 0x01) 2 097 151 (0xFF, 0xFF, 0x7F)
4字節 2 097 152 (0x80, 0x80, 0x80, 0x01) 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)

其實換個方式理解:第1字節的基數是1,而第2字節的基數:128,以此類推,第三字節的基數是:128*128=2的14次方,第四字節是:128*128*128=2的21次方;

例如,須要表達321=2*128+65.(2字節):10100001 0000 0011.

(和咱們理解的低位運算放置順序不同,第一個字節是低位,後續字節是高位,但字節內部自己是低位右邊,高位左邊)。

可變頭部

固定頭部僅定義了消息類型和一些標誌位,一些消息的元數據,須要放入可變頭部中。可變頭部內容字節長度 + Playload/負荷字節長度 = 剩餘長度。

可變頭部,包含了協議名稱,版本號,鏈接標誌,用戶受權,心跳時間等內容。

可變頭部居於固定頭部和payload中間。

可變剩餘長度(remaing length)不是可變頭部的一部分,固然該長度值也是從可變頭部開始計算,包含可變頭部的長度+payload的長度。

可變頭部的字段以下:

協議名稱: MQTT CONNECT message. UTF編碼:如 MQIsdp, capitalized.

協議版本:8位無符號,當前使用:3 (0x03),以下:

bit 7 6 5 4 3 2 1 0
  Protocol Version
  0 0 0 0 0 0 1 1

Connect flags

Clean session, Will, Will QoS, Retain flags 該字段的設置

一個字節表示,除了第1位是保留未使用,其它7位都具備不一樣含義。

業務上很重要,對消息整體流程影響很大,須要牢記。

 

Clean session flag

Position: bit 1 ,鏈接標誌.

0:server須要存儲client的訂閱。包括存儲Qos 1和2的訂閱主題(當client重連時能將消息發送);當鏈接丟失的時候 服務器必須維護正在發送的消息的狀態直到客戶端從新鏈接到服務器。

1:server MUST忽略以前維護關於client的信息,而且將該connection當成clean的。server MUST 忽略任何client斷開的狀態。

 

原文翻譯不能,因此參考了下一位大牛的表達:

0,表示若是訂閱的客戶機斷線了,要保存爲其要推送的消息(QoS爲1和QoS爲2),若其從新鏈接時,需將這些消息推送(若客戶端長時間不鏈接,須要設置一個過時值)。 
1,斷線服務器即清理相關信息,從新鏈接上來以後,會再次訂閱。

Will Flag

定義了客戶端(沒有主動發送DISCONNECT消息)出現網絡異常致使鏈接中斷的狀況下,服務器須要作的一些措施。

簡而言之,就是客戶端預先定義好,在本身異常斷開的狀況下,所留下的最後遺願(Last Will),也稱之爲遺囑(Testament)。 這個遺囑就是一個由客戶端預先定義好的主題和對應消息,附加在CONNECT的可變頭部中,在客戶端鏈接出現異常的狀況下,由服務器主動發佈此消息。

只有在Will Flag位爲1時,Will Qos和Will Retain纔會被讀取,此時消息體Playload中要出現Will Topic和Will Message具體內容,不然,Will QoS和Will Retain值會被忽略掉。

Will Qos

兩位表示,和PUBLISH消息固定頭部的QoS level含義同樣。

若標識了Will Flag值爲1,那麼Will QoS就會生效,不然會被忽略掉。

Will RETAIN

若是設置Will Flag,Will Retain標誌就是有效的,不然它將被忽略。

當客戶端意外斷開服務器發佈其Will Message以後,服務器是否應該繼續保存。這個屬性和PUBLISH固定頭部的RETAIN標誌含義同樣,這裏先掠過。

User name 和 password Flag:

用於受權,二者要麼爲0要麼爲1,不然都是無效。都爲0,表示客戶端可自由鏈接/訂閱,都爲1,表示鏈接/訂閱須要受權。

 

bit 7 6 5 4 3 2 1 0
  User Name Flag Password Flag Will Retain Will QoS Will Flag Clean Session Reserved
  x x x x x x   x

 

Playload/消息體/負荷

消息體主要是爲配合固定/可變頭部命令(好比CONNECT可變頭部User name標記若爲1則須要在消息體中附加用戶名稱字符串)而存在。

CONNECT/SUBSCRIBE/SUBACK/PUBLISH等消息有消息體。PUBLISH的消息體以二進制形式對待。

MQTT協議只容許在PUBLISH類型消息體中使用自定義特性,在固定/可變頭部想加入自定義私有特性是不容許的。

這也是爲了協議免於流於形式,變得很分裂也爲了兼顧現有客戶端等。好比支持壓縮等,那就能夠在Playload中定義數據支持,在應用中進行讀取處理。

這部分會在後面詳細論述。

消息標識符/消息ID

固定頭中的QoS level標誌值爲1或2時纔會在:PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP,SUBSCRIBE,SUBACK,UNSUBSCRIBE,UNSUBACK等消息的可變頭中出現。

一個16位無符號位的short類型值(值不能爲 0,0作保留做爲無效的消息ID),僅僅要求在一個特定方向(服務器發往客戶端爲一個方向,客戶端發送到服務器端爲另外一個方向)的通訊消息中必須惟一。好比客戶端發往服務器,有可能存在服務器發往客戶端會同時存在重複,但不礙事。

可變頭部中,須要兩個字節的順序是MSB(Most Significant Bit) LSB(Last/Least Significant Bit),翻譯成中文就是,最高有效位,最低有效位。最高有效位在最低有效位左邊/上面,表示這是一個大端字節/網絡字節序,符合人的閱讀習慣,高位在最左邊。

bit 7 6 5 4 3 2 1 0
  Message Identifier MSB
  Message Identifier LSB

最大長度可爲: 65535

UTF-8編碼

有關字符串,MQTT採用的是修改版的UTF-8編碼,通常形式爲以下:

bit 7 6 5 4 3 2 1 0
byte 1 String Length MSB
byte 2 String Length LSB
bytes 3 ... Encoded Character Data

 

 最後的結構以下:

 

  Description 7 6 5 4 3 2 1 0
Fixed header/固定頭部
    Message Type(1) DUP flag QoS level RETAIN
byte 1
  0 0 0 1 x x x x
byte 2 Remaining Length
Variable header/可變頭部
Protocol Name
byte 1 Length MSB (0) 0 0 0 0 0 0 0 0
byte 2 Length LSB (6) 0 0 0 0 0 1 1 0
byte 3 'M' 0 1 0 0 1 1 0 1
byte 4 'Q' 0 1 0 1 0 0 0 1
byte 5 'I' 0 1 0 0 1 0 0 1
byte 6 's' 0 1 1 1 0 0 1 1
byte 7 'd' 0 1 1 0 0 1 0 0
byte 8 'p' 0 1 1 1 0 0 0 0
Protocol Version Number
byte 9 Version (3) 0 0 0 0 0 0 1 1
Connect Flags
  User Name Flag Password Flag Will Retain Will QoS Will Flag Clean Session Reserved
byte 10
1 1 0 0 1 1 1 x
Keep Alive timer
byte 11 Keep Alive MSB (0) 0 0 0 0 0 0 0 0
byte 12 Keep Alive LSB (10) 0 0 0 0 1 0 1 0
Playload/消息體

Client Identifier(客戶端ID)

1-23個字符長度,客戶端到服務器的全局惟一標誌,若是客戶端ID超出23個字符長度,服務器須要返回碼爲2,標識符被拒絕響應的CONNACK消息。
處理QoS級別1和2的消息ID中,可使用到。
必填項。

Will Topic

Will Flag值爲1,這裏即是Will Topic的內容。QoS級別經過Will QoS字段定義,RETAIN值經過Will RETAIN標識,都定義在可變頭裏面。

Will Message

Will Flag若設爲1,這裏即是Will Message定義消息的內容,對應的主題爲Will Topic。若是客戶端意外的斷開觸發服務器PUBLISH此消息。
長度有可能爲0。
在CONNECT消息中的Will Message是UTF-8編碼的,當被服務器發佈時則做爲二進制的消息體。

User Name

若是設置User Name標識,能夠在此讀取用戶名稱。通常可用於身份驗證。協議建議用戶名爲很少於12個字符,不是必須。

Password

若是設置Password標識,即可讀取用戶密碼。建議密碼爲12個字符或者更少,但不是必須。

 

心跳時間(Keep Alive timer)

以秒爲單位,定義服務器端從客戶端接收消息的最大時間間隔。通常應用服務會在業務層次檢測客戶端網絡是否鏈接,不是TCP/IP協議層面的心跳機制(好比開啓SOCKET的SO_KEEPALIVE選項)。 通常來說,在一個心跳間隔內,客戶端發送一個PINGREQ消息到服務器,服務器返回PINGRESP消息,完成一次心跳交互,繼而等待下一輪。若客戶端沒有收到心跳反饋,會關閉掉TCP/IP端口鏈接,離線。 16位兩個字節,可看作一個無符號的short類型值。最大值,2^16-1 = 65535秒 = 18小時。最小值能夠爲0,表示客戶端不斷開。通常設爲幾分鐘,好比微信心跳週期爲300秒。

Will Message編碼

Will Message在CONNECT Payload/消息體中,使用UTF-8編碼。假設內容爲「abcd」,大概以下:

  Description 7 6 5 4 3 2 1 0
byte 1 Length MSB (0) 0 0 0 0 0 0 0 0
byte 2 Length LSB (4) 0 0 0 0 0 1 0 0
byte 3 'a' (0x61) 0 1 1 0 0 0 0 1
byte 4 'b' (0x62) 0 1 1 0 0 0 1 0
byte 5 'c' (0x63) 0 1 1 0 0 0 1 1
byte 6 'd' (0x64) 0 1 1 0 0 1 0 0

有一點須要記住,PUBLISH的Payload/消息體中以二進制編碼保存。

某刻客戶端異常關閉觸發服務器會PUBLISH此消息。那麼服務器會直接把byte3-byte6之間字符取出,保存爲二進制,附加到PUBLISH消息體中,大概存儲以下:

  Description 7 6 5 4 3 2 1 0
byte 1 'a' (0x61) 0 1 1 0 0 0 0 1
byte 2 'b' (0x62) 0 1 1 0 0 0 1 0
byte 3 'c' (0x63) 0 1 1 0 0 0 1 1
byte 4 'd' (0x64) 0 1 1 0 0 1 0 0

另外,MQTT 3.1協議對Will message的說明很容易引發誤解,3.1.1草案已經獲得修正。

相關說明:

http://mqtt.org/wiki/doku.php/willmessageutf8_support

https://tools.oasis-open.org/issues/browse/MQTT-2

鏈接異常中斷通知機制

CONNECT消息一旦設置在可變頭部設置了Will flag標記,那就啓用了Last-Will-And-Testament特性,此特性很贊。

一旦客戶端出現異常中斷,便會觸發服務器發佈Will Message消息到Will Topic主題上去,通知Will Topic訂閱者,對方因異常退出。

接收CONNECT後的響應動做

接收到CONNECT消息以後,服務器應該返回一個CONNACK消息做爲響應:

  1. 若客戶端繞過CONNECT消息直接發送其它類型消息,服務器應關閉此非法鏈接 若客戶端發送CONNECT以後未收到CONNACT,須要關閉當前鏈接,而後從新鏈接
  2. 相同Client ID客戶端已鏈接到服務器,先前客戶端必須斷開鏈接後,服務器才能完成新的客戶端CONNECT鏈接 客戶端發送無效非法CONNECT消息,服務器須要關閉

CONNACK

一個完整的CONNACK消息大體以下:

  Description 7 6 5 4 3 2 1 0
Fixed header/固定頭部
byte 1   Message type (2) DUP flag QoS flags RETAIN
    0 0 1 0 x x x x
byte 2   Remaining Length (2)
    0 0 0 0 0 0 1 0
Variable header/可變頭部
Topic Name Compression Response
byte 1 Reserved values. Not used. x x x x x x x x
Connect Return Code
byte 2 Return Code                

可變頭部第一個字節爲保留,無甚用處。第二個字節爲鏈接握手返回碼:

返回值 16進制 含義
0 0x00 Connection Accepted
1 0x01 Connection Refused: unacceptable protocol version
2 0x02 Connection Refused: identifier rejected
3 0x03 Connection Refused: server unavailable
4 0x04 Connection Refused: bad user name or password
5 0x05 Connection Refused: not authorized
6-255   Reserved for future use

只有0-5目前被使用到,其餘值有待往後使用。通常返回值爲0x00,表示鏈接創建。非法的請求,須要返回相應的數值。

從上面看出,一個CONNACT,四個字節表示。一個正常的CONNACT消息實際內容可能以下: 0x20 0x02 0x00 0x00

如果在私有協議中,兩個字節就足夠了。

不少時候,客戶端和服務器端在沒有消息傳遞時,會一直保持着鏈接。雖然不能依靠TCP心跳機制(好比SO_KEEPALIVE選項),業務層面定義心跳機制,會讓鏈接狀態檢測、控制更爲直觀。

 

PINGREQ

由客戶端發送到服務器端,證實本身還在一直鏈接着呢。兩個字節,固定值。

  Description 7 6 5 4 3 2 1 0
Fixed header/固定頭部
byte 1   Message type (12) DUP flag QoS flags RETAIN
    1 1 0 0 x x x x
byte 2   Remaining Length (0)
    0 0 0 0 0 0 0 0

客戶端會在一個心跳週期內發送一條PINGREQ消息到服務器端。

心跳頻率在CONNECT可變頭部「Keep Alive timer」中定義時間,單位爲秒,無符號16位short表示。

PINGRESP

服務器收到PINGREQ請求以後,會當即響應一個兩個字節固定格式的PINGRESP消息。

  Description 7 6 5 4 3 2 1 0
Fixed header/固定頭部
byte 1   Message type (13) DUP flag QoS flags RETAIN
    1 1 0 1 x x x x
byte 2   Remaining Length (0)
    0 0 0 0 0 0 0 0

服務器通常若在1.5倍的心跳週期內接收不到客戶端發送的PINGREQ,可考慮關閉客戶端的鏈接描述符。此時的關閉鏈接的行爲和接收到客戶端發送DISCONNECT消息的處理行爲一致,但對客戶端的訂閱不會產生影響(不會清除客戶端訂閱數據),這個須要牢記。

若客戶端發送PINGREQ以後的一個心跳週期內接收不到PINGRESP消息,可考慮關閉TCP/IP套接字鏈接。

DISCONNECT

客戶端主動發送到服務器端,代表即將關閉TCP/IP鏈接。此時要求服務器要完整、乾淨的進行斷開處理,不能僅僅相似於關閉鏈接描述符相似草草處理之。 須要兩個字節,值固定:

  Description 7 6 5 4 3 2 1 0
Fixed header/固定頭部
byte 1   Message type (14) DUP flag QoS flags RETAIN
    1 1 1 0 x x x x
byte 2   Remaining Length (0)
    0 0 0 0 0 0 0 0

服務器要根據先前此客戶端在發送CONNECT消息可變頭部Connect flag中的「Clean session flag」所設置值,再次複習一下:

  1. 值爲0,服務器必須在客戶端斷開以後繼續存儲/保持客戶端的訂閱狀態。這些狀態包括:

    • 存儲訂閱的消息QoS1和QoS2消息
    • 正在發送消息期間鏈接丟失致使發送失敗的消息
    • 以便當客戶端從新鏈接時以上消息能夠被從新傳遞。
  2. 值爲1,服務器須要馬上清理鏈接狀態數據。

有一點須要牢記,服務器在接收到客戶端發送的DISCONNECT消息以後,須要主動關閉TCP/IP鏈接。

有關mqtt的api請參考:https://www.ibm.com/support/knowledgecenter/SSFKSJ_7.5.0/com.ibm.mq.javadoc.doc/WMQMQxrClasses/org/eclipse/paho/client/mqttv3/package-summary.html

 上代碼

1、首先開啓rabbitmq服務的mqtt插件

 rabbitmq-plugins enable rabbitmq_mqtt

 以後重啓一下rabbitmq-server就能夠了 systemctl restart/start rabbitmq-server.service

2、添加mqtt客戶端依賴

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.1.1</version>
</dependency>

3、建立消息發送端

 1 package com.sharp.forward.mqtt;
 2 
 3 import org.eclipse.paho.client.mqttv3.MqttClient;
 4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 5 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
 6 import org.eclipse.paho.client.mqttv3.MqttException;
 7 import org.eclipse.paho.client.mqttv3.MqttMessage;
 8 import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
 9 import org.eclipse.paho.client.mqttv3.MqttSecurityException;
10 import org.eclipse.paho.client.mqttv3.MqttTopic;
11 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
12 
13 public class MQTTServer {
14     
15     private final String host="tcp://192.168.135.129:1883";
16     private final String clientId = "server_001";
17     private final String userName = "guest";
18     private final String password = "guest";
19     private MqttClient mqttClient;
20     private MqttTopic topic;
21 //    private MqttMessage message;
22     
23     
24 //    經過構造函數初始化mqtt的鏈接
25     public MQTTServer() throws MqttException {
26 //        服務器的地址應該是URI,對於TCP鏈接使用「tcp://」方案,對於由SSL / TLS保護的TCP鏈接使用「ssl://」方案。
27         mqttClient = new MqttClient(host, clientId,new MemoryPersistence());
28         connect();
29     }
30     public void connect() throws MqttSecurityException, MqttException {
31 //        配置鏈接的選項,MqttConnectOptions包含控制客戶端鏈接到服務器的方式的選項。
32         MqttConnectOptions options = new MqttConnectOptions();
33 //         設置鏈接用戶名和密碼
34         options.setUserName(userName);
35         options.setPassword(password.toCharArray());
36 //        設置超時時間
37         options.setConnectionTimeout(30);
38 //        設置心跳時間間隔
39         options.setKeepAliveInterval(60);
40 //        設置服務器是否應該記住從新鏈接時客戶端的狀態
41         options.setCleanSession(true);
42         mqttClient.connect(options);
43 //        設置消息發送後的回調方法
44         mqttClient.setCallback(new MQTTCallback());
45 //        經過字符串獲取MqttTopic類型的主題
46         topic = mqttClient.getTopic("topic_001");
47     }
48 //    將消息發佈到服務器上的主題
49     public void publish(MqttTopic topic,MqttMessage message) throws MqttPersistenceException, MqttException {
50 //        將指定的消息發佈到此主題,但不等待消息的傳遞完成。
51         MqttDeliveryToken token = topic.publish(message);
52 //         阻止當前線程,直到與此令牌關聯的操做完成爲止。
53         token.waitForCompletion();
54     }
55     public static void main(String[] args) throws MqttException {
56         // TODO Auto-generated method stub
57         MQTTServer server = new MQTTServer();
58 //        for(int i=0;i<5;i++) {
59             MqttMessage message = new MqttMessage();
60             message.setQos(2);
61             message.setId(100);
62             message.setPayload("hello world!".getBytes());
63             server.publish(server.topic, message);
64 //        }
65         System.out.println("發送完畢!");
66     }
67 
68 }

4、消息接收端

 1 package com.sharp.forward.mqtt;
 2 
 3 import org.eclipse.paho.client.mqttv3.MqttClient;
 4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 5 import org.eclipse.paho.client.mqttv3.MqttException;
 6 import org.eclipse.paho.client.mqttv3.MqttSecurityException;
 7 import org.eclipse.paho.client.mqttv3.MqttTopic;
 8 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 9 
10 public class MQClient {
11     private final String uri = "tcp://192.168.135.129:1883";
12     private final String clientId = "client_001";
13     private final String userName = "guest";
14     private final String password = "guest";
15     private MqttClient mqttClient;
16     private MqttTopic topic;
17     
18     public MQClient() throws MqttException {
19         mqttClient = new MqttClient(uri, clientId,new MemoryPersistence());
20         connect();
21     }
22     public void connect() throws MqttSecurityException, MqttException {
23         MqttConnectOptions options = new MqttConnectOptions();
24         options.setUserName(userName);
25         options.setPassword(password.toCharArray());
26         options.setCleanSession(true);
27         options.setConnectionTimeout(30);
28         options.setKeepAliveInterval(60);
29         mqttClient.setCallback(new MQTTCallback());
30         topic = mqttClient.getTopic("topic_001");
31         mqttClient.connect(options);
32     }
33     public void subscribe(String[] topic,int[] qos) throws MqttException {
34         mqttClient.subscribe(topic, qos);
35     }
36     public static void main(String[] args) throws MqttException {
37         MQClient mqClient = new MQClient();
38         String[] topic = {"topic_001"};
39         int[] qos = {2};
40         mqClient.subscribe(topic, qos);
41     }
42 
43 }

5、回調函數

 1 package com.sharp.forward.mqtt;
 2 
 3 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 4 import org.eclipse.paho.client.mqttv3.MqttCallback;
 5 import org.eclipse.paho.client.mqttv3.MqttException;
 6 import org.eclipse.paho.client.mqttv3.MqttMessage;
 7 
 8 /**
 9  * @author zuixiaoyao
10  * 該回調須要實現MqttCallback的接口
11  */
12 public class MQTTCallback implements MqttCallback{
13 //    該方法將在與服務器的鏈接斷開時調用
14     @Override
15     public void connectionLost(Throwable cause) {
16         // TODO Auto-generated method stub
17         System.out.println("進入connectionLost方法,能夠在此從新鏈接");
18     }
19 // 到消息到達服務器是調用此方法
20     @Override
21     public void messageArrived(String topic, MqttMessage message) throws Exception {
22         // TODO Auto-generated method stub
23         System.out.println("進入messageArrived方法---->:"+"\n主題:"+topic+"\n服務保障:"+message.getQos()+
24                 "\n消息id:"+message.getId()+"\n消息體:"+new String(message.getPayload()));
25     }
26 //    該方法在消息發送完成時調用,而且已經收到全部確認時調用此方法
27 //    接收到已經發布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用。
28     @Override
29     public void deliveryComplete(IMqttDeliveryToken token) {
30         // TODO Auto-generated method stub
31         try {
32             System.out.println("進入deliveryComplete方法,消息發送是否完成:"+token.isComplete()+"/n消息id:"+token.getMessageId()+
33                     "/n消息服務:"+token.getMessage().getQos()+"\n消息內容:"+new String(token.getMessage().getPayload()));
34         } catch (MqttException e) {
35             // TODO Auto-generated catch block
36             e.printStackTrace();
37         }
38     }
39 
40 }

6、運行

 

 

 

說明:關於mqtt結構的部分參考摘自https://www.cnblogs.com/leeying/p/3791077.html方便閱讀

相關文章
相關標籤/搜索