經常使用 MQTT 客戶端庫簡介

前言

MQTT 是一個輕量的發佈訂閱模式消息傳輸協議,專門針對低帶寬和不穩定網絡環境的物聯網應用設計。MQTT 基於發佈/訂閱範式,工做在 TCP/IP協議族上,MQTT 協議輕量、簡單、開放並易於實現,這些特色使它適用範圍很是普遍。javascript

MQTT 基於客戶端-服務器通訊模式,MQTT 服務端稱爲 MQTT Broker,目前行業內可選的 MQTT Broker 較多,其優劣與功能差異比較本文再也不贅述。本文以開源社區中最流行的 MQTT 消息服務器 EMQ X 爲例,使用 EMQ 提供的公共 Broker broker.emqx.io ,經過一個簡單客戶端鏈接 Broker 併發布、處理消息的例子,整理總結不一樣編程語言、平臺下 MQTT 客戶端庫的使用方式與樣例。html

入選客戶端庫以下:java

  • Eclipse Paho C 與 Eclipse Paho Embedded C
  • Eclipse Paho Java Client
  • Eclipse Paho MQTT Go client
  • emqtt : EMQ 提供的 Erlang MQTT 客戶端庫
  • MQTT.js Web 端 & Node.js 平臺 MQTT 客戶端
  • Eclipse Paho Python

MQTT 社區收錄了許多 MQTT 客戶端庫,讀者能夠在此處查看。node

樣例應用介紹

MQTT 客戶端整個生命週期的行爲能夠歸納爲:創建鏈接、訂閱主題、接收消息並處理、向指定主題發佈消息、取消訂閱、斷開鏈接。python

標準的客戶端庫在每一個環節都暴露出相應的方法,不一樣庫在相同環節所需方法參數含義大體相同,具體選用哪些參數、啓用哪些功能特性須要用戶深刻了解 MQTT 協議特性並結合實際應用場景而定。git

本文以一個客戶端鏈接併發布、處理消息爲例,給出每一個環節大體須要使用的參數:github

  • 創建鏈接
    • 指定 MQTT Broker 基本信息接入地址與端口
    • 指定傳輸類型是 TCP 仍是 MQTT over WebSocket
    • 若是啓用 TLS 須要選擇協議版本並攜帶相應的的證書
    • Broker 啓用了認證鑑權則客戶端須要攜帶相應的 MQTT Username Password 信息
    • 配置客戶端參數如 keepalive 時長、clean session 回話保留標誌位、MQTT 協議版本、遺囑消息(LWT)等
  • 訂閱主題:鏈接創建成功後能夠訂閱主題,須要指定主題信息
    • 指定主題過濾器 Topic,訂閱的時候支持主題通配符 +# 的使用
    • 指定 QoS,根據客戶端庫和 Broker 的實現可選 Qos 0 1 2,注意部分 Broker 與雲服務提供商不支持部分 QoS 級別,如 AWS IoT 、阿里雲 IoT 套件、Azure IoT Hub 均不支持 QoS 2 級別消息
    • 訂閱主題可能由於網絡問題、Broker 端 ACL 規則限制而失敗
  • 接收消息並處理
    • 通常是在鏈接時指定處理函數,依據客戶端庫與平臺的網絡編程模型不一樣此部分處理方式略有不一樣
  • 發佈消息:向指定主題發佈消息
    • 指定目標主題,注意該主題不能包含通配符 +#,若主題中包含通配符可能會致使消息發佈失敗、客戶端斷開等狀況(視 Broker 與客戶端庫實現方式)
    • 指定消息 QoS 級別,一樣存在不一樣 Broker 與平臺支持的 QoS 級別不一樣,如 Azure IoT Hub 發佈 QoS 2 的消息將斷開客戶端鏈接
    • 指定消息體內容,消息體內容大小不能超出 Broker 設置最大消息大小
    • 指定消息 Retain 保留消息標誌位
  • 取消訂閱
    • 指定目標主題便可
  • 斷開鏈接
    • 主動斷開鏈接,將發佈遺囑消息(LWT)

Eclipse Paho C 與 Eclipse Paho Embedded C

Eclipse Paho CEclipse Paho Embedded C 均爲 Eclipse Paho 項目下的客戶端庫,均爲使用 ANSI C 編寫的功能齊全的 MQTT 客戶端,Eclipse Paho Embedded C 能夠在桌面操做系統上使用,但主要針對 mbedArduinoFreeRTOS 等嵌入式環境。golang

該客戶端有同步/異步兩種 API ,分別以 MQTTClient 和 MQTTAsync 開頭:web

  • 同步 API 旨在更簡單,更有用,某些調用將阻塞直到操做完成爲止,使用編程上更加容易;
  • 異步 API 中只有一個調用塊 API-waitForCompletion ,經過回調進行結果通知,更適用於非主線程的環境。

