使用Python發送、訂閱消息

使用Python發送、訂閱消息

使用插件 paho-mqtt

官方文檔:http://shaocheng.li/post/blog/2017-05-23 在這裏插入圖片描述 Paho 是一個開源的 MQTT 客戶端項目,提供多種語言的 MQTT 客戶端實現,包括 C、C++、C#、Java、Python、JavaScript 等,徹底支持 MQTT v3.1 和 v3.1.1 。Paho Python Client 是它的 Python 語言版本,支持 Python 2.7 和 3.x 。更多特性能夠查看 http://www.eclipse.org/paho/clients/python/ ,源碼和文檔在 https://github.com/eclipse/paho.mqtt.pythonpython

安裝

在 Python 環境中用 pip install paho-mqtt 命令安裝,或者下載源碼:git

git clone https://github.com/eclipse/paho.mqtt.python.git
cd org.eclipse.paho.mqtt.python.git
python setup.py install

下面是一個簡單的例子,鏈接一個 borker ,訂閱系統默認話題,獲取 broker 的版本號:github

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("$SYS/broker/version")

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect("iot.eclipse.org", 1883, 60)

client.loop_forever()

保存到 paho-mqtt.py 文件,執行:web

$ python paho-mqtt.py
Connected with result code 0
$SYS/broker/version mosquitto version 1.4.10

編程

paho.mqtt 包提供了三個類,Client、Publish 和 Subscribe。Publish 和 Subscribe 提供了簡單的方法,一次性的發送或者接收消息,不會保持鏈接。Client 包含了新建客戶端、鏈接、訂閱、發送、回調函數等方法。一般的編程步驟是新建一個 Client 的實例,而後調用它提供的鏈接、發佈和訂閱等方法與 broker 通信:編程

  1. 新建一個 Client 實例
  2. 用一個 connect*() 函數鏈接 broker
  3. 用一個 loop*() 函數,維持與 broker 的鏈接
  4. 用 subscribe() 函數訂閱一個話題,接收消息
  5. 用 publish() 函數發佈消息
  6. 用 disconnect() 函數斷開鏈接

下面主要介紹 Client 提供的方法,使用前先導入:ubuntu

import paho.mqtt.client as mqtt

初始化

新建一個 Client 實例:安全

Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")

這是 Client 類的構造函數,參數的含義:bash

  1. client_id ,設置客戶端的 ID ,應該是一個字符串,鏈接時向 broker 提交。若是爲空,會隨機生成一個 id ,此時,clean_session 必須設爲 True 。
  2. clean_session ,布爾型,若是爲 True ,斷開鏈接時,broker 會清除關於這個 client 的全部信息。若是爲 False ,斷開鏈接時,broker 會保留這個客戶端的訂閱信息和消息隊列。
  3. userdata ,用戶自定義的數據,能夠是任何類型,傳遞給回調函數。能夠用 user_data_set() 函數更新。
  4. protocol ,設置 MQTT 協議的版本,MQTTv31 或者 MQTTv311 。
  5. transport , 傳輸協議,默認仍是 tcp ,能夠設爲 websockets 。

構造實例:服務器

import paho.mqtt.client as mqtt
mqttc = mqtt.Client()

能夠調用 reinitialise() 從新初始化 Client :websocket

reinitialise(client_id="", clean_session=True, userdata=None)

配置

這些函數用來設置 Client 的一些特性,一般在鏈接 broker 以前調用。

max_inflight_messages_set(self, inflight)

這個函數能夠設置當 QoS>0 時,最多能夠存在幾條動態消息(已經發送,尚未確認成功的消息)。默認是 20 ,增長這個值會佔用更多的內存,可是能夠提高吞吐量。

max_queued_messages_set(self, queue_size)

這個函數能夠設置當 QoS>0 時,發送消息隊列的最大值,默認是 0 ,表示無限制。當隊列滿時,舊消息會丟棄。

message_retry_set(retry)

當 Qos>0 時,若是發送消息後超過必定時間尚未收到確認報文,就要重發消息,這個函數用於設置超時時間,單位是秒。默認是 5 秒,一般不用修改。

tls_set(ca_certs, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED,tls_version=ssl.PROTOCOL_TLSv1, ciphers=None)

配置 SSL 證書驗證的函數,必須在 connect*() 函數以前調動。幾個參數的含義:

  1. ca_certs ,指定 CA 根證書的路徑。
  2. certfile,keyfile ,指定客戶端私鑰和證書的路徑。
  3. cert_reqs ,設置客戶端對 broker 證書的需求,默認是 ssl.CERT_REQUIRED ,表示 broker 必須提供一個證書。
  4. tls_version ,設置 SSL/TLS 協議的版本,默認是 TLS v1 。
  5. ciphers ,設置本次鏈接的加密密碼,默認是 None 。

