前段時間弄IoT相關的東西,系統學習了一下 MQTT 協議,在此分享出來。html
本文先是對 MQTT 協議作了簡單的介紹;接着是對 MQTT協議的內容作了較爲全面的解讀;最後使用 Python 語言去實現一個簡單的 MQTT 客戶端和服務器。python
MQTT 全稱是 Message Queue Telemetry Transport,翻譯成中文意思是「遙測傳輸協議」。它最早是由IBM提出,是一種基於 TCP 協議,具備簡單、輕量等優勢,特別適合於受限環境(帶寬低、網絡延遲高、網絡通訊不穩定)的消息分發。MQTT 協議有 3.x, 5.x 等多個版本,目前最經常使用的版本是 v3.1.1 ,本文也是對此版本的協議進行的解讀。MQTT 協議已歸入ISO標準 (ISO/IEC PRF 20922),現今主流的 IoT 平臺都支持該協議。git
MQTT 是一種發佈-訂閱協議,這意味着:shell
咱們能夠在本身的電腦上運行一個 MQTT 的服務端,和多個 MQTT 的客戶端來體驗這一過程。編程
MQTT 服務端有不少能夠選擇。這裏咱們使用 Mosquitto,按照其官方文檔的說明安裝便可,這裏很少作介紹。bash
Mac 用戶能夠用如下命令安裝並啓動 Mosquitto:服務器
brew install mosquitto brew services start mosquitto
Mosquitto 提供了命令行工具 mosquitto_sub 和 mosquitto_pub ,它們可用來向服務端訂閱主題 和發佈消息。網絡
在一個命令行窗口中,執行如下命令去訂閱名爲 「foo」 的主題:session
mosquitto_sub -h 127.0.0.1 -p 1883 -t foo -q 2
在另外一個命令行窗口中,執行如下命令發佈消息 「Hello, MQTT」 到 「foo」 主題:
mosquitto_pub -h 127.0.0.1 -p 1883 -t foo -q 2 -m 'Hello, MQTT'
最終咱們將看到,在第一個命令行窗口中,打印出了消息 「Hello, MQTT」。這意味着,第一個客戶端在主題 「foo」 上,收到了第二個客戶端發佈的消息。
從總體上看,數據包分爲3個部分:一個是固定頭部,它是必定存在的;另外一個是可變頭部,它不必定存在;剩下一個是載荷,它也不必定存在。數據採用大端方式存儲。
+----------------------------+ | | | 固 定 頭 部 (必 需 ) | | | +----------------------------+ | | | 可 變 頭 部 (非 必 需) | | | +----------------------------+ | | | 載 荷 (非 必 需 ) | | | +----------------------------+
固定頭部格式以下:
+---------------------------------------------------------+ | bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | Packet type | Flags | +---------------------------------------------------------+ | byte2...| Remaining Length | +---------------------------------------------------------+
Name | Value | Direction of flow | Description |
---|---|---|---|
Reserved | 0 | Forbidden | Reserved |
CONNECT | 1 | Client to Server | Client request to connect to Server |
CONNACK | 2 | Server to Client | Connect acknowledgment |
PUBLISH | 3 | Client to Server or Server to Client | Publish message |
PUBACK | 4 | Client to Server or Server to Client | Publish acknowledgment |
PUBREC | 5 | Client to Server or Server to Client | Publish received (assured delivery part 1) |
PUBREL | 6 | Client to Server or Server to Client | Publish release (assured delivery part 2) |
PUBCOMP | 7 | Client to Server or Server to Client | Publish complete (assured delivery part 3) |
SUBSCRIBE | 8 | Client to Server | Client subscribe request |
SUBACK | 9 | Server to Client | Subscribe acknowledgment |
UNSUBSCRIBE | 10 | Client to Server | Unsubscribe request |
UNSUBACK | 11 | Server to Client | Unsubscribe acknowledgment |
PINGREQ | 12 | Client to Server | PING request |
PINGRESP | 13 | Server to Client | PING response |
DISCONNECT | 14 | Client to Server | Client is disconnecting |
Reserved | 15 | Forbidden | Reserved |
不一樣包類型標記位含義不盡相同,具體狀況以下表所示:
Control Packet | Fixed header flags | Bit 3 | Bit 2 | Bit 1 | Bit 0 |
---|---|---|---|---|---|
CONNECT | Reserved | 0 | 0 | 0 | 0 |
CONNACK | Reserved | 0 | 0 | 0 | 0 |
PUBLISH | Used in MQTT 3.1.1 | DUP1 | QoS2 | QoS2 | RETAIN3 |
PUBACK | Reserved | 0 | 0 | 0 | 0 |
PUBREC | Reserved | 0 | 0 | 0 | 0 |
PUBREL | Reserved | 0 | 0 | 1 | 0 |
PUBCOMP | Reserved | 0 | 0 | 0 | 0 |
SUBSCRIBE | Reserved | 0 | 0 | 1 | 0 |
SUBACK | Reserved | 0 | 0 | 0 | 0 |
UNSUBSCRIBE | Reserved | 0 | 0 | 1 | 0 |
UNSUBACK | Reserved | 0 | 0 | 0 | 0 |
PINGREQ | Reserved | 0 | 0 | 0 | 0 |
PINGRESP | Reserved | 0 | 0 | 0 | 0 |
DISCONNECT | Reserved | 0 | 0 | 0 | 0 |
Remaining Length 表示的是本數據包剩餘部分的字節數,即可變頭部和載荷的字節數之和。爲了節省傳輸時的字節數,Remaining Length 採用的是一種變長編碼方式。這就是說 Remaining Length 字段的字節數不是固定的,它可能使用1~4個字節。既然 Remaining Length 的字節數是可變的,那麼問題來了,咱們在解碼包數據的時候,怎麼知道 Remaining Length 到底是使用幾個字節編碼的呢?解決這個問題的辦法是,將每一個字節的最高位(MSB)做爲標誌位。若該位的值是1,則意味着下一個字節屬於參與 Remaining Length 編碼的字節;若該位的值是0,則意味着本字節已是最後一個參與 Remaining Length 編碼的字節了。
舉幾個🌰, 當開始解碼 Remaining Length 時,
- 若當前讀到的字節是: 0x50(0101 0000),則說明 Remaining Length 字段只用1個字節編碼;
- 若連續讀到的字節是:0x80, 0x80, 0x01, 0x4B,則說明 Remaining Length 字段佔3個字節。
交代清楚 Remaining Length 的長度編碼規則後,再說一下它的實際值是怎麼計算出來。
假設 \(B_0\), \(B1\), ... \(B_n\) 依次是編碼 Remaining Length 的 \(n\) 個字節;函數 \(V(B)\) 表示的是字節 \(B\) 除去最高位後(低7位)轉化成十進制的值。那麼:
RemainingLength = \(\sum\limits_{i=0}^{3} V(B_i) * 128 ^ i (0\le n \le 3, n \in Z)\)
解碼 Remaining Length 的方法可有以下僞代碼描述:
multiplier = 1 value = 0 do encodedByte = 'next byte from stream' value += (encodedByte & 127) * multiplier if (multiplier > 128*128*128) throw Error(Malformed Remaining Length) multiplier *= 128 while ((encodedByte & 128) != 0)
再舉幾個🌰,當開始解碼 Remaining Length 時,
- 若咱們讀到的第一個字節爲0x40(0100 0000),它的最高位是0,這說明編碼 Remaining Length 的字節到此就結束了,最終 Remaining Length 的值爲 64。
- 若編碼 Remaining Length 的 2 個字節 分別是 0x41(1100 0001), 0x82 (0000 0010), ,則解碼出的 Remaining Length 的值爲 321(\(65 * 128^0 + 2 * 128 ^ 1\))
編碼 Remaining Length (設爲X) 的方法實際上是解碼方法的逆過程,這裏就很少作解釋,直接給出僞代碼:
do encodedByte = X MOD 128 X = X DIV 128 if ( X > 0 ) encodedByte = encodedByte OR 128 endif 'output' encodedByte while ( X > 0 )
Remaining Length 的數值範圍與對應的字節數可由下表查出:
Digits | From | To |
---|---|---|
1 | 0 (0x00) | 127 (0x7F) |
2 | 128 (0x80, 0x01) | 16 383 (0xFF, 0x7F) |
3 | 16 384 (0x80, 0x80, 0x01) | 2 097 151 (0xFF, 0xFF, 0x7F) |
4 | 2 097 152 (0x80, 0x80, 0x80, 0x01) | 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F) |
從以上能夠看出,變長編碼縮小了給定字節數表示的數值的範圍,例如,若不採用變長編碼,4字節最大表示的數值是 4 294 967 296,而使用變長編碼,4字節最大表示的數值是 268 435 455。這是能夠接受的。268 435 455 字節約爲 256 兆字節,能知足絕大多數數據傳輸場景。假若真的須要傳輸超過256M的數據,能夠將數據拆分爲多個包傳輸。雖然這在必定程度上增長了編程的複雜性,但優勢是,當咱們須要傳輸的數據不多時,Remaining Length 使用的字節數更少;而且拆分爲多個包傳輸可能增長容錯性,當某個包傳輸失敗時,只須要重傳這個包便可,而沒必要整個包都從新傳輸。
可變頭部正如它的名字同樣,是不定的,不一樣的包類型具備不一樣的可變頭部。但許多包都有包 ID(Packet Identifier)字段。下表是包 ID 字段在各種型包中的存在狀況:
Control Packet | Packet Identifier field |
---|---|
CONNECT | NO |
CONNACK | NO |
PUBLISH | YES (If QoS > 0) |
PUBACK | YES |
PUBREC | YES |
PUBREL | YES |
PUBCOMP | YES |
SUBSCRIBE | YES |
SUBACK | YES |
UNSUBSCRIBE | YES |
UNSUBACK | YES |
PINGREQ | NO |
PINGRESP | NO |
DISCONNECT | NO |
包 ID 佔 2 個字節,以下圖所示:
+--------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +--------------------------------------------+ | byte 1 | Packet Identifier MSB | +--------------------------------------------+ | byte 2 | Packet Identifier LSB | +--------------------------------------------+
須要說明的是,服務端發送到客戶端的包 和 客戶端發送到服務端的包中 攜帶的 包ID 彼此之間是相互獨立,絕不相干的。這就是說,即便 服務端發送到客戶端的包 和 客戶端發送到服務端的包 中的 包ID 相同,也不要緊。
可變頭部的詳細信息見各數據包的詳細說明。
載荷是數據包的第三部分,也是最後一部分。它也是有的包攜帶,有的包不攜帶,這和包的類型有關。下面是各種型的包是否攜帶載荷的明細表:
Control Packet | Payload |
---|---|
CONNECT | Required |
CONNACK | None |
PUBLISH | Optional |
PUBACK | None |
PUBREC | None |
PUBREL | None |
PUBCOMP | None |
SUBSCRIBE | Required |
SUBACK | Required |
UNSUBSCRIBE | Required |
UNSUBACK | None |
PINGREQ | None |
PINGRESP | None |
DISCONNECT | None |
下面對 14 種數據包類型作詳細的介紹。
客戶端 -> 服務端
此包是客戶端與服務端創建鏈接後,發送的第一個包,且第一個包必須是此包。在一個鏈接中,該包只能發送一次。若發送了屢次,當服務器第二次收到該包時,應該做爲違法處理,當即斷開鏈接。
包類型爲1;表計字段保留,值爲0。結構以下圖所示:
+---------------------------------------------------------+ | bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | +---------------------------------------------------------+ | byte2...| Remaining Length | +---------------------------------------------------------+
可變頭部結構以下:
+--------+----------------+---+---+---+---+---+---+---+---+ | Bit | Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte 1 | Length MSB (0)| 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | +---------------------------------------------------------+ | byte 2 | Length LSB (4)| 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | +---------------------------------------------------------+ | byte 3 | 'M' | 0 | 1 | 0 | 0 | 1 | 1 | 0 | 1 | +---------------------------------------------------------+ | byte 4 | 'Q' | 0 | 1 | 0 | 1 | 0 | 0 | 0 | 1 | +---------------------------------------------------------+ | byte 5 | 'T' | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 | +---------------------------------------------------------+ | byte 6 | 'T' | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 | +---------------------------------------------------------+ | byte 7 | Level(4) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | +---------------------------------------------------------+ | byte 8 | x | x | x | x | x | x | x | x | 0 | +--------+----------------+---+---+---+---+---+---+---+---+ | byte 9 | Keep Alive MSB | +---------------------------------------------------------+ | byte 10| Keep Alive LSB | +--------+----------------+---+---+---+---+---+---+---+---+
其中,
如前面介紹,前 2 個字節是包 ID。
3 ~ 6 字節是協議名稱,字符使用 UTF-8 編碼;
第 7 個字節是協議等級(協議版本。3.1.1 版本對應的協議等級是 4 );
第8個字節包含一些鏈接標記位。以下圖所示:
Bit | Description |
---|---|
7 | User Name Flag |
6 | Password Flag |
5 | Will Retain |
4 ~ 3 | Will QoS |
2 | Will Flag |
1 | Clean Session |
0 | Reserved. 目前值爲 0 |
注意:
若服務端不支持客戶端協議版本,須要響應一個 CONNACK 包,指定 code 爲 0x01,而後斷開鏈接;
服務端需校驗 Reserved 位。若值不爲 0,需斷開鏈接。
Clean Session:用來指定對 Session 的處理方式。若值爲0,在服務端和客戶端斷開鏈接後,它們都要保存 Session 信息,以便再次連上時恢復以前 Session 中的信息。除此以外,服務端還須要在斷開鏈接後保存 QoS 1 和 QoS 2 消息和客戶端的訂閱內容;若值爲1,當客戶端和服務端鏈接上時,必須丟棄以前的 Session 狀態信息再建立一個新的 Session 和訂閱內容。
Will Flag,Will QoS,Will Retain:這三個字段是用來預立「遺囑」的。預立「遺囑」的意思是:客戶端在鏈接服務端時,可將預先定義好的主題和對應消息發送給服務端。當它和服務端鏈接斷開時,服務端將及時地發佈這段消息到預約的主題。
其中,「鏈接斷開」的情形包括但不限於:
若Will Flag 被設置爲 0 ,這表示不預立「遺囑」,此時 Will QoS 和 Will Retain 字段也必須被設置爲 0,而且載荷(payload)中不能存在 Will Topic 和 Will Message;Will Flag 被設置爲 1 的狀況與此相反。
服務端一旦發佈了「遺囑」信息或收到了 DISCONNECT 包,「遺囑」信息應當當即被服務端從保存的 Session 狀態信息中移除。
Will QoS 表示的是遺囑消息的服務質量(參見如下對消息服務質量的解釋),取值0,1,2,佔兩個字節。
Will Retain 表示當「遺囑」消息被髮布後,它是否還被保留。
User Name Flag 和 Password Flag 分別表示載荷(payload)中是否存在用戶名和密碼。
第 9 ~ 10 個字節是 Keep Alive 時間,單位是秒,取值範圍是 0 ~ 65535。一般,客戶端須要每隔小於 Keep Alive 的時間發送一次 PINGREQ 消息,服務端會響應 PINGRESP 包示意網絡正常和本身都工做正常。若服務端在 1.5 倍 Keep Alive 時間內沒有接收到客戶端的任何消息,服務端必須斷開鏈接;若客戶端在發送 PINGREQ 消息後,在一段時間(本身定義)內沒有收到來自服務端的 PINGRESP 消息,客戶端也應該斷開鏈接。
Keep Alive 的值是客戶端指定的,一般會設置爲幾分鐘,最大是 18 小時 12 分鐘 15 秒。特別地,若值爲0,表示不啓用 Keep Alive 機制。
CONNECT 包的載荷必定包含 Client Identifier 字段,可能包含 Will Topic, Will Message, User Name, Password 字段(由可變頭部中的各標記位決定)。這些字段若存在,必定要按照以上順序排列。
Client Identifier:由客戶端本身指定的 ID,服務端據此來標識客戶端(以此關聯Session)。所以不一樣客戶端之間的 ID 不能重複(重複將視爲同一客戶端)。它使用 UTF-8編碼,長度一般在1 ~ 23個字節之間,一般包含 [0-9a-zA-Z] 中的字符(容許例外,由服務端的實現決定)。若客戶端 ID 不存在,服務端須要爲其指定一個獨一無二的 ID。在這種狀況下,客戶端必須設置 CleanSession 爲 1;若不設爲 1,服務端須要 響應 CONNACK 包,其中 返回 code 0x02(Identifier rejected), 隨後斷開鏈接。
Will Topic,Will Message:當 Will Flag 值爲 1 時存在,均採用 UTF-8 編碼。
User Name:採用 UTF-8 編碼,用來作身份認證。
Password:長度不固定,頭兩個字節用來指明密碼的字節數,以後是密碼的字節,結構以下:
+-----------+---+---+---+---+---+---+---+---+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------+---+---+---+---+---+---+---+ | byte 1 | Data length MSB | +-------------------------------------------+ | byte 2 | Data length LSB | +-------------------------------------------+ | byte 3...| Data, if length > 0. | +-------------------------------------------+
一些狀況的處理方式:
- 在一個客戶端在線的狀況下,同一客戶端(相同 Client ID)再次鏈接服務端,服務端需斷開以前的鏈接;
- 客戶端在發送 CONNECT 包後,能夠再當即發送其餘的包,而無需等待 CONNACK 包的響應。但服務端收到 CONNECT 包後,若拒絕鏈接,必定不能處理客戶端在 CONNECT 包後發送的包。
關於 UTF-8 編碼:
在 MQTT 協議中,字符串均採用的 UTF-8 編碼。字節結構以下:
+------------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +------------------------------------------------------------+ | byte1 | String length MSB | +------------------------------------------------------------+ | byte2 | String length LSB | +------------------------------------------------------------+ | byte3...| UTF-8 Encoded Character Data, if length > 0 | +------------------------------------------------------------+其中,前兩個字節用來指定字符串的字節數,後面則是字符串各個字節(UTF-8編碼)。
服務端 -> 客戶端
CONNACK 包是 CONNECT 包的響應,從服務端發送到客戶端的第一個包必須是此包。
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | +---------------------------------------------------------+ | byte2 | 0 | 0 | 1 | 0 | 0 | 0 | 1 | 0 | +---------------------------------------------------------+
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | X | +---------------------------------------------------------+ | byte2 | X | X | X | X | X | X | X | X | +---------------------------------------------------------+
第 1 個字節用來做鏈接確認標記,其中第 1 ~ 7 位被保留,值均爲 0。第 0 位(SP字段) 是 Session 是否存在標記(SP, Session Present Flag)。若 CleanSession 爲(參見 CONNECT Packet 中相關說明)1,SP 字段必須爲 0,且鏈接返回碼也必須爲 0;若 CleanSession 爲 0,SP 字段的值取決於服務端是否爲此客戶端存儲了 Session 狀態信息:存儲了,SP 取值爲 1,不然取值爲0。一樣,鏈接返回碼也取0。
第 2 個字節是鏈接返回碼(Connect Return code),只有當鏈接返回碼值爲 0 時,才表示服務端接受鏈接。服務端若返回了除 0 以外的其餘值,緊接着必須斷開鏈接。下表是各返回碼錶明的含義:
Value | Description |
---|---|
0 | Connection accepted |
1 | 拒絕鏈接:協議版本不支持 |
2 | 拒絕鏈接:客戶端 ID 被拒絕 |
3 | 拒絕鏈接:服務不可用 |
4 | 拒絕鏈接:用戶名或密碼錯誤 |
5 | 拒絕鏈接:沒有認證 |
6-255 | Reserved for future use |
無
客戶端 <-> 服務端
此包是用來發送消息。
+------------------------------------------------------------+ | bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +------------------------------------------------------------+ | byte1 | 0 | 0 | 1 | 1 | DUP | QoS | RETAIN | +------------------------------------------------------------+ | byte2 | Remaining Length | +------------------------------------------------------------+
正如前面介紹過的,第 1 個字節前 高 4 位是包類型,低 4 位是標記位。其中,
第 3 位是 DUP 標記,它用來指示該包是不是重複的投遞。這就是說,若是該位的取值是0,這意味着這個包是第 1 次發送的;不然代表此包不是第 1 次發送,是對以前已經發送的包的再次重發。
若 QoS 值爲 0,DUP 位必須爲 0
第 1 ~ 2 位是 QoS 字段,取值及其含義以下表所示:
QoS value | Bit 2 | bit 1 | Description |
---|---|---|---|
0 | 0 | 0 | 包至多被傳送一次 |
1 | 0 | 1 | 包至少被傳送一次 |
2 | 1 | 0 | 包被傳送,且僅被傳送一次 |
- | 1 | 1 | 保留,不能被使用(發現使用,必須斷開鏈接) |
可變頭部包含兩個字段:Topic Name 和 Packet Identifier(僅當 QoS 級別爲 1 或 2 時存在)。Topic Name 採用 UTF-8 編碼,其中必定不能包含通配符;Packet Identifier 以前做過說明。
載荷是由用戶(客戶端或服務端)在使用中本身指定的,它的長度可由固定頭部中的 Remaining Length 減去 可變頭部的長度算出。
補充說明:
PUBLISH 包指望的響應包見下表:
QoS 等級 指望的響應包 QoS 0 無 QoS 1 PUBACK 包 QoS 2 PUBREC 包
客戶端 <-> 服務端
PUBACK 包是對 QoS 爲 1 的 PUBLISH 包的確認響應。
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | +---------------------------------------------------------+ | byte2 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | +---------------------------------------------------------+
可變頭部中的內容只有 包 ID,佔 2 個字節,這表示此響應是對包 ID 爲該值的包的確認。
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | Packet Identifier MSB | +---------------------------------------------------------+ | byte2 | Packet Identifier LSB | +---------------------------------------------------------+
無
客戶端 <-> 服務端
PUBREC (Publish Received)包是對 QoS 爲 2 的 PUBLISH 包的確認響應。它是 QoS 2 協議的第 2 個包(第 1 個是 PUBLISH 包)。
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | 0 | 1 | 0 | 1 | 0 | 0 | 0 | 0 | +---------------------------------------------------------+ | byte2 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | +---------------------------------------------------------+
可變頭部中的內容只有 包 ID,佔 2 個字節,這表示此響應是對 包 ID 爲該值的包的確認。
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | Packet Identifier MSB | +---------------------------------------------------------+ | byte2 | Packet Identifier LSB | +---------------------------------------------------------+
無
客戶端 <-> 服務端
PUBREL (Publish Release) 包是對 PUBREC 包的響應。它是 QoS 2 協議的第 3 個包。
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 0 | +---------------------------------------------------------+ | byte2 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | +---------------------------------------------------------+
注意:
第 1 個字節的 0 ~ 3 位是被保留的,但各個位上必須是上圖中的取值。若發現其餘值,必須斷開鏈接。
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | Packet Identifier MSB | +---------------------------------------------------------+ | byte2 | Packet Identifier LSB | +---------------------------------------------------------+
無
客戶端 <-> 服務端
PUBCOMP (Publish Complete) 包是對 PUBREL 包的響應。它是 QoS 2 協議的第 4 個包,也是最後一個。
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | 0 | 1 | 1 | 1 | 0 | 0 | 0 | 0 | +---------------------------------------------------------+ | byte2 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | +---------------------------------------------------------+
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | Packet Identifier MSB | +---------------------------------------------------------+ | byte2 | Packet Identifier LSB | +---------------------------------------------------------+
無
客戶端 -> 服務端
SUBSCRIBE 包是客戶端用來訂閱主題(Topic)的,一次可定義一個或多個主題。服務端會將發送到該主題的消息轉發到各個訂閱了該主題的客戶端。除此以外,SUBSCRIBE 包中還聲明瞭本身能夠接受的從服務端發來的消息的 QoS 等級的最大值(maximum QoS)。
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | 1 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | +---------------------------------------------------------+ | byte2 | Remaining Length | +---------------------------------------------------------+
一樣,第 1 個字節的 0 ~ 3 位是保留位,取值也必須是 0 0 1 0,否則必須斷開鏈接。
可變頭中僅包含包 ID。
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | Packet Identifier MSB | +---------------------------------------------------------+ | byte2 | Packet Identifier LSB | +---------------------------------------------------------+
上面提到過,一次是能夠訂閱多個主題的,所以載荷在邏輯上是一個列表(List)的結構。列表中的每一個元素描述一個訂閱的主題。每一個元素包含 3 部分:主題字節數、主題、 QoS 最大值,結構以下:
+-------------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +-------------------------------------------------------------+ | byte1 | Length MSB | +-------------------------------------------------------------+ | byte2 | Length LSB | +-------------------------------------------------------------+ | byte 3..N+2 | Topic (UTF-8 encoded) | +-------------------------------------------------------------+ | byte N+3 | 0 | 0 | 0 | 0 | 0 | 0 | X | X | +-------------------------------------------------------------+
第 1 ~2 字節是指明 Topic 按照 UTF-8 編碼後有多少字節;假設有 N 個,那麼接下來的 N 個字節,即第 3 ~ N+2 個字節是 Topic 編碼後的字節;第 N+3 個字節是指明客戶端接受的,來自此主題的消息的 QoS 最大值,其中 2 ~ 7 位保留,0 ~ 1 位是接受的 QoS 最大值。
舉個完整的載荷的🌰:
假設客戶端要訂閱 a/b 和 c/d 兩個主題,並指定接受的它們的最大 QoS 值分別是 1 和 2,完整結構以下表:
Bit Description 7 6 5 4 3 2 1 0 Topic Filter byte 1 Length MSB (0) 0 0 0 0 0 0 0 0 byte 2 Length LSB (3) 0 0 0 0 0 0 1 1 byte 3 ‘a’ (0x61) 0 1 1 0 0 0 0 1 byte 4 ‘/’ (0x2F) 0 0 1 0 1 1 1 1 byte 5 ‘b’ (0x62) 0 1 1 0 0 0 1 0 Requested QoS byte 6 Requested QoS(1) 0 0 0 0 0 0 0 1 Topic Filter byte 7 Length MSB (0) 0 0 0 0 0 0 0 0 byte 8 Length LSB (3) 0 0 0 0 0 0 1 1 byte 9 ‘c’ (0x63) 0 1 1 0 0 0 1 1 byte 10 ‘/’ (0x2F) 0 0 1 0 1 1 1 1 byte 11 ‘d’ (0x64) 0 1 1 0 0 1 0 0 Requested QoS byte 12 Requested QoS(2) 0 0 0 0 0 0 1 0
服務端 -> 客戶端
SUBACK 包是服務端對客戶端發送的 SUBSCRIBE 包的響應。服務端在收到 SUBSCRIBE 包後,必須響應此包,且可變頭中指定的包 ID 必須與對應的 SUBSCRIBE 包中的包 ID 相同。
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | 1 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | +---------------------------------------------------------+ | byte2 | Remaining Length | +---------------------------------------------------------+
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | Packet Identifier MSB | +---------------------------------------------------------+ | byte2 | Packet Identifier LSB | +---------------------------------------------------------+
同 SUBSCRIBE 包同樣,SUBACK 包的載荷也是一個列表,其中的每一個元素是 SUBSCRIBE 包中各個訂閱的結果碼(return code),它在順序上必須和 SUBSCRIBE 包中的主題順序一致。
每一個結果碼佔一個字節,結構以下:
Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | X | 0 | 0 | 0 | 0 | 0 | X | X |
其中,只有第 7 個 字節和第 0,1 字節被使用,其他的位保留,值爲0。結果碼共 4 種狀況,其含義以下表所示:
結果碼 | 含義 | 說明 |
---|---|---|
0x00 | 成功 | 對QoS最大值爲0的包 |
0x01 | 成功 | 對QoS最大值爲1的包 |
0x02 | 成功 | 對QoS最大值爲2的包 |
0x80 | 失敗 |
舉個完整的載荷的🌰:
描述 7 6 5 4 3 2 1 0 byte 1 成功 - QoS 最大值爲 0 0 0 0 0 0 0 0 0 byte 2 成功 - QoS 最大值爲 2 0 0 0 0 0 0 1 0 byte 3 失敗 1 0 0 0 0 0 0 0
客戶端 -> 服務端
UNSUBSCRIBE 包是客戶端用來取消訂閱的。
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | 1 | 0 | 1 | 0 | 0 | 0 | 1 | 0 | +---------------------------------------------------------+ | byte2 | Remaining Length | +---------------------------------------------------------+
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | Packet Identifier MSB | +---------------------------------------------------------+ | byte2 | Packet Identifier LSB | +---------------------------------------------------------+
同 SUBACK 包同樣,只是沒有 maximum QoS 字段。以下是一個例子:
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
Topic Filter | |||||||||
byte 1 | Length MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 2 | Length LSB (3) | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 |
byte 3 | ‘a’ (0x61) | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 1 |
byte 4 | ‘/’ (0x2F) | 0 | 0 | 1 | 0 | 1 | 1 | 1 | 1 |
byte 5 | ‘b’ (0x62) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 0 |
Topic Filter | |||||||||
byte 6 | Length MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 7 | Length LSB (3) | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 |
byte 8 | ‘c’ (0x63) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 1 |
byte 9 | ‘/’ (0x2F) | 0 | 0 | 1 | 0 | 1 | 1 | 1 | 1 |
byte 10 | ‘d’ (0x64) | 0 | 1 | 1 | 0 | 0 | 1 | 0 | 0 |
該例子是取消對 a/b
主題和 c/d
主題的訂閱。
對於客戶端的取消訂閱消息,服務端對取消訂閱的主題的處理邏輯是:和以前客戶端訂閱的主題一個字符一個字符的比較,當且僅當它們徹底同樣時,才進行取消。
服務端 -> 客戶端
UNSUBACK 包是 SUBACK 包的響應
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | 1 | 0 | 1 | 1 | 0 | 0 | 0 | 0 | +---------------------------------------------------------+ | byte2 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | +---------------------------------------------------------+
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | Packet Identifier MSB | +---------------------------------------------------------+ | byte2 | Packet Identifier LSB | +---------------------------------------------------------+
無
客戶端 -> 服務端
PINGREQ 包的做用以下:
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | 1 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | +---------------------------------------------------------+ | byte2 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | +---------------------------------------------------------+
無
無
服務端 -> 客戶端
PINGRESP 是服務端對 PINGREQ 包的響應,它代表服務端還活着
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | 1 | 1 | 0 | 1 | 0 | 0 | 0 | 0 | +---------------------------------------------------------+ | byte2 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | +---------------------------------------------------------+
無
無
客戶端 -> 服務端
DISCONNECT 包是客戶端發送給服務端的最後一個包,用來告訴服務端本身即將斷開鏈接。
+---------------------------------------------------------+ | Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +---------------------------------------------------------+ | byte1 | 1 | 1 | 1 | 0 | 0 | 0 | 0 | 0 | +---------------------------------------------------------+ | byte2 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | +---------------------------------------------------------+
無
無
QoS 是 Quality of Service 的縮寫,即「服務質量」,共有 3 種不一樣的消息投送服務質量。
表示盡最大能力投送消息,不保證消息必定被接收。在此等級的服務質量下,對於某個數據包,發送者僅僅發送一次到 broker,無論它有沒有被接收者接收;所以對接收者而言,相同的包要麼會被接收一次,要麼一次都不能被接收。
以下是消息發送的時序圖:
+------------+ +------------+ | Sender | | Receiver | +-----+------+ +------+-----+ | | | | | | | PUBLISH(QoS 0, DUP=0) | |--------------------------->| | | | | | +++ | | | | | | Deliver message | | | | +++ | |
該服務質量在上一服務質量上作了提高:
考慮一種場景:
發送者發送的消息成功投遞給了接收者,但接收者響應的確認包,在網絡傳輸中丟失。發送者等待確認包一段時間無果,變會從新發送該消息,此時接收者會再次收到此消息!這就是說,在 QoS 1 服務等級下,接收者可能屢次收到同一消息。接收者須要處理好這種狀況。
以下是消息發送的時序圖:
+------------+ +------------+ | Sender | | Receiver | +-----+------+ +------+-----+ | | +++ | | +-+ | | | | Store message | | +<+ | +++ | | | | PUBLISH(QoS 1, DUP=0) | |---------------------------->| | | | +++ | | | | | | Deliver message | | | | +++ | PUBACK | |<----------------------------| | | +++ | | +-+ | | | |Discard message | | +<+ | +++ | | |
須要注意的是:
當一個包未真正完成發送時,它所使用的包 ID 不能再次被使用;只有在完成後,才能被再次使用。
該方式是質量最高(也最繁瑣)的服務。當數據包投送完成時,它保證一個數據包必定被接收且僅被接收一次。
該服務質量的實現策略有 2 種,但最終達到的效果是一致的。下面分別給出消息在兩種策略下的傳送的時序圖:
方式 A:
+------------+ +------------+ | Sender | | Receiver | +-----+------+ +------+-----+ | | +++ | | +-+ | | | |Store message | | +<+ | +++ | | | | PUBLISH(QoS 2, DUP=0) | |--------------------------------->| | | | +++ | | +-+ | | | | Store message | | +<+ | +++ | | | PUBREC | |<---------------------------------| | | +++ | | +-+ | | | | Discard message, and | | | | store PUBREC packet id | | +<+ | +++ | | | | PUBREL | |--------------------------------->| | | | | | +++ | | | | | | Deliver message | | | | +++ | | | | | +++ | | +-+ | | | | Discard message | | +<+ | +++ | | | PUBCOMP | |<---------------------------------| | | +++ | | +-+ | | | |Discard stored state | | +<+ | +++ | | |
方式B:
+------------+ +------------+ | Sender | | Receiver | +-----+------+ +------+-----+ | | +++ | | +-+ | | | |Store message | | +<+ | +++ | | | | PUBLISH(QoS 2, DUP=0) | +------------------------------->+ | | | +++ | | +-+ | | | | Store package id | | +<+ | +++ | | | | | +++ | | | Deliver Message | +++ | | | PUBREC | +<-------------------------------+ | | | | +++ | | +-+ | | | | Discard message, and | | | | store PUBREC packet id | | +<+ | +++ | | | | PUBREL | +------------------------------->+ | | | | | +++ | | +-+ | | | | Discard package id | | +<+ | +++ | | | PUBCOMP | +<-------------------------------+ | | +++ | | +-+ | | | |Discard stored state | | +<+ | +++ | | |
下面是對這兩種策略中的行爲做出的解釋:
MQTT 的主題是分層級的。層級之間使用左劃線(/)進行分隔,如:sensors/computer-1/temperature/cpu
。
發佈者在發佈某個消息時,必須明確指定消息的主題,而且主題中不能包含通配符;但訂閱者在訂閱時,既能夠指定明確的主題,也能夠指定含有通配符的主題,如:sensors/+/temperature/+
。
通配符有兩種:+
和 #
。+
用來通配單一的某層級;#
用來通配剩下的全部層級(所以 #
只能處於主題的最後層級)。
舉兩個🌰,對於主題
a/b/c/d
,以下主題與之匹配:
a/b/c/d
+/b/c/d
a/+/c/d
a/+/+/d
+/+/+/+
#
a/#
a/b/#
a/b/c/#
+/b/c/#
而以下主題與之不匹配:
a/b/c
b/+/c/d
+/+/+
特別地,主題的層級能夠是空字符串,例如:a//b
, /a/b
, /a/b/
。這 3 個主題實際上都有 3 個層級:a//b
的各層級分別爲 a
, <空字符串>
, b
;/a/b
的各層級分別是 <空字符串>
, a
, b
…… +
和 #
通配符將<空字符串>
也當作「有字符串」處理。
這就是說:
a//b
能夠和 a/+/b
匹配;/a/b
能夠和 +/a/b
、/#
匹配;a/b/
能夠和 a/b/+
、a/b/#
匹配在以上的介紹中,咱們知道客戶端有兩個地方能夠指定消息的服務質量:一個是在客戶端者訂閱主題時,能夠在訂閱消息包中指定 QoS(見 SUBSCRIBE Packet);另外一個是在客戶端發送消息時,能夠指定 QoS(見 PUBLISH Packet)。那麼問題來了,它們有啥區別呢?會不會出現矛盾?好比,某客戶端訂閱時要求 QoS 爲 0,而發送消息時指定 QoS 爲 2,那服務端到底該採用哪一種服務質量進行服務呢?
回答:這並不矛盾。由於客戶端在訂閱主題時指定的 QoS,是在要求服務端,「當我訂閱的主題有消息時,請在該 QoS 下將消息發給我」。該 QoS 針對的是服務端轉發消息到客戶端的過程;而客戶端發送消息時指定的 QoS,是在要求服務端,「我在該 QoS 下,把這段消息發送給你」。這個 QoS 針對的是客戶端發送消息到服務端的過程。
下面咱們用 Python (3.x)來實現一個簡單的 MQTT 客戶端。該客戶端能夠鏈接上 MQTT 服務器,訂閱以及發佈消息。代碼會涉及到 Python Socket 相關的 API,若是你還不太熟悉,能夠先看看 Python 官方提供的 文檔。
client.py
import logging import socket import sys import traceback from packet.connack_packet import ConnackPacket from packet.connect_packet import ConnectPacket from packet.puback_packet import PubackPacket from packet.pubcomp_packet import PubcompPacket from packet.publish_packet import PublishPacket from packet.pubrec_packet import PubrecPacket from packet.pubrel_packet import PubrelPacket from packet.suback_packet import SubackPacket from packet.subscribe_packet import SubscribePacket from util.common import random_str, merge_dict logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', stream=sys.stdout, level=logging.DEBUG) _default_options = { "keepalive": 60, "will_topic": None, "will_message": None, "will_retain": None, "will_qos": None, "clean_session": True, "ping_interval": 300, } _receive_packet_types = { 2: ConnackPacket, 3: PublishPacket, 4: PubackPacket, 5: PubrecPacket, 6: PubrelPacket, 7: PubcompPacket, 9: SubackPacket, } class Client: def __init__(self, host, port=1883, client_id=random_str(6), username=None, password=None, **options): self._host = host self._port = port self._username = username self._password = password self._client_id = client_id self._options = merge_dict(_default_options, options) # No default callbacks self._on_connect = None self._on_message = None self._socket = None self._packet_id = 0 self._unack_package_ids = set() self._unack_packet = {} def connect(self): """connect MQTT broker """ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.setblocking(True) self._socket.connect((self._host, self._port)) # Send connect packet packet = ConnectPacket(self._client_id, self._username, self._password, self._options['keepalive'], self._options['will_topic'], self._options['will_message'], self._options['will_retain'], self._options['will_qos'], self._options['clean_session']) self._send_packet(packet) def reconnect(self): """reconnect MQTT broker""" pass def close(self): """close connection and clear the resource """ pass def loop_forever(self): """Receive data from broker in an loop""" while True: try: packet_type, flags, packet_bytes, remaining_length = self._recv_packet() if packet_type not in _receive_packet_types: logging.warning("unknown packet type: %s", packet_type) continue packet = _receive_packet_types[packet_type].from_bytes(packet_bytes) logging.debug('receive a packet: %s', packet) # CONNACK Packet if isinstance(packet, ConnackPacket) and self._on_connect: self._on_connect(packet) # Publish Packet elif isinstance(packet, PublishPacket): if packet.qos == 0: # callback on_message self._on_message(packet) elif packet.qos == 1: # callback on_message self._on_message(packet) # publish ack for publish packet(qos = 1) self._send_packet(PubackPacket(packet.packet_id)) elif packet.qos == 2: # Store packet id self._unack_package_ids.add(packet.packet_id) # callback on_message self._on_message(packet) # send PUBREC packet self._send_packet(PubrecPacket(packet.packet_id)) # PUBACK Packet elif isinstance(packet, PubackPacket) and packet.packet_id in self._unack_packet: self._unack_packet.pop(packet.packet_id) # PUBREC Packet elif isinstance(packet, PubrecPacket) and packet.packet_id in self._unack_packet: # discard message self._unack_packet.pop(packet.packet_id) # store packet id self._unack_package_ids.add(packet.packet_id) # send PUBREL message self._send_packet(PubrelPacket(packet.packet_id)) # PUBCOMP Packet elif isinstance(packet, PubcompPacket) and packet.packet_id in self._unack_package_ids: self._unack_package_ids.remove(packet.packet_id) # PUBREL Packet elif isinstance(packet, PubrelPacket) and packet.packet_id in self._unack_package_ids: # discard packet id self._unack_package_ids.remove(packet.packet_id) # send PUBCOMP packet self._send_packet(PubcompPacket(packet.packet_id)) except KeyboardInterrupt: self.close() except ConnectionError as e: logging.warning("%s", e) return except Exception as e: logging.error("mqtt client occur error: %s", e) traceback.print_exc() continue def on_connect(self, func): """ """ self._on_connect = func return func def on_message(self, func): """ """ self._on_message = func return func def subscribe(self, topic: str, qos=0, *others_topic_qos): """訂閱消息 """ packet = SubscribePacket(self._next_packet_id(), topic, qos, *others_topic_qos) self._send_packet(packet) def unsubscribe(self, topics): """取消訂閱 """ pass def publish(self, topic: str, message: bytes, qos: int = 0, retain: bool = False, dup: bool = False): """發佈消息 """ if qos != 2: dup = False publish_packet = PublishPacket(dup, qos, retain, topic, self._next_packet_id(), message) self._send_packet(publish_packet) if qos != 0: # Store message self._unack_packet[publish_packet.packet_id] = publish_packet def _send_packet(self, packet): """發送數據包""" logging.debug('send a packet: {}'.format(packet)) packet_bytes = packet.to_bytes() self._socket.sendall(packet_bytes) def _recv_packet(self): packet_bytes = bytearray() # Read first byte read_bytes = self._socket.recv(1) self._check_recv_data(read_bytes) first_byte = read_bytes[0] packet_bytes.append(first_byte) packet_type = first_byte >> 4 flags = first_byte & 0x0F # Read second byte read_bytes = self._socket.recv(1) self._check_recv_data(read_bytes) second_byte = read_bytes[0] packet_bytes.append(second_byte) remaining_length = second_byte & 0x7F flag_bit = second_byte & 0x80 multiplier = 1 while True: if flag_bit == 0: break read_bytes = self._socket.recv(1) self._check_recv_data(read_bytes) next_byte = read_bytes[0] flag_bit = next_byte & 0x80 remaining_length += (next_byte & 0x7F) * multiplier # Read remaining bytes while remaining_length > 0: read_bytes = self._socket.recv(min(4096, remaining_length)) remaining_length -= len(read_bytes) packet_bytes.extend(read_bytes) return packet_type, flags, packet_bytes, remaining_length @staticmethod def _check_recv_data(data): if not data: raise ConnectionError('connection is closed') def _next_packet_id(self): self._packet_id += 1 return self._packet_id
測試一下效果:
import logging from client import Client mqtt_client = Client('127.0.0.1', username='derker', password='123456') @mqtt_client.on_connect def on_connect(connack_packet): logging.info('[on_connect]: sp = {}, return_code = {}'.format(connack_packet.sp, connack_packet.return_code)) mqtt_client.subscribe('$SYS/broker/version', 2) mqtt_client.publish('hello', bytes('I\'m derker', 'utf8'), 2) @mqtt_client.on_message def on_message(message): logging.info('[on_message]: {}'.format(message)) if __name__ == '__main__': mqtt_client.connect() mqtt_client.loop_forever()
打印結果以下:
2019-08-15 20:32:40,870 DEBUG: send a packet: <packet.connect_packet.ConnectPacket object at 0x11711f510> 2019-08-15 20:32:40,876 DEBUG: receive a packet: ConnackPacket(sp = 0, return_code = 0) 2019-08-15 20:32:40,876 INFO: [on_connect]: sp = 0, return_code = 0 2019-08-15 20:32:40,876 DEBUG: send a packet: <packet.subscribe_packet.SubscribePacket object at 0x11711f310> 2019-08-15 20:32:40,876 DEBUG: send a packet: PublishPacket(dup = False, qos = 2, retain = False, topic= hello, packet_id = 2, payload = b"I'm derker") 2019-08-15 20:32:40,877 DEBUG: receive a packet: SubackPacket(packet_id = 1, return_codes = [2]) 2019-08-15 20:32:40,877 DEBUG: receive a packet: PublishPacket(dup = 0, qos = 2, retain = True, topic= $SYS/broker/version, packet_id = 1, payload = bytearray(b'mosquitto version 1.6.3')) 2019-08-15 20:32:40,877 INFO: [on_message]: PublishPacket(dup = 0, qos = 2, retain = True, topic= $SYS/broker/version, packet_id = 1, payload = bytearray(b'mosquitto version 1.6.3')) 2019-08-15 20:32:40,877 DEBUG: send a packet: PubrecPacket(packet_id = 1) 2019-08-15 20:32:40,877 DEBUG: receive a packet: PubrecPacket(packet_id = 2) 2019-08-15 20:32:40,877 DEBUG: send a packet: PubrelPacket(packet_id = 2) 2019-08-15 20:32:40,878 DEBUG: receive a packet: PubrelPacket(packet_id = 1) 2019-08-15 20:32:40,878 DEBUG: send a packet: PubcompPacket(packet_id = 1) 2019-08-15 20:32:40,878 DEBUG: receive a packet: PubcompPacket(packet_id = 2)
代碼很簡單,這裏就不作解釋了。其中省略了對 Disconnect Packet 的處理,也沒有實現消息重發,重連。其中使用到的一些工具方法和各包的數據結構可在 GitHub 中查看。
呃, 不想寫了 :-)