使用Python發送、訂閱消息
使用插件 paho-mqtt
官方文檔:http://shaocheng.li/post/blog/2017-05-23 Paho 是一個開源的 MQTT 客戶端項目,提供多種語言的 MQTT 客戶端實現,包括 C、C++、C#、Java、Python、JavaScript 等,徹底支持 MQTT v3.1 和 v3.1.1 。Paho Python Client 是它的 Python 語言版本,支持 Python 2.7 和 3.x 。更多特性能夠查看 http://www.eclipse.org/paho/clients/python/ ,源碼和文檔在 https://github.com/eclipse/paho.mqtt.python 。python
安裝
在 Python 環境中用 pip install paho-mqtt
命令安裝,或者下載源碼:git
git clone https://github.com/eclipse/paho.mqtt.python.git cd org.eclipse.paho.mqtt.python.git python setup.py install
下面是一個簡單的例子,鏈接一個 borker ,訂閱系統默認話題,獲取 broker 的版本號:github
import paho.mqtt.client as mqtt def on_connect(client, userdata, flags, rc): print("Connected with result code "+str(rc)) client.subscribe("$SYS/broker/version") def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.connect("iot.eclipse.org", 1883, 60) client.loop_forever()
保存到 paho-mqtt.py 文件,執行:web
$ python paho-mqtt.py Connected with result code 0 $SYS/broker/version mosquitto version 1.4.10
編程
paho.mqtt 包提供了三個類,Client、Publish 和 Subscribe。Publish 和 Subscribe 提供了簡單的方法,一次性的發送或者接收消息,不會保持鏈接。Client 包含了新建客戶端、鏈接、訂閱、發送、回調函數等方法。一般的編程步驟是新建一個 Client 的實例,而後調用它提供的鏈接、發佈和訂閱等方法與 broker 通信:編程
- 新建一個 Client 實例
- 用一個 connect*() 函數鏈接 broker
- 用一個 loop*() 函數,維持與 broker 的鏈接
- 用 subscribe() 函數訂閱一個話題,接收消息
- 用 publish() 函數發佈消息
- 用 disconnect() 函數斷開鏈接
下面主要介紹 Client 提供的方法,使用前先導入:ubuntu
import paho.mqtt.client as mqtt
初始化
新建一個 Client 實例:安全
Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
這是 Client 類的構造函數,參數的含義:bash
- client_id ,設置客戶端的 ID ,應該是一個字符串,鏈接時向 broker 提交。若是爲空,會隨機生成一個 id ,此時,clean_session 必須設爲 True 。
- clean_session ,布爾型,若是爲 True ,斷開鏈接時,broker 會清除關於這個 client 的全部信息。若是爲 False ,斷開鏈接時,broker 會保留這個客戶端的訂閱信息和消息隊列。
- userdata ,用戶自定義的數據,能夠是任何類型,傳遞給回調函數。能夠用 user_data_set() 函數更新。
- protocol ,設置 MQTT 協議的版本,MQTTv31 或者 MQTTv311 。
- transport , 傳輸協議,默認仍是 tcp ,能夠設爲 websockets 。
構造實例:服務器
import paho.mqtt.client as mqtt mqttc = mqtt.Client()
能夠調用 reinitialise() 從新初始化 Client :websocket
reinitialise(client_id="", clean_session=True, userdata=None)
配置
這些函數用來設置 Client 的一些特性,一般在鏈接 broker 以前調用。
max_inflight_messages_set(self, inflight)
這個函數能夠設置當 QoS>0 時,最多能夠存在幾條動態消息(已經發送,尚未確認成功的消息)。默認是 20 ,增長這個值會佔用更多的內存,可是能夠提高吞吐量。
max_queued_messages_set(self, queue_size)
這個函數能夠設置當 QoS>0 時,發送消息隊列的最大值,默認是 0 ,表示無限制。當隊列滿時,舊消息會丟棄。
message_retry_set(retry)
當 Qos>0 時,若是發送消息後超過必定時間尚未收到確認報文,就要重發消息,這個函數用於設置超時時間,單位是秒。默認是 5 秒,一般不用修改。
tls_set(ca_certs, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED,tls_version=ssl.PROTOCOL_TLSv1, ciphers=None)
配置 SSL 證書驗證的函數,必須在 connect*() 函數以前調動。幾個參數的含義:
- ca_certs ,指定 CA 根證書的路徑。
- certfile,keyfile ,指定客戶端私鑰和證書的路徑。
- cert_reqs ,設置客戶端對 broker 證書的需求,默認是 ssl.CERT_REQUIRED ,表示 broker 必須提供一個證書。
- tls_version ,設置 SSL/TLS 協議的版本,默認是 TLS v1 。
- ciphers ,設置本次鏈接的加密密碼,默認是 None 。
設置用戶名和密碼:
username_pw_set(username, password=None)
設置遺囑:
will_set(topic, payload=None, qos=0, retain=False)
當這個 client 斷開鏈接時,broker 會發布這個遺囑消息。參數的含義:
- topic ,遺囑消息的話題
- payload ,遺囑消息的內容,字符串類型,若是設爲 None ,會發送一條長度爲 0 消息。若是設置了 int 或者 float 類型的值,會當作字符串發送,若是你想發送真正的 int 或者 float 值,須要用 struct.pack() 生成消息。
- qos ,遺囑消息的安全等級
- retain ,若是設爲 True ,遺囑消息會被設爲保留消息
若是參數設置錯誤,函數會拋出 ValueError 異常。
鏈接
最基本的鏈接方法是 connect() :
connect(host, port=1883, keepalive=60, bind_address="")
鏈接到 broker ,這是一個阻塞函數,參數的含義:
- host ,broker 的 hostname 或者 IP
- port ,broker 的開放端口,默認是 1883 ,若是使能了 SSL/TLS ,端口多是 8883
- keepalive ,心跳間隔,單位是秒,若是 broker 和 client 在這段時間內沒有任何通信,client 會給 broker 發送一個 ping 消息
- bind_address ,若是 client 的本地計算機有多個網絡接口,能夠用這個參數綁定其中的一個
client 調用該函數發起鏈接後,若是收到 broker 發來的 CONNACK 消息,就會執行 on_connect() 回調函數。除此以外,還有 connect_async() 和 connect_srv() 兩種函數能夠鏈接 broker 。connect_async() 須要配合 loop_start() 函數以非阻塞的方式鏈接 broker。connect_srv() 是從 SRV DNS 獲取 broker 的地址,而後再鏈接。 調用過 connect*() 函數以後,能夠調用 reconnect() 用現有的參數從新鏈接。調用 disconnect() 函數能夠從 broker 斷開鏈接,斷開鏈接後,會執行 on_disconnect() 回調函數。
網絡循環
網絡循環的函數有四種,它們運行在後臺,處理收發的消息。最基本的是 loop() :
loop(timeout=1.0, max_packets=1)
這個函數會經過 select() 函數阻塞,直到有消息須要收發,阻塞的時間用 timeout 參數設置,不能超過心跳時間 keepalive ,不然你的 client 會定時從 broker 斷開。max_packets 參數已通過時,無需設置。 另外一個循環函數是 loop_forever() ,它會一直阻塞,直到 client 調用了 disconnect() ,而且,它會自動重連:
loop_forever(timeout=1.0, max_packets=1, retry_first_connection=False)
timeout 和 max_packets 參數已通過時,無需設置。
發佈
publish(topic, payload=None, qos=0, retain=False)
向指定話題發送一條消息,參數的含義:
- topic ,這條消息所屬的話題
- payload ,消息內容,字符串類型,若是設爲 None ,會發送一條長度爲 0 消息。若是設置了 int 或者 3. float 類型的值,會當作字符串發送,若是你想發送真正的 int 或者 float 值,須要用 struct.pack() 生成消息。
- qos ,消息的安全等級
- retain ,若是設爲 Ture ,這條消息會被設爲保留消息
若是參數設置錯誤,會拋出 ValueError 異常。消息發送成功後,會執行 on_publish() 回調函數。
訂閱
subscribe(topic, qos=0)
向 broker 訂閱話題,參數 topic 設置話題名稱,qos 設置安全等級。若是隻訂閱一個話題,直接設置兩個參數便可,例如:
subscribe(("my/topic", 1))
若是要訂閱多個話題,能夠將每一個話題放在一個元組中,多個話題組成一個列表:
subscribe([("my/topic", 0), ("another/topic", 2)])
當 broker 確認訂閱有效後,client 會執行 on_subscribe() 回調函數。若是要取消訂閱某個話題,能夠調用 unsubscribe(topic) ,參數是字符串型,若是是取消多個話題,參數應該是一個字符串列表。取消成功的話,會執行 on_unsubscribe() 回調函數。
回調函數
當 broker 對 client 的鏈接請求作出迴應時,會調用 on_connect() 回調函數,能夠在該函數中判斷鏈接是否成功:
on_connect(client, userdata, flags, rc)
參數 client 是當前 client 的實例,userdata 是 Client() 或 userdata_set() 設置的用戶數據。flags 是 broker 發送的迴應 flags ,字典類型。rc 表示鏈接結果,整數型,0 表示鏈接成功,鏈接失敗可能的值有:
- 錯誤的協議版本
- 無效的 client ID
- 服務器不可用
- 錯誤的用戶名或密碼
- 沒法驗證
使用實例:
def on_connect(client, userdata, flags, rc): print("Connection returned result: "+connack_string(rc)) mqttc.on_connect = on_connect ...
對應的,與 broker 斷開鏈接後,會執行 on_disconnect() 回調函數:
on_disconnect(client, userdata, rc)
rc 表示斷開鏈接的狀態,若是是 0 ,表示是調用了 disconnect() 引發的斷開鏈接,其餘結果表示意外斷開,好比網絡中斷。使用實例:
def on_disconnect(client, userdata, rc): if rc != 0: print("Unexpected disconnection.") mqttc.on_disconnect = on_disconnect ...
當 client 接收到已訂閱的話題的消息時,會調用 on_message() 回調函數,在該函數中判斷是哪一個話題的消息,並處理消息內容:
on_message(client, userdata, message)
參數 message 是 MQTTMessage 類的實例,這個類包含的成員有 topic ,payload ,qos ,retain 。使用實例:
def on_message(client, userdata, message): print("Received message '" + str(message.payload) + "' on topic '" + message.topic + "' with QoS " + str(message.qos)) mqttc.on_message = on_message ...
若是要用通配符同時處理多個話題的消息,例如用 sensors/# 匹配 sensors/temperature 和 sensors/humidity 話題,能夠用 message_callback_add() 設置回調函數:
message_callback_add(sub, callback)
參數 sub 是一個使用通配符的話題過濾器,字符串型,用 callback 參數指定回掉函數,與 on_message() 相同的類型。 若是同時設置了 on_message() 和 message_callback_add() 回調函數,會首先尋找合適的 message_callback_add() 定義的話題過濾器,若是沒有匹配,纔會調用 on_message() 。
實例
假設 broker 要求提供用戶名、密碼、證書和密鑰,下面是一個簡單的 client 例子:
$ cat path-mqtt.py
#!/usr/bin/python import paho.mqtt.client as mqtt cafile = "/etc/mosquitto/ca/ca.crt" certfile = "/home/ubuntu/CA/client.crt" keyfile = "/home/ubuntu/CA/client.key" user = "guest" passwd = "12345678" server = "localhost" port = 8883 def on_connect(client, userdata, flags, rc): print("Connected with result code "+str(rc)) client.subscribe("$SYS/broker/version") def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) client = mqtt.Client() client.tls_set(cafile,certfile,keyfile) client.username_pw_set(user,passwd) client.on_connect = on_connect client.on_message = on_message client.connect(server, port, 60) client.loop_forever()
執行:
$ ./path-mqtt.py Connected with result code 0 $SYS/broker/version mosquitto version 1.4.11
簡單實例
# pip install paho-mqtt import paho.mqtt.client as mqtt import time server = "192.168.1.168" # mqtt地址 port = 1883 # mqtt端口 def on_connect(client,userdata,flags,rc): """ mqtt鏈接成功的回調 """ print("MQTT服務器鏈接返回碼:" + str(rc)) client.subscribe([("/wjw/znqs/123456789/1/down",0),("/wjw/znqs/123456789/2/down",0)]) def on_message(client,userdata,msg): """ mqtt收到訂閱消息回調 """ print(msg.topic +" --> "+(msg.payload).decode()) client = mqtt.Client() client.on_connect = on_connect # 綁定mqtt鏈接回調 client.on_message = on_message # 綁定mqtt訂閱消息回調 client.connect(server,port,60) # 鏈接mqtt服務 client.publish("/wjw/znqs/123456789/1/up", payload="1 2 3 4 5 6 7", qos=0, retain=False) print(time.time() , " /wjw/znqs/123456789/1/up ---> 1 2 3 4 5 6 7") client.loop_forever() # 阻塞函數 """ while 1: client.publish("/wjw/znqs/123456789/1/up", payload="1 2 3 4 5 6 7", qos=0, retain=False) print(time.time() , " /wjw/znqs/123456789/1/up ---> 1 2 3 4 5 6 7") time.sleep(5) """