設置用戶名和密碼:

username_pw_set(username, password=None)

設置遺囑:

will_set(topic, payload=None, qos=0, retain=False)

當這個 client 斷開鏈接時,broker 會發布這個遺囑消息。參數的含義:

  1. topic ,遺囑消息的話題
  2. payload ,遺囑消息的內容,字符串類型,若是設爲 None ,會發送一條長度爲 0 消息。若是設置了 int 或者 float 類型的值,會當作字符串發送,若是你想發送真正的 int 或者 float 值,須要用 struct.pack() 生成消息。
  3. qos ,遺囑消息的安全等級
  4. retain ,若是設爲 True ,遺囑消息會被設爲保留消息

若是參數設置錯誤,函數會拋出 ValueError 異常。

鏈接

最基本的鏈接方法是 connect() :

connect(host, port=1883, keepalive=60, bind_address="")

鏈接到 broker ,這是一個阻塞函數,參數的含義:

  1. host ,broker 的 hostname 或者 IP
  2. port ,broker 的開放端口,默認是 1883 ,若是使能了 SSL/TLS ,端口多是 8883
  3. keepalive ,心跳間隔,單位是秒,若是 broker 和 client 在這段時間內沒有任何通信,client 會給 broker 發送一個 ping 消息
  4. bind_address ,若是 client 的本地計算機有多個網絡接口,能夠用這個參數綁定其中的一個

client 調用該函數發起鏈接後,若是收到 broker 發來的 CONNACK 消息,就會執行 on_connect() 回調函數。除此以外,還有 connect_async() 和 connect_srv() 兩種函數能夠鏈接 broker 。connect_async() 須要配合 loop_start() 函數以非阻塞的方式鏈接 broker。connect_srv() 是從 SRV DNS 獲取 broker 的地址,而後再鏈接。 調用過 connect*() 函數以後,能夠調用 reconnect() 用現有的參數從新鏈接。調用 disconnect() 函數能夠從 broker 斷開鏈接,斷開鏈接後,會執行 on_disconnect() 回調函數。

網絡循環

網絡循環的函數有四種,它們運行在後臺,處理收發的消息。最基本的是 loop() :

loop(timeout=1.0, max_packets=1)

這個函數會經過 select() 函數阻塞,直到有消息須要收發,阻塞的時間用 timeout 參數設置,不能超過心跳時間 keepalive ,不然你的 client 會定時從 broker 斷開。max_packets 參數已通過時,無需設置。 另外一個循環函數是 loop_forever() ,它會一直阻塞,直到 client 調用了 disconnect() ,而且,它會自動重連:

loop_forever(timeout=1.0, max_packets=1, retry_first_connection=False)

timeout 和 max_packets 參數已通過時,無需設置。

發佈

publish(topic, payload=None, qos=0, retain=False)

向指定話題發送一條消息,參數的含義:

  1. topic ,這條消息所屬的話題
  2. payload ,消息內容,字符串類型,若是設爲 None ,會發送一條長度爲 0 消息。若是設置了 int 或者 3. float 類型的值,會當作字符串發送,若是你想發送真正的 int 或者 float 值,須要用 struct.pack() 生成消息。
  3. qos ,消息的安全等級
  4. retain ,若是設爲 Ture ,這條消息會被設爲保留消息

若是參數設置錯誤,會拋出 ValueError 異常。消息發送成功後,會執行 on_publish() 回調函數。

訂閱

subscribe(topic, qos=0)

向 broker 訂閱話題,參數 topic 設置話題名稱,qos 設置安全等級。若是隻訂閱一個話題,直接設置兩個參數便可,例如:

subscribe(("my/topic", 1))

若是要訂閱多個話題,能夠將每一個話題放在一個元組中,多個話題組成一個列表:

subscribe([("my/topic", 0), ("another/topic", 2)])

當 broker 確認訂閱有效後,client 會執行 on_subscribe() 回調函數。若是要取消訂閱某個話題,能夠調用 unsubscribe(topic) ,參數是字符串型,若是是取消多個話題,參數應該是一個字符串列表。取消成功的話,會執行 on_unsubscribe() 回調函數。

回調函數

當 broker 對 client 的鏈接請求作出迴應時,會調用 on_connect() 回調函數,能夠在該函數中判斷鏈接是否成功:

on_connect(client, userdata, flags, rc)

