轉MQTT--Python進行發佈、訂閱測試

前言

 使用python編寫程序進行測試MQTT的發佈和訂閱功能。首先要安裝:pip install paho-mqttpython

測試發佈(pub)

 個人MQTT部署在阿里雲的服務器上面,因此我在本機上編寫了python程序進行測試。shell

而後在shell裏面從新打開一個終端,訂閱一個主題爲「chat」 mosquitto_sub -t chat服務器

 在本機上測試遠程的MQTT的發佈功能就是把本身做爲一個發送信息的人,當本身發送信息的時候,全部訂閱過該主題(topic)的對象都將收到本身發送的信息。 
mqtt_client.py網絡

# encoding: utf-8

import paho.mqtt.client as mqtt

HOST = "101.200.46.138"
PORT = 1883

def test():
    client = mqtt.Client()
    client.connect(HOST, PORT, 60)
    client.publish("chat","hello liefyuan",2) # 發佈一個主題爲'chat',內容爲‘hello liefyuan’的信息
    client.loop_forever()

if __name__ == '__main__':
    test()

  

 

註解函數:函數

client.connect(self, host, port, keepalive, bind_address)

client.publish(self, topic, payload, qos, retain) ---保留(retain)

client.subscribe(self, topic, qos)

 

注: MQTT傳輸的消息分爲:主題(Topic)和負載(payload)兩部分:oop

(1)Topic,能夠理解爲消息的類型,訂閱者訂閱(Subscribe)後,就會收到該主題的消息內容(payload);測試

(2)payload,能夠理解爲消息的內容,是指訂閱者具體要使用的內容。ui

(3)當應用數據經過MQTT網絡發送時,MQTT會把與之相關的服務質量(QoS)和主題名(Topic)相關連。阿里雲

測試訂閱(sub)

 在本機上編寫程序測試訂閱功能,就是讓本身的程序做爲一個接收者,同一個主題沒有發佈(pub)信息的時候,就本身一直等候。spa

# encoding: utf-8


import paho.mqtt.client as mqtt


def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("chat")


def on_message(client, userdata, msg):
    print(msg.topic+" " + ":" + str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("www.liefyuan.top", 1883, 60)
client.loop_forever()
相關文章
相關標籤/搜索