MQTT之初體驗

初識mqtt

MQTT 是一種基於 發佈/訂閱(publish/subscribe) 模式的「輕量級」通信協議,該協議創建在TCP/IP協議上。MQTT最大優勢在於,能夠以極少的代碼和有限的寬帶爲遠程鏈接設備提供實時可靠的消息服務。做爲一種低開銷、低寬帶佔用的即時通信協議,使其在物聯網、小型設備、移動開發等方面有比較普遍的應用。 MQTT是一個基於客戶端-服務器的消息發佈/訂閱傳輸協議,MQTT協議中有三種身份:發佈者(Publish)代理(Broker)(服務器)訂閱者(Subscribe),形式以下:
html

image

設計規範

  • 精簡,不添加無關緊要的功能。
  • 發佈/訂閱(Pub/Sub)模式,方便消息在傳感器之間傳遞。
  • 容許用戶動態建立主題,零運維成本。
  • 把輸入量下降到最低以提升傳輸效率。
  • 把低寬帶、高延遲、不穩定的網絡因素考慮在內。
  • 支持連續的會話控制。
  • 理解客戶端計算能力可能很低。
  • 提供服務質量管理
  • 假設數據不可知不強求傳輸數據的類型與格式保持靈活性。

主要特性

  • 使用發佈/訂閱消息模式,提供一對多的消息發佈,解除應用程序耦合。
  • 對負載內容屏蔽的消息傳輸
  • 使用tcp/ip提供網絡鏈接
  • 有三種消息發佈質量
    • 至多一次:消息發佈徹底依賴底層TCP/IP網絡。會發生消息丟失或重複
    • 至少一次:確保消息到達,但消息重複可能會發生。
    • 只有一次:確保消息到達一次
  • 小型傳輸,開銷很小(固定長度的頭部字節是2字節),協議交換最小化,以下降網絡流量。
  • 使用Last WillTestament特性通知有關各方客戶端異常中斷的機制
    • Last Will:即遺言機制,用於通知同一主題下的去他設備發送遺言的設備已經斷開鏈接。
    • Testament:遺囑機制,功能相似Last Will

MQTT協議原理

MQTT協議實現方式

實現MQTT協議須要客戶端和服務器端通信完成,在通信過程當中MQTT協議中有三種身份:發佈者(Publish)代理(Broker)(服務器)訂閱者(Subscribe)。其中消息的發佈者和訂閱者都是客戶端,消息的代理是服務器,消息發佈者能夠同時是訂閱者。 MQTT傳輸的消息分爲:java

  • Topic(主題):能夠理解爲接頭暗號,訂閱者訂閱後(即口號對上後),就會收到該主題的消息內容(payload)
  • payload(負載):能夠理解爲消息內容,是指訂閱者具體須要的內容/

網路傳輸與應用消息

MQTT會構建鍍層網絡傳輸:他將創建客戶端到服務器的鏈接,提供二者之間的一個有序的、無損的、基於字節流的雙向傳輸。當應用數據經過MQTT網絡發送時,MQTT會把與之相關的服務質量(Qos)和主題名(Topic)相關聯。python

MQTT客戶端

一個使用MQTT協議的應用程序或者設備,他老是創建到服務器的網絡鏈接。客戶端能夠:bash

  • 發佈其餘客戶端可能會訂閱的信息
  • 訂閱其餘客戶端信息
  • 退訂或刪除應用程序的消息
  • 斷開與服務器鏈接

MQTT服務器

MQTT服務器以稱爲消息代理(Broker),能夠是一個程序或一臺設備他是位於消息發佈者和訂閱者之間,它能夠:服務器

  • 接受來自客戶端的鏈接
  • 接受客戶端發佈的應用信息
  • 處理來自客戶端的訂閱退訂請求
  • 向訂閱的客戶端轉發應用程序消息

MQTT協議中的訂閱、主題、會話

訂閱(Subscription)

訂閱包含主題篩選器(Topic Filter)和最大服務質量(Qos)。訂閱會與一個會話(Session)關聯。一個會話能夠包含多個訂閱。每一個會話中的每一個訂閱都有一個不一樣的主題篩選器。網絡

會話(Session)

每一個客戶端與服務器創建鏈接後就是一個會話,客戶端和服務器之間有狀態交互。會話存在與一個網絡之間,也可能在客戶端和服務器之間跨越多個連續的網絡鏈接。運維

主題名(Topic Name)

鏈接到一個應用程序消息的標籤,該標籤與服務器的訂閱相匹配。服務器會將消息發送給訂閱者所匹配標籤的每一個客戶端。tcp

主題篩選器(Topic Filter)

一個對主題名通配符篩選器,在訂閱表達式中使用,表示訂閱所匹配到的多個主題。函數

負載(Payload)

消息訂閱者所具體接受的內容。oop

MQTT協議中的方法

MQTT協議中定義了一些方法(也被稱爲動做),用於來表示對肯定資源所進行操做,這個資源能夠表明預先存在的數據或動態生成數據,這取決於服務器的實現。一般來講,資源指服務器上的文件或輸出。主要方法有:

  • Connect:與服務器創建鏈接,鏈接成功後有個回調函數(以下是python代碼)
# 連接mqtt服務器函數
    def on_mqtt_connect(self):
        self.client.connect(self.MQTTHOST, self.MQTTPORT, 60)
        # 開始監聽
        self.client.loop_start() 

# 連接完成後的回調函數
    def on_connect(self, client, userdata, flags, rc):
        logging.info("+++ Connected with result code {} +++".format(str(rc)))
        self.client.subscribe(self.topic_from_base)