兩個庫的下載、使用詳細說明請移步至項目主頁查看,本文使用 Eclipse Paho C,直接提供樣例代碼以下:shell

#include "stdio.h"
#include "stdlib.h"
#include "string.h"

#include "MQTTClient.h"

#define ADDRESS     "tcp://broker.emqx.io:1883"
#define CLIENTID    "emqx_test"
#define TOPIC       "testtopic/1"
#define PAYLOAD     "Hello World!"
#define QOS         1
#define TIMEOUT     10000L

int main(int argc, char* argv[])
{
    MQTTClient client;
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    MQTTClient_message pubmsg = MQTTClient_message_initializer;
    MQTTClient_deliveryToken token;
    int rc;

    MQTTClient_create(&client, ADDRESS, CLIENTID,
        MQTTCLIENT_PERSISTENCE_NONE, NULL);
  
    // Connection parameters
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;

    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to connect, return code %d\n", rc);
        exit(-1);
    }
  
    // Publish message
    pubmsg.payload = PAYLOAD;
    pubmsg.payloadlen = strlen(PAYLOAD);
    pubmsg.qos = QOS;
    pubmsg.retained = 0;
    MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
    printf("Waiting for up to %d seconds for publication of %s\n"
            "on topic %s for client with ClientID: %s\n",
            (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
    rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
    printf("Message with delivery token %d delivered\n", token);
  
    // Disconnect
    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);
    return rc;
}

Eclipse Paho Java Client

Eclipse Paho Java Client 是用 Java 編寫的 MQTT 客戶端庫,可用於 JVM 或其餘 Java 兼容平臺(例如Android)。

Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 異步和同步 API。

經過 Maven 安裝:

<dependency>
  <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>

鏈接樣例代碼以下:

App.java

package io.emqx;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class App {
    public static void main(String[] args) {
        String subTopic = "testtopic/#";
        String pubTopic = "testtopic/1";
        String content = "Hello World";
        int qos = 2;
        String broker = "tcp://broker.emqx.io:1883";
        String clientId = "emqx_test";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);

            // Connection options
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("emqx_test");
            connOpts.setPassword("emqx_test_password".toCharArray());
            // Retain connection
            connOpts.setCleanSession(true);

            // Set callback
            client.setCallback(new PushCallback());

            // Setup connection
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);

            System.out.println("Connected");
            System.out.println("Publishing message: " + content);

            // Publish
            client.subscribe(subTopic);

            // Required parameters for publishing message
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            client.publish(pubTopic, message);
            System.out.println("Message published");

            client.disconnect();
            System.out.println("Disconnected");
            client.close();
            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}

回調消息處理類 OnMessageCallback.java

package io.emqx;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class OnMessageCallback implements MqttCallback {
    public void connectionLost(Throwable cause) {
        // Reconnect after lost connection.
        System.out.println("Connection lost, and re-connect here.");
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Message handler after receiving message
        System.out.println("Topic:" + topic);
        System.out.println("QoS:" + message.getQos());
        System.out.println("Payload:" + new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}

Eclipse Paho MQTT Go client

Eclipse Paho MQTT Go Client 爲 Eclipse Paho 項目下的 Go 語言版客戶端庫,該庫可以鏈接到 MQTT Broker 以發佈消息,訂閱主題並接收已發佈的消息,支持徹底異步的操做模式。

客戶端依賴於 Google 的 proxywebsockets 軟件包,經過如下命令完成安裝:

go get github.com/eclipse/paho.mqtt.golang

鏈接樣例代碼以下:

package main

import (
    "fmt"
    "log"
    "os"
    "time"

    "github.com/eclipse/paho.mqtt.golang"
)

var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("TOPIC: %s\n", msg.Topic())
    fmt.Printf("MSG: %s\n", msg.Payload())
}

func main() {
    mqtt.DEBUG = log.New(os.Stdout, "", 0)
    mqtt.ERROR = log.New(os.Stdout, "", 0)
    opts := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883").SetClientID("emqx_test_client")
    
    opts.SetKeepAlive(60 * time.Second)
    // Message callback handler
    opts.SetDefaultPublishHandler(f)
    opts.SetPingTimeout(1 * time.Second)

    c := mqtt.NewClient(opts)
    if token := c.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    // Subscription
    if token := c.Subscribe("testtopic/#", 0, nil); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }
    
    // Publish message
    token := c.Publish("testtopic/1", 0, false, "Hello World")
    token.Wait()

    time.Sleep(6 * time.Second)

    // Cancel subscription
    if token := c.Unsubscribe("testtopic/#"); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }
  
    // Disconnect
    c.Disconnect(250)
    time.Sleep(1 * time.Second)
}

emqtt : EMQ 提供的 Erlang MQTT 客戶端庫

emqtt 是開源 MQTT Broker EMQ X 官方 EMQ 提供的客戶端庫,適用於 Erlang 語言。

