MQTT 是一個輕量的發佈訂閱模式消息傳輸協議,專門針對低帶寬和不穩定網絡環境的物聯網應用設計。MQTT 基於發佈/訂閱範式,工做在 TCP/IP協議族上,MQTT 協議輕量、簡單、開放並易於實現,這些特色使它適用範圍很是普遍。javascript
MQTT 基於客戶端-服務器通訊模式,MQTT 服務端稱爲 MQTT Broker,目前行業內可選的 MQTT Broker 較多,其優劣與功能差異比較本文再也不贅述。本文以開源社區中最流行的 MQTT 消息服務器 EMQ X 爲例,使用 EMQ 提供的公共 Broker broker.emqx.io
,經過一個簡單客戶端鏈接 Broker 併發布、處理消息的例子,整理總結不一樣編程語言、平臺下 MQTT 客戶端庫的使用方式與樣例。html
入選客戶端庫以下:java
MQTT 社區收錄了許多 MQTT 客戶端庫,讀者能夠在此處查看。node
MQTT 客戶端整個生命週期的行爲能夠歸納爲:創建鏈接、訂閱主題、接收消息並處理、向指定主題發佈消息、取消訂閱、斷開鏈接。python
標準的客戶端庫在每一個環節都暴露出相應的方法,不一樣庫在相同環節所需方法參數含義大體相同,具體選用哪些參數、啓用哪些功能特性須要用戶深刻了解 MQTT 協議特性並結合實際應用場景而定。git
本文以一個客戶端鏈接併發布、處理消息爲例,給出每一個環節大體須要使用的參數:github
+
與 #
的使用+
或 #
,若主題中包含通配符可能會致使消息發佈失敗、客戶端斷開等狀況(視 Broker 與客戶端庫實現方式)Eclipse Paho C 與 Eclipse Paho Embedded C 均爲 Eclipse Paho 項目下的客戶端庫,均爲使用 ANSI C 編寫的功能齊全的 MQTT 客戶端,Eclipse Paho Embedded C 能夠在桌面操做系統上使用,但主要針對 mbed,Arduino和 FreeRTOS 等嵌入式環境。golang
該客戶端有同步/異步兩種 API ,分別以 MQTTClient 和 MQTTAsync 開頭:web
API-waitForCompletion
,經過回調進行結果通知,更適用於非主線程的環境。兩個庫的下載、使用詳細說明請移步至項目主頁查看,本文使用 Eclipse Paho C,直接提供樣例代碼以下:shell
#include "stdio.h" #include "stdlib.h" #include "string.h" #include "MQTTClient.h" #define ADDRESS "tcp://broker.emqx.io:1883" #define CLIENTID "emqx_test" #define TOPIC "testtopic/1" #define PAYLOAD "Hello World!" #define QOS 1 #define TIMEOUT 10000L int main(int argc, char* argv[]) { MQTTClient client; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_deliveryToken token; int rc; MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); // Connection parameters conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(-1); } // Publish message pubmsg.payload = PAYLOAD; pubmsg.payloadlen = strlen(PAYLOAD); pubmsg.qos = QOS; pubmsg.retained = 0; MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token); printf("Waiting for up to %d seconds for publication of %s\n" "on topic %s for client with ClientID: %s\n", (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID); rc = MQTTClient_waitForCompletion(client, token, TIMEOUT); printf("Message with delivery token %d delivered\n", token); // Disconnect MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); return rc; }
Eclipse Paho Java Client 是用 Java 編寫的 MQTT 客戶端庫,可用於 JVM 或其餘 Java 兼容平臺(例如Android)。
Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 異步和同步 API。
經過 Maven 安裝:
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency>
鏈接樣例代碼以下:
App.java
package io.emqx; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class App { public static void main(String[] args) { String subTopic = "testtopic/#"; String pubTopic = "testtopic/1"; String content = "Hello World"; int qos = 2; String broker = "tcp://broker.emqx.io:1883"; String clientId = "emqx_test"; MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient client = new MqttClient(broker, clientId, persistence); // Connection options MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName("emqx_test"); connOpts.setPassword("emqx_test_password".toCharArray()); // Retain connection connOpts.setCleanSession(true); // Set callback client.setCallback(new PushCallback()); // Setup connection System.out.println("Connecting to broker: " + broker); client.connect(connOpts); System.out.println("Connected"); System.out.println("Publishing message: " + content); // Publish client.subscribe(subTopic); // Required parameters for publishing message MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); client.publish(pubTopic, message); System.out.println("Message published"); client.disconnect(); System.out.println("Disconnected"); client.close(); System.exit(0); } catch (MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } }
回調消息處理類 OnMessageCallback.java
package io.emqx; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class OnMessageCallback implements MqttCallback { public void connectionLost(Throwable cause) { // Reconnect after lost connection. System.out.println("Connection lost, and re-connect here."); } public void messageArrived(String topic, MqttMessage message) throws Exception { // Message handler after receiving message System.out.println("Topic:" + topic); System.out.println("QoS:" + message.getQos()); System.out.println("Payload:" + new String(message.getPayload())); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } }
Eclipse Paho MQTT Go Client 爲 Eclipse Paho 項目下的 Go 語言版客戶端庫,該庫可以鏈接到 MQTT Broker 以發佈消息,訂閱主題並接收已發佈的消息,支持徹底異步的操做模式。
客戶端依賴於 Google 的 proxy 和 websockets 軟件包,經過如下命令完成安裝:
go get github.com/eclipse/paho.mqtt.golang
鏈接樣例代碼以下:
package main import ( "fmt" "log" "os" "time" "github.com/eclipse/paho.mqtt.golang" ) var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { fmt.Printf("TOPIC: %s\n", msg.Topic()) fmt.Printf("MSG: %s\n", msg.Payload()) } func main() { mqtt.DEBUG = log.New(os.Stdout, "", 0) mqtt.ERROR = log.New(os.Stdout, "", 0) opts := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883").SetClientID("emqx_test_client") opts.SetKeepAlive(60 * time.Second) // Message callback handler opts.SetDefaultPublishHandler(f) opts.SetPingTimeout(1 * time.Second) c := mqtt.NewClient(opts) if token := c.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } // Subscription if token := c.Subscribe("testtopic/#", 0, nil); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) } // Publish message token := c.Publish("testtopic/1", 0, false, "Hello World") token.Wait() time.Sleep(6 * time.Second) // Cancel subscription if token := c.Unsubscribe("testtopic/#"); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) } // Disconnect c.Disconnect(250) time.Sleep(1 * time.Second) }
emqtt 是開源 MQTT Broker EMQ X 官方 EMQ 提供的客戶端庫,適用於 Erlang 語言。
Erlang 生態有多個 MQTT Broker 實現,如經過插件支持 MQTT 的 RabbitMQ ,VerenMQ、EMQ X 等。可是 MQTT 客戶端庫幾乎沒有選擇的餘地,MQTT 社區收錄的 Erlang 客戶端庫中 emqtt 是最佳選擇。
emqtt 徹底由 Erlang 實現,完成支持 MQTT v3.1.1 和 MQTT v5.0 協議版本,支持 SSL 單雙向認證與 WebSocket 鏈接。另外一款 MQTT 基準測試工具 emqtt_bench 就基於該客戶端庫構建。
emqtt 使用方式以下:
ClientId = <<"test">>. {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]). {ok, _Props} = emqtt:connect(ConnPid). Topic = <<"guide/#">>. QoS = 1. {ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, {Topic, QoS}). {ok, _PktId} = emqtt:publish(ConnPid, <<"guide/1">>, <<"Hello World!">>, QoS). %% If the qos of publish packet is 0, `publish` function would not return packetid. ok = emqtt:publish(ConnPid, <<"guide/2">>, <<"Hello World!">>, 0). %% Recursively get messages from mail box. Y = fun (Proc) -> ((fun (F) -> F(F) end)((fun(ProcGen) -> Proc(fun() -> (ProcGen(ProcGen))() end) end))) end. Rec = fun(Receive) -> fun()-> receive {publish, Msg} -> io:format("Msg: ~p~n", [Msg]), Receive(); _Other -> Receive() after 5 -> ok end end end. (Y(Rec))(). %% If you don't like y combinator, you can also try named function to recursively get messages in erlang shell. Receive = fun Rec() -> receive {publish, Msg} -> io:format("Msg: ~p~n", [Msg]), Rec(); _Other -> Rec() after 5 -> ok end end. Receive(). {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, <<"guide/#">>). ok = emqtt:disconnect(ConnPid).
MQTT.js 是 JavaScript 編寫的,實現了 MQTT 協議客戶端功能的模塊,能夠在 Node.js 或瀏覽器環境中使用。在 Node.js 中使用時,便可以 -g
全局安裝以命令行的形式使用,又能夠將其集成到項目中調用。
因爲 JavaScript 單線程特性,MQTT.js 是全異步 MQTT 客戶端,MQTT.js 支持 MQTT 與 MQTT over WebSocket,在不一樣運行環境支持程度以下:
不一樣環境裏除了少部分鏈接參數不一樣,其餘 API 均是相同的。
使用 npm 安裝:
npm i mqtt
使用 CDN 安裝(瀏覽器):
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script> <script> // Initialize a global mqtt variable console.log(mqtt) </script>
樣例代碼:
// const mqtt = require('mqtt') import mqtt from 'mqtt' // Connection option const options = { clean: true, // Retain connection connectTimeout: 4000, // Timeout // Authtication clientId: 'emqx_test', username: 'emqx_test', password: 'emqx_test', } // Connection string // ws: unsecured WebSocket // wss: secured WebSocket connection // mqtt: unsecured TCP connection // mqtts: secured TCP connection const connectUrl = 'wss://broker.emqx.io:8084/mqtt' const client = mqtt.connect(connectUrl, options) client.on('reconnect', (error) => { console.log('reconnect:', error) }) client.on('reconnect', (error) => { console.log('reconnect:', error) }) client.on('message', (topic, message) => { console.log('message:', topic, message.toString()) })
Eclipse Paho Python 爲 Eclipse Paho 項目下的 Python 語言版客戶端庫,該庫可以鏈接到 MQTT Broker 以發佈消息,訂閱主題並接收已發佈的消息。
使用 PyPi 包管理工具安裝:
pip install paho-mqtt
代碼樣例:
import paho.mqtt.client as mqtt # Successful Connection Callback def on_connect(client, userdata, flags, rc): print('Connected with result code '+str(rc)) client.subscribe('testtopic/#') # Message delivery callback def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) client = mqtt.Client() # Set callback handler client.on_connect = on_connect client.on_message = on_message # Set up connection client.connect('broker.emqx.io', 1883, 60) # Publish message client.publish('emqtt',payload='Hello World',qos=0) client.loop_forever()
關於 MQTT 協議、MQTT 客戶端庫使用流程、經常使用 MQTT 客戶端的簡介就到這裏,歡迎讀者經過 EMQ X 進行MQTT 學習、項目開發使用。