複製代碼
  • Disconnect: 等待MQTT客戶端完成所作的工做,並與服務器斷開TCP/IP會話。
  • Subscribe:等待完成訂閱。(python代碼以下)
# 訂閱函數
    def subscribe(self):
        self.client.subscribe(self.topic_from_base, 1)
        # 消息到來處理函數
        self.client.on_message = self.on_message
        
    # 接收到信息後的回調函數, (client:客戶端信息,userdata:用戶信息,msg:消息體)
    def on_message(self, client, userdata, msg):
        pass
複製代碼
  • UnSubscribe:等待服務器取消客戶端的一個或多個訂閱。
  • Publish:MQTT客戶端發送消息請求,發送完成後返回應用程序線程。(python代碼以下)
# 數據發送函數
    def publish(self, index, fuc):
        # mqtt發送
        self.client.publish(
            self.topic_to_base, #主題
            to_base_data    # 消息題
        )
複製代碼

MQTT協議數據包結構

在MQTT協議中,一個MQTT數據包由:固定頭(Fixed header)、可變頭(Variable header)、消息題(Payload)三部分組成。MQTT數據包結構以下:

  • 固定頭(Fixed header): 存在與全部MQTT數據包中,表示數據包類型及數據包的分組類標識。
  • 可變頭(Variable header):存在與部分MQTT數據包中,數據包類型決定了可變頭是否存在及其具內容。
  • 消息體(Payload):存在於部分MQTT數據包中,表示客戶端收到的具體內容。
MQTT固定頭

固定頭存在與全部MQTT數據包中,使用兩個字節看,共十六位 其結構以下:

image

MQTT數據包類型

位置:Byte 1中的4-7位。

使用四位二進制,可表明十六種類型消息:

image

除去0和15位置屬於保留待用,共十四種消息時間類型。

標誌位

位置:Byte 1 中的0-3

  • DUP :保證消息可靠傳輸,默認爲0,只佔用一個字節,表示第一次發送。不能用於檢測消息重複發送等。只適用於客戶端或服務器端嘗試重發PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意須要知足條件:當QoS > 0消息須要回覆確認 此時,在可變頭部須要包含消息ID,當值爲1時,表示當前效益先前已經被傳送過。
  • Qos(服務質量):
    image
  • RETAIN:發佈保留標識,表示服務器要保留此次推送的信息,若是有新的訂閱者出現,就把這消息推送給它,若是設有那麼推送至當前訂閱者後釋放
剩餘長度

位置:Byte 2

固定頭的第二字節用來保存變長頭部和消息體的總大小的,但不是直接保存的。這一字節是能夠擴展,其保存機制,前7位用於保存長度,後一部用作標識。當最後一位爲1時,表示長度不足,須要使用二個字節繼續保存。例如:計算出後面的大小爲0

MQTT可變頭

MQTT數據包中包含一個可變頭,它駐位於固定的頭和負載之間。不少類型數據包中都包括一個2字節的數據包標識字段,這些類型的包有:PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK。可變頭部內容字節長度 + Playload/負荷字節長度 = 剩餘長度,這個是須要牢記的可變頭的內容因數據包類型而不一樣,較常的應用是做爲包的標識:

image

消息體(Payloa)

Payload消息體位MQTT數據包的第三部分,包含CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四種類型的消息:

  • CONNECT:消息體內容主要是:客戶端的ClientID、訂閱的Topic、Message以及用戶名和密碼。
  • SUBSCRIBE:消息體內容是一系列的要訂閱的主題以及QoS。
  • SUBACK:消息體內容是服務器對於SUBSCRIBE所申請的主題及QoS進行確認和回覆。
  • UNSUBSCRIBE:消息體內容是要訂閱的主題。

python 簡單測試代碼

# -*- coding: utf-8 -*-

import paho.mqtt.client as mqtt
import time

class Mqtt(object):
    def __init__(self):
        self.MQTTHOST = "********"
        self.MOTTPORT = "********"
        self.client_id = time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
        self.client = mqtt.Client(self.client_id)
        self.client.username_pw_set()

        # 設置連接上服務器後回調函數
        self.client.on_connect = self.on_connect

        # 設置接收到服務器消息後回調函數
        self.client.on_message = self.on_message

        # 鏈接服務器,維持心跳爲60秒
        self.client.connect(self.MQTTHOST, self.MOTTPORT, 60)
        self.client.loop_forever()

    def on_connect(self,client, userdata, flag, rc):
        print("Connected with result code " + str(rc))
        client.subscribe("test")

    def on_publish(self, topic, payload, qos):
        self.client.publish(topic, payload, qos)
        print(topic+"消息發送成功。。。。。")
        return 1

    def on_message(self, client, userdata, msg):
        print(msg.topic+ "" +msg.payload.decode("utf-8"))
        return 1

    def on_subscribe(self, topic):
        self.client.subscribe(topic, 1)
        self.client.on_message = self.on_message
        return 1

if __name__ == "__main__":
    mqtt = Mqtt()
    mqtt.on_publish("test", "helloWord",1)
複製代碼
參考:

https://www.jianshu.com/p/5c42cb0ed1e9
http://www.runoob.com/w3cnote/mqtt-intro.html
http://www.blogjava.net/yongboy/archive/2014/02/07/409587.html

新手上車,請多指教,若有問題,請郵件聯繫:young5678@qq.com

相關文章
相關標籤/搜索