MQTT 是一種基於 發佈/訂閱(publish/subscribe) 模式的「輕量級」通信協議,該協議創建在TCP/IP協議上。MQTT最大優勢在於,能夠以極少的代碼和有限的寬帶爲遠程鏈接設備提供實時可靠的消息服務。做爲一種低開銷、低寬帶佔用的即時通信協議,使其在物聯網、小型設備、移動開發等方面有比較普遍的應用。 MQTT是一個基於客戶端-服務器的消息發佈/訂閱傳輸協議,MQTT協議中有三種身份:發佈者(Publish)、代理(Broker)(服務器)、訂閱者(Subscribe),形式以下:
html
實現MQTT協議須要客戶端和服務器端通信完成,在通信過程當中MQTT協議中有三種身份:發佈者(Publish)、代理(Broker)(服務器)、訂閱者(Subscribe)。其中消息的發佈者和訂閱者都是客戶端,消息的代理是服務器,消息發佈者能夠同時是訂閱者。 MQTT傳輸的消息分爲:java
MQTT會構建鍍層網絡傳輸:他將創建客戶端到服務器的鏈接,提供二者之間的一個有序的、無損的、基於字節流的雙向傳輸。當應用數據經過MQTT網絡發送時,MQTT會把與之相關的服務質量(Qos)和主題名(Topic)相關聯。python
一個使用MQTT協議的應用程序或者設備,他老是創建到服務器的網絡鏈接。客戶端能夠:bash
MQTT服務器以稱爲消息代理(Broker),能夠是一個程序或一臺設備他是位於消息發佈者和訂閱者之間,它能夠:服務器
訂閱包含主題篩選器(Topic Filter)和最大服務質量(Qos)。訂閱會與一個會話(Session)關聯。一個會話能夠包含多個訂閱。每一個會話中的每一個訂閱都有一個不一樣的主題篩選器。網絡
每一個客戶端與服務器創建鏈接後就是一個會話,客戶端和服務器之間有狀態交互。會話存在與一個網絡之間,也可能在客戶端和服務器之間跨越多個連續的網絡鏈接。運維
鏈接到一個應用程序消息的標籤,該標籤與服務器的訂閱相匹配。服務器會將消息發送給訂閱者所匹配標籤的每一個客戶端。tcp
一個對主題名通配符篩選器,在訂閱表達式中使用,表示訂閱所匹配到的多個主題。函數
消息訂閱者所具體接受的內容。oop
MQTT協議中定義了一些方法(也被稱爲動做),用於來表示對肯定資源所進行操做,這個資源能夠表明預先存在的數據或動態生成數據,這取決於服務器的實現。一般來講,資源指服務器上的文件或輸出。主要方法有:
# 連接mqtt服務器函數
def on_mqtt_connect(self):
self.client.connect(self.MQTTHOST, self.MQTTPORT, 60)
# 開始監聽
self.client.loop_start()
# 連接完成後的回調函數
def on_connect(self, client, userdata, flags, rc):
logging.info("+++ Connected with result code {} +++".format(str(rc)))
self.client.subscribe(self.topic_from_base)
複製代碼
# 訂閱函數
def subscribe(self):
self.client.subscribe(self.topic_from_base, 1)
# 消息到來處理函數
self.client.on_message = self.on_message
# 接收到信息後的回調函數, (client:客戶端信息,userdata:用戶信息,msg:消息體)
def on_message(self, client, userdata, msg):
pass
複製代碼
# 數據發送函數
def publish(self, index, fuc):
# mqtt發送
self.client.publish(
self.topic_to_base, #主題
to_base_data # 消息題
)
複製代碼
在MQTT協議中,一個MQTT數據包由:固定頭(Fixed header)、可變頭(Variable header)、消息題(Payload)三部分組成。MQTT數據包結構以下:
固定頭存在與全部MQTT數據包中,使用兩個字節看,共十六位 其結構以下:
位置:Byte 1中的4-7位。
使用四位二進制,可表明十六種類型消息:
除去0和15位置屬於保留待用,共十四種消息時間類型。
位置:Byte 1 中的0-3
位置:Byte 2
固定頭的第二字節用來保存變長頭部和消息體的總大小的,但不是直接保存的。這一字節是能夠擴展,其保存機制,前7位用於保存長度,後一部用作標識。當最後一位爲1時,表示長度不足,須要使用二個字節繼續保存。例如:計算出後面的大小爲0
MQTT數據包中包含一個可變頭,它駐位於固定的頭和負載之間。不少類型數據包中都包括一個2字節的數據包標識字段,這些類型的包有:PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK。可變頭部內容字節長度 + Playload/負荷字節長度 = 剩餘長度,這個是須要牢記的可變頭的內容因數據包類型而不一樣,較常的應用是做爲包的標識:
Payload消息體位MQTT數據包的第三部分,包含CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四種類型的消息:
python 簡單測試代碼
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
import time
class Mqtt(object):
def __init__(self):
self.MQTTHOST = "********"
self.MOTTPORT = "********"
self.client_id = time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
self.client = mqtt.Client(self.client_id)
self.client.username_pw_set()
# 設置連接上服務器後回調函數
self.client.on_connect = self.on_connect
# 設置接收到服務器消息後回調函數
self.client.on_message = self.on_message
# 鏈接服務器,維持心跳爲60秒
self.client.connect(self.MQTTHOST, self.MOTTPORT, 60)
self.client.loop_forever()
def on_connect(self,client, userdata, flag, rc):
print("Connected with result code " + str(rc))
client.subscribe("test")
def on_publish(self, topic, payload, qos):
self.client.publish(topic, payload, qos)
print(topic+"消息發送成功。。。。。")
return 1
def on_message(self, client, userdata, msg):
print(msg.topic+ "" +msg.payload.decode("utf-8"))
return 1
def on_subscribe(self, topic):
self.client.subscribe(topic, 1)
self.client.on_message = self.on_message
return 1
if __name__ == "__main__":
mqtt = Mqtt()
mqtt.on_publish("test", "helloWord",1)
複製代碼
https://www.jianshu.com/p/5c42cb0ed1e9
http://www.runoob.com/w3cnote/mqtt-intro.html
http://www.blogjava.net/yongboy/archive/2014/02/07/409587.html
新手上車,請多指教,若有問題,請郵件聯繫:young5678@qq.com