參數 client 是當前 client 的實例,userdata 是 Client() 或 userdata_set() 設置的用戶數據。flags 是 broker 發送的迴應 flags ,字典類型。rc 表示鏈接結果,整數型,0 表示鏈接成功,鏈接失敗可能的值有:

  1. 錯誤的協議版本
  2. 無效的 client ID
  3. 服務器不可用
  4. 錯誤的用戶名或密碼
  5. 沒法驗證

使用實例:

def on_connect(client, userdata, flags, rc):
    print("Connection returned result: "+connack_string(rc))

mqttc.on_connect = on_connect
...

對應的,與 broker 斷開鏈接後,會執行 on_disconnect() 回調函數:

on_disconnect(client, userdata, rc)

rc 表示斷開鏈接的狀態,若是是 0 ,表示是調用了 disconnect() 引發的斷開鏈接,其餘結果表示意外斷開,好比網絡中斷。使用實例:

def on_disconnect(client, userdata, rc):
    if rc != 0:
        print("Unexpected disconnection.")

mqttc.on_disconnect = on_disconnect
...

當 client 接收到已訂閱的話題的消息時,會調用 on_message() 回調函數,在該函數中判斷是哪一個話題的消息,並處理消息內容:

on_message(client, userdata, message)

參數 message 是 MQTTMessage 類的實例,這個類包含的成員有 topic ,payload ,qos ,retain 。使用實例:

def on_message(client, userdata, message):
    print("Received message '" + str(message.payload) + "' on topic '"
        + message.topic + "' with QoS " + str(message.qos))

mqttc.on_message = on_message
...

若是要用通配符同時處理多個話題的消息,例如用 sensors/# 匹配 sensors/temperature 和 sensors/humidity 話題,能夠用 message_callback_add() 設置回調函數:

message_callback_add(sub, callback)

參數 sub 是一個使用通配符的話題過濾器,字符串型,用 callback 參數指定回掉函數,與 on_message() 相同的類型。 若是同時設置了 on_message() 和 message_callback_add() 回調函數,會首先尋找合適的 message_callback_add() 定義的話題過濾器,若是沒有匹配,纔會調用 on_message() 。

實例

假設 broker 要求提供用戶名、密碼、證書和密鑰,下面是一個簡單的 client 例子:

$ cat path-mqtt.py
#!/usr/bin/python

import paho.mqtt.client as mqtt

cafile = "/etc/mosquitto/ca/ca.crt"
certfile = "/home/ubuntu/CA/client.crt"
keyfile = "/home/ubuntu/CA/client.key"
user = "guest"
passwd = "12345678"
server = "localhost"
port = 8883

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("$SYS/broker/version")

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

client = mqtt.Client()
client.tls_set(cafile,certfile,keyfile)
client.username_pw_set(user,passwd)
client.on_connect = on_connect
client.on_message = on_message

client.connect(server, port, 60)

client.loop_forever()

執行:

$ ./path-mqtt.py
Connected with result code 0
$SYS/broker/version mosquitto version 1.4.11

簡單實例

# pip install paho-mqtt
import paho.mqtt.client as mqtt
import time

server = "192.168.1.168"  # mqtt地址
port = 1883  # mqtt端口

def on_connect(client,userdata,flags,rc):
    """
    mqtt鏈接成功的回調
    """
    print("MQTT服務器鏈接返回碼:" + str(rc))
    client.subscribe([("/wjw/znqs/123456789/1/down",0),("/wjw/znqs/123456789/2/down",0)])
    

def on_message(client,userdata,msg):
    """
    mqtt收到訂閱消息回調
    """
    print(msg.topic +" --> "+(msg.payload).decode())



client = mqtt.Client()
client.on_connect = on_connect # 綁定mqtt鏈接回調
client.on_message = on_message # 綁定mqtt訂閱消息回調
client.connect(server,port,60) # 鏈接mqtt服務
client.publish("/wjw/znqs/123456789/1/up", payload="1 2 3 4 5 6 7", qos=0, retain=False)
print(time.time() , "   /wjw/znqs/123456789/1/up ---> 1 2 3 4 5 6 7")

client.loop_forever() # 阻塞函數

"""
while 1:
    client.publish("/wjw/znqs/123456789/1/up", payload="1 2 3 4 5 6 7", qos=0, retain=False)
    print(time.time() , "   /wjw/znqs/123456789/1/up ---> 1 2 3 4 5 6 7")
    time.sleep(5)

"""

案例

https://blog.csdn.net/lhh08hasee/article/details/88025436

相關文章
相關標籤/搜索