Erlang 生態有多個 MQTT Broker 實現,如經過插件支持 MQTT 的 RabbitMQ ,VerenMQ、EMQ X 等。可是 MQTT 客戶端庫幾乎沒有選擇的餘地,MQTT 社區收錄的 Erlang 客戶端庫中 emqtt 是最佳選擇。

emqtt 徹底由 Erlang 實現,完成支持 MQTT v3.1.1 和 MQTT v5.0 協議版本,支持 SSL 單雙向認證與 WebSocket 鏈接。另外一款 MQTT 基準測試工具 emqtt_bench 就基於該客戶端庫構建。

emqtt 使用方式以下:

ClientId = <<"test">>.
{ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]).
{ok, _Props} = emqtt:connect(ConnPid).
Topic = <<"guide/#">>.
QoS = 1.
{ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, {Topic, QoS}).
{ok, _PktId} = emqtt:publish(ConnPid, <<"guide/1">>, <<"Hello World!">>, QoS).
%% If the qos of publish packet is 0, `publish` function would not return packetid.
ok = emqtt:publish(ConnPid, <<"guide/2">>, <<"Hello World!">>, 0).

%% Recursively get messages from mail box.
Y = fun (Proc) -> ((fun (F) -> F(F) end)((fun(ProcGen) -> Proc(fun() -> (ProcGen(ProcGen))() end) end))) end.
Rec = fun(Receive) -> fun()-> receive {publish, Msg} -> io:format("Msg: ~p~n", [Msg]), Receive(); _Other -> Receive() after 5 -> ok end end end.
(Y(Rec))().

%% If you don't like y combinator, you can also try named function to recursively get messages in erlang shell.
Receive = fun Rec() -> receive {publish, Msg} -> io:format("Msg: ~p~n", [Msg]), Rec(); _Other -> Rec() after 5 -> ok end end.
Receive().

{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, <<"guide/#">>).

ok = emqtt:disconnect(ConnPid).

MQTT.js Web 端 & Node.js 平臺 MQTT 客戶端

MQTT.js 是 JavaScript 編寫的,實現了 MQTT 協議客戶端功能的模塊,能夠在 Node.js 或瀏覽器環境中使用。在 Node.js 中使用時,便可以 -g 全局安裝以命令行的形式使用,又能夠將其集成到項目中調用。

因爲 JavaScript 單線程特性,MQTT.js 是全異步 MQTT 客戶端,MQTT.js 支持 MQTT 與 MQTT over WebSocket,在不一樣運行環境支持程度以下:

  • 瀏覽器環境:MQTT over WebSocket(包括微信小程序、支付寶小程序等定製瀏覽器環境)
  • Node.js 環境:MQTT、MQTT over WebSocket

不一樣環境裏除了少部分鏈接參數不一樣,其餘 API 均是相同的。

使用 npm 安裝:

npm i mqtt

使用 CDN 安裝(瀏覽器):

<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
    // Initialize a global mqtt variable
    console.log(mqtt)
</script>

樣例代碼:

// const mqtt = require('mqtt')
import mqtt from 'mqtt'

// Connection option
const options = {
        clean: true, // Retain connection
      connectTimeout: 4000, // Timeout
      // Authtication
      clientId: 'emqx_test',
      username: 'emqx_test',
      password: 'emqx_test',
}

// Connection string
// ws: unsecured WebSocket
// wss: secured WebSocket connection
// mqtt: unsecured TCP connection
// mqtts: secured TCP connection
const connectUrl = 'wss://broker.emqx.io:8084/mqtt'
const client = mqtt.connect(connectUrl, options)

client.on('reconnect', (error) => {
    console.log('reconnect:', error)
})

client.on('reconnect', (error) => {
    console.log('reconnect:', error)
})

client.on('message', (topic, message) => {
  console.log('message:', topic, message.toString())
})

Eclipse Paho Python

Eclipse Paho Python 爲 Eclipse Paho 項目下的 Python 語言版客戶端庫,該庫可以鏈接到 MQTT Broker 以發佈消息,訂閱主題並接收已發佈的消息。

使用 PyPi 包管理工具安裝:

pip install paho-mqtt

代碼樣例:

import paho.mqtt.client as mqtt


# Successful Connection Callback
def on_connect(client, userdata, flags, rc):
    print('Connected with result code '+str(rc))
    client.subscribe('testtopic/#')

# Message delivery callback
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

client = mqtt.Client()

# Set callback handler
client.on_connect = on_connect
client.on_message = on_message

# Set up connection
client.connect('broker.emqx.io', 1883, 60)
# Publish message
client.publish('emqtt',payload='Hello World',qos=0)

client.loop_forever()

總結

關於 MQTT 協議、MQTT 客戶端庫使用流程、經常使用 MQTT 客戶端的簡介就到這裏,歡迎讀者經過 EMQ X 進行MQTT 學習、項目開發使用。

相關文章
相關標籤/搜索