pip3 install paho-mqtt
clientpython
#!/usr/bin/env python #coding=utf-8 import json import sys import os import paho.mqtt.client as mqtt import time sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..')) sys.path.append("..") TASK_TOPIC = 'test' # 客戶端發佈消息主題 client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time())) client = mqtt.Client(client_id, transport='tcp') client.connect("127.0.0.1", 1883, 60) # 此處端口默認爲1883,通訊端口期keepalive默認60 client.loop_start() def clicent_main(message: str): """ 客戶端發佈消息 :param message: 消息主體 :return: """ time_now = time.strftime('%Y-%m-%d %H-%M-%S', time.localtime(time.time())) payload = {"msg": "%s" % message, "data": "%s" % time_now} # publish(主題:Topic; 消息內容) client.publish(TASK_TOPIC, json.dumps(payload, ensure_ascii=False)) print("Successful send message!") return True if __name__ == '__main__': msg = "我是一條測試數據!" clicent_main(msg)
serverjson
#!/usr/bin/env python #coding=utf-8 import json import sys import os import time import paho.mqtt.client as mqtt sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..')) sys.path.append("..") REPORT_TOPIC = 'test' # 主題 def on_connect(client, userdata, flags, rc): print('connected to mqtt with resurt code ', rc) client.subscribe(REPORT_TOPIC) # 訂閱主題 def on_message(client, userdata, msg): """ 接收客戶端發送的消息 :param client: 鏈接信息 :param userdata: :param msg: 客戶端返回的消息 :return: """ print("Start server!") payload = json.loads(msg.payload.decode('utf-8')) print(payload) def server_conenet(client): client.on_connect = on_connect # 啓用訂閱模式 client.on_message = on_message # 接收消息 client.connect("127.0.0.1", 1883, 60) # 連接 # client.loop_start() # 以start方式運行,須要啓動一個守護線程,讓服務端運行,不然會隨主線程死亡 client.loop_forever() # 以forever方式阻塞運行。 def server_stop(client): client.loop_stop() # 中止服務端 sys.exit(0) def server_main(): client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time())) client = mqtt.Client(client_id, transport='tcp') server_conenet(client) if __name__ == '__main__': # 啓動監聽 server_main()
參考:app