樹莓派 由英國樹莓派基金會開發,是一款基於 ARM 的微型計算機主板。該主板提供 USB 接口和以太網接口,能夠鏈接鍵盤、鼠標和網線,該主板具有 PC 的基本功能,同時樹莓派集成了 Wi-Fi,藍牙以及大量 GPIO,被普遍運用在教學、家庭娛樂、物聯網等。 python
MQTT 是一種基於發佈/訂閱模式的 輕量級物聯網消息傳輸協議 ,能夠用極少的代碼和帶寬爲聯網設備提供實時可靠的消息服務,它適用於硬件資源有限的設備及帶寬有限的網絡環境。所以,MQTT 協議普遍應用於物聯網、移動互聯網、智能硬件、車聯網、電力能源等行業。 git
在此項目中,咱們將在樹莓派上使用 Python 編寫簡單的 MQTT 客戶端,並實現該客戶端與 MQTT 服務器的鏈接、訂閱、取消訂閱、收發消息等功能。github
本項目使用 Python3 進行開發,通常狀況下,樹莓派系統會內置 Python3,若是不肯定系統內是否已經安裝,可使用下面的命令進行確認。docker
python3 --version
若是顯示 Python 3.x.x(x 表示數字)則表示已經安裝,不然請使用 apt 命令安裝(或跟隨 Python3 安裝指南 操做 )。服務器
sudo apt install python3
爲了方便鏈接到 MQTT 服務器,咱們須要安裝 paho-mqtt
庫。能夠選擇如下兩種方法之一進行安裝。網絡
使用源碼安裝app
git clone https://github.com/eclipse/paho.mqtt.python cd paho.mqtt.python python3 setup.py install
使用 pip3 安裝eclipse
pip3 install paho-mqtt
本文將使用 EMQ X 提供的 免費公共 MQTT 服務器,該服務基於 EMQ X 的 MQTT 物聯網雲平臺 建立。服務器接入信息以下:socket
若是有須要,您也可使用 docker 在本地快速安裝 EMQ X 服務器。編輯器
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 18083:18083 emqx/emqx
鏈接示例代碼
# test_connect.py import paho.mqtt.client as mqtt # 回調函數。當嘗試與 MQTT broker 創建鏈接時,觸發該函數。 # client 是本次鏈接的客戶端實例。 # userdata 是用戶的信息,通常爲空。但若是有須要,也能夠經過 user_data_set 函數設置。 # flags 保存服務器響應標誌的字典。 # rc 是響應碼。 # 通常狀況下,咱們只須要關注 rc 響應碼是否爲 0 就能夠了。 def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected success") else: print(f"Connected fail with code {rc}") client = mqtt.Client() client.on_connect = on_connect client.connect("broker.emqx.io", 1883, 60) client.loop_forever()
將上面的代碼保存爲 test_connect.py 文件,並運行
python3 test_connect.py
咱們在 on_connect 函數裏對響應碼進行了判斷,爲 0 則輸出 Connected success
表示鏈接成功。若是返回的是其它數字,咱們就須要對照下面的響應碼進行判斷。
0: 鏈接成功 1: 鏈接失敗-不正確的協議版本 2: 鏈接失敗-無效的客戶端標識符 3: 鏈接失敗-服務器不可用 4: 鏈接失敗-錯誤的用戶名或密碼 5: 鏈接失敗-未受權 6-255: 未定義 若是是其它問題,能夠檢查網絡狀況,或者確認是否安裝了 `paho-mqtt`。
在 MQTT 協議的概念中,消息是經過主題傳遞的,好比設備 A 向主題 T 發送消息,那麼只有訂閱了主題 T 的設備才能接收到。因此僅僅接入 MQTT 服務器並無太大意議,要完整地使用 MQTT 服務,咱們還須要知道如何訂閱和發佈消息。
打開任意編輯器,輸入下面的代碼,並保存爲 subscriber.py 文件:
# subscriber.py import paho.mqtt.client as mqtt def on_connect(client, userdata, flags, rc): print(f"Connected with result code {rc}") # 訂閱,須要放在 on_connect 裏 # 若是與 broker 失去鏈接後重連,仍然會繼續訂閱 raspberry/topic 主題 client.subscribe("raspberry/topic") # 回調函數,當收到消息時,觸發該函數 def on_message(client, userdata, msg): print(f"{msg.topic} {msg.payload}") client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message # 設置遺囑消息,當樹莓派斷電,或者網絡出現異常中斷時,發送遺囑消息給其餘客戶端 client.will_set('raspberry/status', {"status": "Off"}) # 建立鏈接,三個參數分別爲 broker 地址,broker 端口號,保活時間 client.connect("broker.emqx.io", 1883, 60) # 設置網絡循環堵塞,在調用 disconnect() 或程序崩潰前,不會主動結束程序 client.loop_forever()
調用 subscribe()
函數,可讓樹莓派訂閱一個主題。在上面的代碼中,咱們使用它訂閱了 raspberry/topic
主題,並監聽消息。
另外,咱們還使用 will_set()
設置了遺囑消息。 遺囑消息是 MQTT 的一個特性,當設備在乎外斷開網絡鏈接後,會向某個特定的主題發送消息。經過這個特性,咱們能夠得知樹莓派是否斷電,或者出現網絡異常。
打開任意編輯器,輸入下面的代碼,並保存爲 publisher.py 文件:
import paho.mqtt.client as mqtt import time def on_connect(client, userdata, flags, rc): print(f"Connected with result code {rc}") client = mqtt.Client() client.on_connect = on_connect client.connect("broker.emqx.io", 1883, 60) # 每間隔 1 秒鐘向 raspberry/topic 發送一個消息,連續發送 5 次 for i in range(5): # 四個參數分別爲:主題,發送內容,QoS, 是否保留消息 client.publish('raspberry/topic', payload=i, qos=0, retain=False) print(f"send {i} to raspberry/topic") time.sleep(1) client.loop_forever()
調用 publish()
函數,能夠向一個主題發送消息。在上面的代碼中,咱們使用了它向主題 raspberry/topic
發送消息。其中參數 QoS 是另外一個 MQTT 特性,若是你想了解更多 QoS 的內容,能夠查看 MQTT QoS(服務質量)介紹,這裏咱們暫且設爲 0。
咱們使用 MQTT 5.0 客戶端工具 - MQTT X 進行如下測試。
運行 Python 代碼,並主動發送一個消息。
打開終端,運行 Python 代碼,監聽消息 。
python3 subscriber.py
raspberry/topic
發送消息 。raspberry/topic
主題 。接下來測試一下遺囑消息是否設置成功。
raspberry/status
。raspberry/status
主題接收到的消息。咱們完成了在樹莓派上使用 Python MQTT 客戶端庫 paho-mqtt
編寫測試客戶端, 並實現了測試客戶端與 MQTT 服務器的鏈接、訂閱、取消訂閱、收發消息等功能。
至此,您已經學會了如何簡單的使用 MQTT 服務,雖然這只是 MQTT 服務的一小部分,但也足夠完成不少有意思的事,好比:
1. 使用手機發送 MQTT 消息,遠程控制樹莓派。 2. 定時將樹莓派的設備信息發送到 MQTT 服務器,經過手機接收消息,能夠進行全天候監控。 3. 能夠經過將樹莓派接入 MQTT 服務器,並配合各種傳感器及 ESP 模塊建立不少有趣的物聯網應用。
接下來咱們將會陸續發佈更多關於物聯網開發及樹莓派的相關文章,盡情關注。
版權聲明: 本文爲 EMQ 原創,轉載請註明出處。