玩轉PubSubClient MQTT庫

1.前言

    在ESP8266學習系列中,博主一直使用HTTP協議。HTTP鏈接屬於短鏈接,而在物聯網應用中,普遍應用的倒是MQTT協議。因此,本篇咱們將學習Arduino平臺上的MQTT實現庫 —— PubSubClient。git

2.MQTT協議

2.1 簡介

    MQTT協議(Message Queuing Telemetry Transport),翻譯過來就是遙信消息隊列傳輸,是IBM公司於1999年提出的,如今最新版本是3.1.1。MQTT是一個基於TCP的發佈訂閱協議,設計的初始目的是爲了極有限的內存設備和網絡帶寬很低的網絡不可靠的通訊,很是適合物聯網通訊。數組

image

    MQTT屬於應用層協議,基於TCP協議,確保了可靠性。博主在這裏不會去詳細講述MQTT協議(網上講解MQTT協議內容不少,不須要重複),但願有興趣的讀者自行去閱讀,可參考 MQTT中文文檔服務器

    MQTT通訊模型以下:網絡

image

  • 發佈方(Publisher)將消息發送到 Broker(MQTT服務器);
  • Broker 接收到消息之後,檢查下都有哪些訂閱方訂閱了此類消息,而後將消息發送到這些訂閱方;
  • 訂閱方(Subscriber)從 Broker 獲取該消息。

2.2 MQTT消息的QOS

    MQTT消息支持三種QOS等級:數據結構

  • QoS 0:「最多一次」,消息發佈徹底依賴底層 TCP/IP 網絡。分發的消息可能丟失或重複。例如,這個等級可用於環境傳感器數據,單次的數據丟失不要緊,由於不久後還會有第二次發送。
  • QoS 1:「至少一次」,確保消息能夠到達,但消息可能會重複。
  • QoS 2:「只有一次」,確保消息只到達一次。例如,這個等級可用在一個計費系統中,這裏若是消息重複或丟失會致使不正確的收費。

2.3 MQTT控制報文格式

    MQTT控制報文由三部分組成:less

  • 固定報頭(Fixed header),每一個MQTT控制報文都包含一個固定報頭;固定報頭指明控制報文類型、標誌Flags、剩餘長度三大部分。
  • 可變報頭(Variable header),某些MQTT控制報文包含一個可變報頭部分;它在固定報頭和有效載荷之間;可變報頭的內容根據報文類型的不一樣而不一樣,一般包括 報文標識符(Packet Identifier);
  • 有效載荷(Payload),某些MQTT控制報文在報文的最後部分包含一個有效載荷,也就是攜帶的數據信息;

    總體上說,MQTT總體控制報文協議就是:dom

固定報頭(必定有) + 可變報頭(部分有) + 有效載荷(部分有)ide

    數據結構簡單,傳輸數據量小,這也是爲何能應用於物聯網應用的緣由之一。函數

2.4 MQTT控制報文

2.4.1 CONNECT – 鏈接服務端

注意點:工具

  • 客戶端到服務端的網絡鏈接創建後,客戶端發送給服務端的第一個報文必須是CONNECT報文;
  • 在一個網絡鏈接上,客戶端只能發送一次CONNECT報文。服務端必須將客戶端發送的第二個CONNECT報文看成協議違規處理並斷開客戶端的鏈接;
  • 有效載荷包含一個或多個編碼的字段。包括客戶端的惟一標識符,Will主題,Will消息,用戶名和密碼。除了客戶端標識以外,其它的字段都是可選的,基於標誌位來決定可變報頭中是否須要包含這些字段。

報文格式:

固定報頭 + 可變報頭 + 有效載荷

  • 固定報頭: MQTTCONNECT(1 << 4)
  • 可變報頭: 協議名(Protocol Name),協議級別(Protocol Level),鏈接標誌(Connect Flags)和保持鏈接(Keep Alive)
  • 有效載荷: 客戶端標識符,遺囑主題,遺囑消息,用戶名,密碼

2.4.2 CONNACK —— 確認鏈接請求

注意點:

  • 服務端發送CONNACK報文響應從客戶端收到的CONNECT報文。服務端發送給客戶端的第一個報文必須是CONNACK;
  • 若是客戶端在合理的時間內沒有收到服務端的CONNACK報文,客戶端應該關閉網絡鏈接。合理 的時間取決於應用的類型和通訊基礎設施;

報文格式:

固定報頭 + 可變報頭

  • 固定報頭: MQTTCONNACK(2 << 4)
  • 可變報頭: 鏈接確認標誌 + 鏈接返回碼

2.4.3 PUBLISH —— 發佈消息

注意點:

  • PUBLISH控制報文是指從客戶端向服務端或者服務端向客戶端傳輸一個應用消息;

報文格式:

固定報頭 + 可變報頭 + 有效載荷

  • 固定報頭: MQTTPUBLISH(3 << 4),重發標誌 DUP,服務質量等級 QoS,保留標誌 RETAIN,剩餘長度字段
  • 可變報頭: 主題名和報文標識符
  • 有效載荷: 被髮布的應用消息

2.4.4 PUBACK —— 發佈確認

注意點:

  • PUBACK報文是對QoS 1等級的PUBLISH報文的響應

報文格式:

固定報頭 + 可變報頭

  • 固定報頭: MQTTPUBACK(4 << 4),剩餘長度字段
  • 可變報頭: 包含等待確認的PUBLISH報文的報文標識符

2.4.5 PUBREC —— 發佈收到(QoS 2,第一步)

注意點:

  • PUBREC報文是對QoS等級2的PUBLISH報文的響應。它是QoS 2等級協議交換的第二個報文。

報文格式:

固定報頭 + 可變報頭

  • 固定報頭: MQTTPUBREC(5 << 4),剩餘長度字段
  • 可變報頭: 包含等待確認的PUBLISH報文的報文標識符

2.4.6 PUBREL —— 發佈釋放(QoS 2,第二步)

注意點:

  • PUBREL報文是對PUBREC報文的響應。它是QoS 2等級協議交換的第三個報文。

報文格式:

固定報頭 + 可變報頭

  • 固定報頭: MQTTPUBREL(6 << 4),剩餘長度字段
  • 可變報頭: 包含與等待確認的PUBREC報文相同的報文標識符。

2.4.7 PUBCOMP —— 發佈完成(QoS 2,第三步)

注意點:

  • PUBCOMP報文是對PUBREL報文的響應。它是QoS 2等級協議交換的第四個也是最後一個報文。

報文格式:

固定報頭 + 可變報頭

  • 固定報頭: MQTTPUBCOMP(7 << 4),剩餘長度字段
  • 可變報頭: 包含與等待確認的PUBREL報文相同的報文標識符

2.4.8 SUBSCRIBE —— 訂閱主題

注意點:

  • 客戶端向服務端發送SUBSCRIBE報文用於建立一個或多個訂閱
  • 爲了將應用消息轉發給與那些訂閱匹配的主題,服務端發送PUBLISH報文給客戶端
  • SUBSCRIBE報文也(爲每一個訂閱)指定了最大的QoS等級,服務端根據這個發送應用消息給客戶端

報文格式:

固定報頭 + 可變報頭 + 有效載荷

  • 固定報頭: MQTTSUBSCRIBE(8 << 4),剩餘長度字段
  • 可變報頭: 報文標識符
  • 有效載荷:包含了一個主題過濾器列表,它們表示客戶端想要訂閱的主題

2.4.9 SUBACK —— 訂閱確認

注意點:

  • 服務端發送SUBACK報文給客戶端,用於確認它已收到而且正在處理SUBSCRIBE報文
  • SUBACK報文包含一個返回碼清單,它們指定了SUBSCRIBE請求的每一個訂閱被授予的最大QoS等級

報文格式:

固定報頭 + 可變報頭 + 有效載荷

  • 固定報頭: MQTTSUBACK(9 << 4),剩餘長度字段
  • 可變報頭: 包含等待確認的SUBSCRIBE報文的報文標識符
  • 有效載荷:包含一個返回碼清單。每一個返回碼對應等待確認的SUBSCRIBE報文中的一個主題過濾器

2.4.10 UNSUBSCRIBE —— 取消訂閱

注意點:

  • 客戶端發送UNSUBSCRIBE報文給服務端,用於取消訂閱主題

報文格式:

固定報頭 + 可變報頭 + 有效載荷

  • 固定報頭: MQTTUNSUBSCRIBE(10 << 4),剩餘長度字段
  • 可變報頭: 報文標識符
  • 有效載荷:包含客戶端想要取消訂閱的主題過濾器列表

2.4.11 UNSUBACK —— 取消訂閱確認

注意點:

  • 服務端發送UNSUBACK報文給客戶端用於確認收到UNSUBSCRIBE報文

報文格式:

固定報頭 + 可變報頭

  • 固定報頭: MQTTUNSUBACK(11 << 4),剩餘長度字段
  • 可變報頭: 包含等待確認的UNSUBSCRIBE報文的報文標識符

2.4.12 PINGREQ —— 心跳請求

注意點:

  • 客戶端發送PINGREQ報文給服務端
  • 在沒有任何其它控制報文從客戶端發給服務端時,告知服務端客戶端還活着
  • 請求服務端發送 響應確認它還活着
  • 使用網絡以確認網絡鏈接沒有斷開
  • 保持鏈接(Keep Alive)處理中用到這個報文

報文格式:

固定報頭

  • 固定報頭: MQTTPINGREQ(12 << 4),剩餘長度字段

2.4.13 PINGRESP —— 心跳響應

注意點:

  • 服務端發送PINGRESP報文響應客戶端的PINGREQ報文
  • 保持鏈接(Keep Alive)處理中用到這個報文

報文格式:

固定報頭

  • 固定報頭: MQTTPINGRESP(13 << 4),剩餘長度字段

2.4.14 DISCONNECT —— 斷開鏈接

注意點:

  • DISCONNECT報文是客戶端發給服務端的最後一個控制報文。
  • 表示客戶端正常斷開鏈接。

報文格式:

固定報頭

  • 固定報頭: MQTTDISCONNECT(14 << 4),剩餘長度字段

3.PubSubClient —— ArduinoMQTT庫

    老規矩,上一個百度腦圖:

image

3.1 PubSubClient —— 初始化構造器

函數1說明

/**
 * 建立一個沒有初始化的PubSubClient對象
 */
PubSubClient::PubSubClient() {
    this->_state = MQTT_DISCONNECTED;
    this->_client = NULL;
    this->stream = NULL;
    setCallback(NULL);
}

在使用PubSubClient對象以前,必須配置完整的內容:

WiFiClient espClient;
PubSubClient client;

void setup() {
    client.setClient(espClient);
    client.setServer("broker.example.com",1883);
    // client is now configured for use
}

函數2說明

/**
 * 建立一個部分初始化的PubSubClient對象
 * @param client client實例
 */
PubSubClient::PubSubClient(Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setClient(client);
    this->stream = NULL;
}

在使用PubSubClient對象以前,必須配置完整的內容:

WiFiClient espClient;
PubSubClient client(espClient);

void setup() {
    client.setServer("broker.example.com",1883);
    // client is now configured for use
}

函數3說明

/**
 * 建立完整初始化的PubSubClient對象
 * @param addr  mqtt服務器ip地址
 * @param post  mqtt服務器端口
 * @param client 客戶端實例
 */
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setServer(addr, port);
    setClient(client);
    this->stream = NULL;
}

/**
 * 建立完整初始化的PubSubClient對象
 * @param addr  mqtt服務器ip地址
 * @param post  mqtt服務器端口
 * @param client 客戶端實例
 * @param stream 輸出流,會把收到的消息輸出到流中
 */
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
    this->_state = MQTT_DISCONNECTED;
    setServer(addr,port);
    setClient(client);
    setStream(stream);
}

/**
 * 建立完整初始化的PubSubClient對象
 * @param addr  mqtt服務器ip地址
 * @param post  mqtt服務器端口
 * @param MQTT_CALLBACK_SIGNATURE callback方法
 * @param client 客戶端實例
 */
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setServer(addr, port);
    setCallback(callback);
    setClient(client);
    this->stream = NULL;
}

/**
 * 建立完整初始化的PubSubClient對象
 * @param addr  mqtt服務器ip地址
 * @param post  mqtt服務器端口
 * @param MQTT_CALLBACK_SIGNATURE callback方法
 * @param client 客戶端實例
 * @param stream 輸出流,會把收到的消息輸出到流中
 */
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
    this->_state = MQTT_DISCONNECTED;
    setServer(addr,port);
    setCallback(callback);
    setClient(client);
    setStream(stream);
}

/**
 * 建立完整初始化的PubSubClient對象
 * @param ip  mqtt服務器ip地址
 * @param post  mqtt服務器端口
 * @param client 客戶端實例
 */
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setServer(ip, port);
    setClient(client);
    this->stream = NULL;
}

/**
 * 建立完整初始化的PubSubClient對象
 * @param ip  mqtt服務器ip地址
 * @param post  mqtt服務器端口
 * @param client 客戶端實例
 * @param stream 輸出流,會把收到的消息輸出到流中
 */
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
    this->_state = MQTT_DISCONNECTED;
    setServer(ip,port);
    setClient(client);
    setStream(stream);
}

/**
 * 建立完整初始化的PubSubClient對象
 * @param ip  mqtt服務器ip地址
 * @param post  mqtt服務器端口
 * @param client 客戶端實例
 * @param MQTT_CALLBACK_SIGNATURE callback方法
 */
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setServer(ip, port);
    setCallback(callback);
    setClient(client);
    this->stream = NULL;
}

/**
 * 建立完整初始化的PubSubClient對象
 * @param ip  mqtt服務器ip地址
 * @param post  mqtt服務器端口
 * @param client 客戶端實例
 * @param MQTT_CALLBACK_SIGNATURE callback方法
 * @param stream 輸出流,會把收到的消息輸出到流中
 */
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
    this->_state = MQTT_DISCONNECTED;
    setServer(ip,port);
    setCallback(callback);
    setClient(client);
    setStream(stream);
}

/**
 * 建立完整初始化的PubSubClient對象
 * @param domain  mqtt服務器域名
 * @param post  mqtt服務器端口
 * @param client 客戶端實例
 */
PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setServer(domain,port);
    setClient(client);
    this->stream = NULL;
}

/**
 * 建立完整初始化的PubSubClient對象
 * @param domain  mqtt服務器域名
 * @param post  mqtt服務器端口
 * @param client 客戶端實例
 * @param stream 輸出流,會把收到的消息輸出到流中
 */
PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) {
    this->_state = MQTT_DISCONNECTED;
    setServer(domain,port);
    setClient(client);
    setStream(stream);
}

/**
 * 建立完整初始化的PubSubClient對象
 * @param domain  mqtt服務器域名
 * @param post  mqtt服務器端口
 * @param client 客戶端實例
 * @param MQTT_CALLBACK_SIGNATURE callback方法
 */
PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setServer(domain,port);
    setCallback(callback);
    setClient(client);
    this->stream = NULL;
}

/**
 * 建立完整初始化的PubSubClient對象
 * @param domain  mqtt服務器域名
 * @param post  mqtt服務器端口
 * @param client 客戶端實例
 * @param MQTT_CALLBACK_SIGNATURE callback方法
 * @param stream 輸出流,會把收到的消息輸出到流中
 */
PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
    this->_state = MQTT_DISCONNECTED;
    setServer(domain,port);
    setCallback(callback);
    setClient(client);
    setStream(stream);
}

3.2 setServer —— 配置服務器

函數說明

/**
 * 配置服務器
 * @param ip  MQTT服務器ip地址,數組
 * @param port MQTT服務器端口
 */
PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) {
    IPAddress addr(ip[0],ip[1],ip[2],ip[3]);
    return setServer(addr,port);
}

/**
 * 配置服務器
 * @param ip  MQTT服務器ip地址,IPAddress
 * @param port MQTT服務器端口
 */
PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) {
    this->ip = ip;
    this->port = port;
    this->domain = NULL;
    return *this;
}

/**
 * 配置服務器
 * @param domain  MQTT服務器domain地址
 * @param port MQTT服務器端口
 */
PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) {
    this->domain = domain;
    this->port = port;
    return *this;
}

注意

  • 該方法返回this指針,意味着咱們能夠實現鏈式調用;

3.3 setClient —— 配置客戶端

函數說明

/**
 * 配置客戶端
 * @param client  client實例,好比wificlient
 */
PubSubClient& PubSubClient::setClient(Client& client){
    this->_client = &client;
    return *this;
}

注意

  • 該方法返回this指針,意味着咱們能夠實現鏈式調用;

3.4 setStream —— 配置流

函數說明

/**
 * 配置流,可用於存儲消息內容
 */
PubSubClient& PubSubClient::setStream(Stream& stream){
    this->stream = &stream;
    return *this;
}

注意

  • 該方法返回this指針,意味着咱們能夠實現鏈式調用;

3.5 connected —— 判斷客戶端是否鏈接上服務器

函數說明

/**
 * 判斷client是否鏈接上服務器
 * @return bool true表示鏈接上 
 */
boolean PubSubClient::connected() {
    boolean rc;
    if (_client == NULL ) {
        rc = false;
    } else {
        rc = (int)_client->connected();
        if (!rc) {
            //判斷鏈接狀態
            if (this->_state == MQTT_CONNECTED) {
                this->_state = MQTT_CONNECTION_LOST;
                _client->flush();
                _client->stop();
            }
        }
    }
    return rc;
}

3.6 connect —— 鏈接MQTT服務(CONNECT控制報文)

函數說明:

/**
 * 鏈接MQTT服務(CONNECT控制報文)
 * @param id client端標識符
 * @return bool 是否鏈接成功
 */
boolean PubSubClient::connect(const char *id) {
    return connect(id,NULL,NULL,0,0,0,0,1);
}

/**
 * 鏈接MQTT服務(CONNECT控制報文)
 * @param id client端標識符
 * @param user 用戶帳號
 * @param pass 用戶密碼
 * @return bool 是否鏈接成功 
 */
boolean PubSubClient::connect(const char *id, const char *user, const char *pass) {
    return connect(id,user,pass,0,0,0,0,1);
}

/**
 * 鏈接MQTT服務(CONNECT控制報文)
 * @param id client端標識符
 * @param willTopic 遺囑主題
 * @param willQos 遺囑消息質量等級
 * @param willRetain 是否保留信息
 * @param willMessage 遺囑內容
 * @return bool 是否鏈接成功 
 */
boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
    return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage,1);
}

/**
 * 鏈接MQTT服務(CONNECT控制報文)
 * @param id client端標識符
 * @param user 用戶帳號
 * @param pass 用戶密碼
 * @param willTopic 遺囑主題
 * @param willQos 遺囑消息質量等級
 * @param willRetain 是否保留信息
 * @param willMessage 遺囑內容
 * @return bool 是否鏈接成功 
 */
boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
    return connect(id,user,pass,willTopic,willQos,willRetain,willMessage,1);
}

/**
 * 鏈接MQTT服務(CONNECT控制報文)
 * @param id client端標識符
 * @param user 用戶帳號
 * @param pass 用戶密碼
 * @param willTopic 遺囑主題
 * @param willQos 遺囑消息質量等級
 * @param willRetain 是否保留信息
 * @param willMessage 遺囑內容
 * @param cleanSession 是否清除會話
 * @return bool 是否鏈接成功 
 *
 * @Note 注意結合CONNECT和CONNACK報文協議
 */
boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession) {
    if (!connected()) {
        int result = 0;

        if (domain != NULL) {
            result = _client->connect(this->domain, this->port);
        } else {
            result = _client->connect(this->ip, this->port);
        }
        if (result == 1) {
            nextMsgId = 1;
            // Leave room in the buffer for header and variable length field
            //固定報頭
            uint16_t length = MQTT_MAX_HEADER_SIZE;
            unsigned int j;
//在固定CONNECT報文可變報頭包含四個字段,協議名、協議級別、鏈接標誌、保持鏈接:
#if MQTT_VERSION == MQTT_VERSION_3_1
            uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION};
#define MQTT_HEADER_VERSION_LENGTH 9
#elif MQTT_VERSION == MQTT_VERSION_3_1_1
            uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION};
#define MQTT_HEADER_VERSION_LENGTH 7
#endif
            for (j = 0;j<MQTT_HEADER_VERSION_LENGTH;j++) {
                buffer[length++] = d[j];
            }

            /******************** 鏈接標誌(Connect Flags) start *************************/
            uint8_t v;
            //遺囑主題
            if (willTopic) {
                v = 0x04|(willQos<<3)|(willRetain<<5);
            } else {
                v = 0x00;
            }
            //清除會話
            if (cleanSession) {
                v = v|0x02;
            }
            //帳號密碼
            if(user != NULL) {
                v = v|0x80;

                if(pass != NULL) {
                    v = v|(0x80>>1);
                }
            }

            buffer[length++] = v;
           /***********************鏈接標誌(Connect Flags)  end ******************************************************/
           /******************** 保持鏈接(Keep Alive) start *************************/
            buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
            buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
           /******************** 保持鏈接(Keep Alive) end *************************/

           //有效載荷 CONNECT報文的有效載荷包含一個或多個以長度爲前綴的字段,可變報頭中的標誌決定是否包含這些字段。
           //若是包含的話,必須按照這個順序出現:client標識符,遺囑主題,遺囑消息,用戶名,密碼。

           /***********  client標識符 start ************/
            CHECK_STRING_LENGTH(length,id)
            length = writeString(id,buffer,length);
           /***********  client標識符 end ************/

           /********* 遺囑主題  遺囑消息 start ********/
            if (willTopic) {
                CHECK_STRING_LENGTH(length,willTopic)
                length = writeString(willTopic,buffer,length);
                CHECK_STRING_LENGTH(length,willMessage)
                length = writeString(willMessage,buffer,length);
            }
           /********* 遺囑主題  遺囑消息 end ********/

            /********* 用戶名  密碼 start ********/
            if(user != NULL) {
                CHECK_STRING_LENGTH(length,user)
                length = writeString(user,buffer,length);
                if(pass != NULL) {
                    CHECK_STRING_LENGTH(length,pass)
                    length = writeString(pass,buffer,length);
                }
            }
            /********* 用戶名  密碼 end ********/

            //拼裝 CONNECT消息
            write(MQTTCONNECT,buffer,length-MQTT_MAX_HEADER_SIZE);

            lastInActivity = lastOutActivity = millis();

            //等待MQTT服務器返回響應內容
            while (!_client->available()) {
                unsigned long t = millis();
                //判斷是否超時
                if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) {
                    _state = MQTT_CONNECTION_TIMEOUT;
                    _client->stop();
                    return false;
                }
            }
            uint8_t llen;
            //讀取響應返回的內容
            uint16_t len = readPacket(&llen);

            /*** 處理 CONNACK – 確認鏈接請求 報文 ***/
            if (len == 4) {
                if (buffer[3] == 0) {
                    lastInActivity = millis();
                    pingOutstanding = false;
                    _state = MQTT_CONNECTED;
                    return true;
                } else {
                    _state = buffer[3];
                }
            }
            _client->stop();
        } else {
            _state = MQTT_CONNECT_FAILED;
        }
        return false;
    }
    return true;
}

注意

  • MQTT_KEEPALIVE 默認 15 S;
  • MQTT_SOCKET_TIMEOUT 默認15 S;

3.7 disconnect —— 斷開鏈接(DISCONNECT報文)

函數說明:

/**
 * 斷開鏈接
 * 客戶端斷開鏈接(客戶端發給服務端的最後一個控制報文。表示客戶端正常斷開鏈接)
 * @Note 取消訂閱報文格式: 固定報頭(報文類型+剩餘長度)
 */
void PubSubClient::disconnect() {
    /*** 斷開鏈接 報文協議 ***/
    buffer[0] = MQTTDISCONNECT;
    buffer[1] = 0;
    _client->write(buffer,2);
    _state = MQTT_DISCONNECTED;
    _client->flush();
    _client->stop();
    lastInActivity = lastOutActivity = millis();
}

3.8 subscribe —— 訂閱主題(SUBSCRIBE報文)

函數說明:

/**
 * 訂閱主題
 * @param topic 主題
 * @return bool 是否訂閱成功
 */
boolean PubSubClient::subscribe(const char* topic) {
    return subscribe(topic, 0);
}

/**
 * 訂閱主題(客戶端向服務端發送SUBSCRIBE報文用於建立一個或多個訂閱)
 * @param topic 主題
 * @param qos 質量等級
 * @return bool 是否訂閱成功
 *
 * @Note 訂閱報文格式: 固定報頭 + 可變報頭(報文標識符)+ 有效載荷(主題過濾器)
 *       訂閱確認報文格式:固定報頭 可變報頭(報文標識符)+ 有效載荷(返回碼清單)
 */
boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
    if (qos > 1) {
        return false;
    }
    if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
        // Too long
        return false;
    }
    if (connected()) {
        // Leave room in the buffer for header and variable length field
        uint16_t length = MQTT_MAX_HEADER_SIZE;
        nextMsgId++;
        if (nextMsgId == 0) {
            nextMsgId = 1;
        }
        buffer[length++] = (nextMsgId >> 8);
        buffer[length++] = (nextMsgId & 0xFF);
        length = writeString((char*)topic, buffer,length);
        buffer[length++] = qos;
        return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE);
    }
    return false;
}

3.9 unsubscribe —— 取消訂閱主題

函數說明:

/**
 * 取消訂閱主題(客戶端發送UNSUBSCRIBE報文給服務端,用於取消訂閱主題)
 * @param topic 具體主題
 * @return bool 是否取消成功
 * @Note 取消訂閱報文格式: 固定報頭(報文類型+剩餘長度) + 可變報頭(報文標識符)+ 有效載荷(主題過濾器列表)
 */
boolean PubSubClient::unsubscribe(const char* topic) {
    if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
        // Too long
        return false;
    }
    if (connected()) {
        uint16_t length = MQTT_MAX_HEADER_SIZE;
        nextMsgId++;
        if (nextMsgId == 0) {
            nextMsgId = 1;
        }
        //可變報頭(報文標識符)
        buffer[length++] = (nextMsgId >> 8);
        buffer[length++] = (nextMsgId & 0xFF);
        length = writeString(topic, buffer,length);
        return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE);
    }
    return false;
}

3.10 publish/publish_P —— 發佈消息

函數說明:

/**
 * 發佈對應主題消息
 * @param topic 主題
 * @param payload 有效負載
 */
boolean PubSubClient::publish(const char* topic, const char* payload) {
    return publish(topic,(const uint8_t*)payload,strlen(payload),false);
}

/**
 * 發佈對應主題消息
 * @param topic 主題
 * @param payload 有效負載
 * @param retained 是否保持
 */
boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) {
    return publish(topic,(const uint8_t*)payload,strlen(payload),retained);
}

/**
 * 發佈對應主題消息
 * @param topic 主題
 * @param payload 有效負載
 * @param plength 負載內容長度
 */
boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) {
    return publish(topic, payload, plength, false);
}

/**
 * 發佈對應主題消息
 * @param topic 主題
 * @param payload 有效負載
 * @param plength 負載內容長度
 * @param retained 是否保持
 */
boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
    if (connected()) {
        if (MQTT_MAX_PACKET_SIZE < MQTT_MAX_HEADER_SIZE + 2+strlen(topic) + plength) {
            // Too long
            return false;
        }
        // Leave room in the buffer for header and variable length field
        uint16_t length = MQTT_MAX_HEADER_SIZE;
        length = writeString(topic,buffer,length);
        uint16_t i;
        for (i=0;i<plength;i++) {
            buffer[length++] = payload[i];
        }
        uint8_t header = MQTTPUBLISH;
        if (retained) {
            header |= 1;
        }
        return write(header,buffer,length-MQTT_MAX_HEADER_SIZE);
    }
    return false;
}

/**
 * 發佈對應主題消息
 * @param topic 主題
 * @param payload 有效負載(F(xxx))
 * @param retained 是否保持
 */
boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) {
    return publish_P(topic, (const uint8_t*)payload, strlen(payload), retained);
}

/**
 * 發佈對應主題消息
 * @param topic 主題
 * @param payload 有效負載(F(xxx))
 * @param plength 負載內容長度
 * @param retained 是否保持
 */
boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
    uint8_t llen = 0;
    uint8_t digit;
    unsigned int rc = 0;
    uint16_t tlen;
    unsigned int pos = 0;
    unsigned int i;
    uint8_t header;
    unsigned int len;

    if (!connected()) {
        return false;
    }

    tlen = strlen(topic);

    header = MQTTPUBLISH;
    if (retained) {
        header |= 1;
    }
    buffer[pos++] = header;
    len = plength + 2 + tlen;
    do {
        digit = len % 128;
        len = len / 128;
        if (len > 0) {
            digit |= 0x80;
        }
        buffer[pos++] = digit;
        llen++;
    } while(len>0);

    pos = writeString(topic,buffer,pos);

    rc += _client->write(buffer,pos);

    for (i=0;i<plength;i++) {
        rc += _client->write((char)pgm_read_byte_near(payload + i));
    }

    lastOutActivity = millis();

    return rc == tlen + 4 + plength;
}

3.11 setCallback —— 處理消息回調

函數說明:

/**
 * 設置消息回調函數
 * @param MQTT_CALLBACK_SIGNATURE 
 */
PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) {
    this->callback = callback;
    return *this;
}

注意

  • MQTT_CALLBACK_SIGNATURE是一個函數定義
#if defined(ESP8266) || defined(ESP32)
#include <functional>
#define MQTT_CALLBACK_SIGNATURE std::function<void(char*, uint8_t*, unsigned int)> callback
#else
#define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, unsigned int)
#endif

3.12 loop —— 處理消息以及保持心跳

函數說明:

/**
 * 處理消息以及保持心跳
 */
boolean PubSubClient::loop() {
    if (connected()) {
        unsigned long t = millis();
        if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
            if (pingOutstanding) {
                this->_state = MQTT_CONNECTION_TIMEOUT;
                _client->stop();
                return false;
            } else {
                /*** PINGREQ——心跳請求 ****/
                buffer[0] = MQTTPINGREQ;
                buffer[1] = 0;
                _client->write(buffer,2);
                lastOutActivity = t;
                lastInActivity = t;
                pingOutstanding = true;
            }
        }
        if (_client->available()) {
            uint8_t llen;
            uint16_t len = readPacket(&llen);
            uint16_t msgId = 0;
            uint8_t *payload;
            if (len > 0) {
                lastInActivity = t;
                uint8_t type = buffer[0]&0xF0;
                if (type == MQTTPUBLISH) {
                 //服務端發佈消息
                    if (callback) {
                        uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */
                        memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
                        buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
                        char *topic = (char*) buffer+llen+2;
                        // msgId only present for QOS>0
                        if ((buffer[0]&0x06) == MQTTQOS1) {
                            msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
                            payload = buffer+llen+3+tl+2;
                            callback(topic,payload,len-llen-3-tl-2);
                            //客戶端發佈應答
                            buffer[0] = MQTTPUBACK;
                            buffer[1] = 2;
                            buffer[2] = (msgId >> 8);
                            buffer[3] = (msgId & 0xFF);
                            _client->write(buffer,4);
                            lastOutActivity = t;

                        } else {
                            payload = buffer+llen+3+tl;
                            callback(topic,payload,len-llen-3-tl);
                        }
                    }
                } else if (type == MQTTPINGREQ) {
                    /**** PINGRESP-心跳響應 ****/
                    buffer[0] = MQTTPINGRESP;
                    buffer[1] = 0;
                    _client->write(buffer,2);
                } else if (type == MQTTPINGRESP) {
                    /**** PINGRESP-心跳響應 ****/
                    pingOutstanding = false;
                }
            } else if (!connected()) {
                // readPacket has closed the connection
                return false;
            }
        }
        return true;
    }
    return false;
}

3.13 beginPublish —— 發佈大數據(第1步)

    上面的發佈消息方法都是小數據發佈,何爲小數據呢?直接看buffer大小吧:

// MQTT_MAX_PACKET_SIZE : Maximum packet size
#ifndef MQTT_MAX_PACKET_SIZE
#define MQTT_MAX_PACKET_SIZE 128
#endif

    默認的buffer是128字節,固然博主不推薦你們發送大量數據。
    若是要發佈稍微大一點的數據,就得用到三部曲方法。

函數說明

/**
 * 發佈大數據(第1步) —— 只是固定報頭 + 剩餘長度 
 * @param topic 主題
 * @param plength 負載內容長度
 * @param retained 是否保持
 *
 * @Note 這裏仍是用到buffer[128]
 */
boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) {
    if (connected()) {
        // Send the header and variable length field
        uint16_t length = MQTT_MAX_HEADER_SIZE;
        length = writeString(topic,buffer,length);
        uint8_t header = MQTTPUBLISH;
        if (retained) {
            header |= 1;
        }
        size_t hlen = buildHeader(header, buffer, plength+length-MQTT_MAX_HEADER_SIZE);
        uint16_t rc = _client->write(buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
        lastOutActivity = millis();
        return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
    }
    return false;
}

3.14 write —— 發佈大數據(第2步)

函數說明

/**
 * 發佈大數據(第2步) —— 有效負載
 * @param buffer 內容
 * @param size 負載內容長度
 *
 * @Note 這裏仍是用到buffer[128]
 */
size_t PubSubClient::write(const uint8_t *buffer, size_t size) {
    lastOutActivity = millis();
    return _client->write(buffer,size);
}

3.15 endPublish —— 發佈大數據(第3步)

函數說明

/**
 * 發佈大數據(第3步) 感受沒什麼用的方法....
 */
int PubSubClient::endPublish() {
 return 1;
}

3.16 state —— 獲取Mqtt客戶端當前狀態

函數說明

//狀態定義
// Possible values for client.state()
#define MQTT_CONNECTION_TIMEOUT     -4
#define MQTT_CONNECTION_LOST        -3
#define MQTT_CONNECT_FAILED         -2
#define MQTT_DISCONNECTED           -1
#define MQTT_CONNECTED               0
#define MQTT_CONNECT_BAD_PROTOCOL    1
#define MQTT_CONNECT_BAD_CLIENT_ID   2
#define MQTT_CONNECT_UNAVAILABLE     3
#define MQTT_CONNECT_BAD_CREDENTIALS 4
#define MQTT_CONNECT_UNAUTHORIZED    5

/**
 * 獲取Mqtt客戶端當前狀態
 */
int PubSubClient::state() {
    return this->_state;
}

4.測試用例

    工具講完了,咱們直接看庫自帶的示例(後面博主會結合OneNet來使用MQTT 敬請期待)。

4.1 mqtt-8266案例

案例說明

  • 鏈接上一個MQTT服務器,每2秒發佈一次"hello world"消息到主題「outTopic」
  • 客戶端監聽主題「inTopic」,並判斷負載內容來控制燈亮滅

案例代碼

#include <ESP8266WiFi.h>
#include <PubSubClient.h>

// Update these with values suitable for your network.

const char* ssid = "........";//wifi帳號
const char* password = "........";//wifi祕密
const char* mqtt_server = "broker.mqtt-dashboard.com";//mqtt服務器

WiFiClient espClient;
PubSubClient client(espClient);
long lastMsg = 0;
char msg[50];
int value = 0;

void setup_wifi() {

  delay(10);
  // We start by connecting to a WiFi network
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(ssid);

  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  randomSeed(micros());

  Serial.println("");
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}

/**
 * 消息回調
 */
void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  for (int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println();

  // Switch on the LED if an 1 was received as first character
  if ((char)payload[0] == '1') {
    digitalWrite(BUILTIN_LED, LOW);   // Turn the LED on (Note that LOW is the voltage level
    // but actually the LED is on; this is because
    // it is active low on the ESP-01)
  } else {
    digitalWrite(BUILTIN_LED, HIGH);  // Turn the LED off by making the voltage HIGH
  }

}

/**
 * 斷開重連
 */
void reconnect() {
  // Loop until we're reconnected
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    // Create a random client ID
    String clientId = "ESP8266Client-";
    clientId += String(random(0xffff), HEX);
    // Attempt to connect
    if (client.connect(clientId.c_str())) {
      Serial.println("connected");
      // Once connected, publish an announcement...
      client.publish("outTopic", "hello world");
      // ... and resubscribe
      client.subscribe("inTopic");
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      // Wait 5 seconds before retrying
      delay(5000);
    }
  }
}

void setup() {
  pinMode(BUILTIN_LED, OUTPUT);     // Initialize the BUILTIN_LED pin as an output
  Serial.begin(115200);
  setup_wifi();
  //配置mqtt服務器地址和端口
  client.setServer(mqtt_server, 1883);
  //設置訂閱消息回調
  client.setCallback(callback);
}

void loop() {
  //重連機制
  if (!client.connected()) {
    reconnect();
  }
  //不斷監聽信息
  client.loop();

  long now = millis();
  if (now - lastMsg > 2000) {
    //每2s發佈一次信息
    lastMsg = now;
    ++value;
    snprintf (msg, 50, "hello world #%ld", value);
    Serial.print("Publish message: ");
    Serial.println(msg);
    client.publish("outTopic", msg);
  }
}

4.2 mqtt-auth案例

案例說明

  • 鏈接上一個MQTT服務器,須要帳號和密碼

案例代碼

#include <ESP8266WiFi.h>
#include <PubSubClient.h>

const char* ssid = "........";
const char* password = "........";
const char* mqtt_server = "broker.mqtt-dashboard.com";

WiFiClient espClient;

void callback(char* topic, byte* payload, unsigned int length) {
  // handle message arrived
}

PubSubClient client(mqtt_server, 1883, callback, espClient);

void setup_wifi() {
  delay(10);
  // We start by connecting to a WiFi network
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(ssid);

  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  Serial.println("");
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}

void setup()
{
  
  setup_wifi();
  
  // Note - the default maximum packet size is 128 bytes. If the
  // combined length of clientId, username and password exceed this,
  // you will need to increase the value of MQTT_MAX_PACKET_SIZE in
  // PubSubClient.h
  
  if (client.connect("arduinoClient", "testuser", "testpass")) {
    client.publish("outTopic","hello world");
    client.subscribe("inTopic");
  }
}

void loop()
{
  client.loop();
}

4.3 mqtt-larget-message案例

案例說明

  • 鏈接上一個MQTT服務器,發佈大數據

案例代碼

#include <ESP8266WiFi.h>
#include <PubSubClient.h>

// Update these with values suitable for your network.

const char* ssid = "........";
const char* password = "........";
const char* mqtt_server = "broker.mqtt-dashboard.com";

WiFiClient espClient;
PubSubClient client(espClient);

void setup_wifi() {

  delay(10);
  // We start by connecting to a WiFi network
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(ssid);

  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  randomSeed(micros());

  Serial.println("");
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}

void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  for (int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println();

  // Find out how many bottles we should generate lyrics for
  String topicStr(topic);
  int bottleCount = 0; // assume no bottles unless we correctly parse a value from the topic
  if (topicStr.indexOf('/') >= 0) {
    // The topic includes a '/', we'll try to read the number of bottles from just after that
    topicStr.remove(0, topicStr.indexOf('/')+1);
    // Now see if there's a number of bottles after the '/'
    bottleCount = topicStr.toInt();
  }

  if (bottleCount > 0) {
    // Work out how big our resulting message will be
    int msgLen = 0;
    for (int i = bottleCount; i > 0; i--) {
      String numBottles(i);
      msgLen += 2*numBottles.length();
      if (i == 1) {
        msgLen += 2*String(" green bottle, standing on the wall\n").length();
      } else {
        msgLen += 2*String(" green bottles, standing on the wall\n").length();
      }
      msgLen += String("And if one green bottle should accidentally fall\nThere'll be ").length();
      switch (i) {
      case 1:
        msgLen += String("no green bottles, standing on the wall\n\n").length();
        break;
      case 2:
        msgLen += String("1 green bottle, standing on the wall\n\n").length();
        break;
      default:
        numBottles = i-1;
        msgLen += numBottles.length();
        msgLen += String(" green bottles, standing on the wall\n\n").length();
        break;
      };
    }
  
    // 顯示開始發佈大數據
    client.beginPublish("greenBottles/lyrics", msgLen, false);
    for (int i = bottleCount; i > 0; i--) {
      for (int j = 0; j < 2; j++) {
        client.print(i);
        if (i == 1) {
          client.print(" green bottle, standing on the wall\n");
        } else {
          client.print(" green bottles, standing on the wall\n");
        }
      }
      client.print("And if one green bottle should accidentally fall\nThere'll be ");
      switch (i) {
      case 1:
        client.print("no green bottles, standing on the wall\n\n");
        break;
      case 2:
        client.print("1 green bottle, standing on the wall\n\n");
        break;
      default:
        client.print(i-1);
        client.print(" green bottles, standing on the wall\n\n");
        break;
      };
    }
    // 發佈完畢
    client.endPublish();
  }
}

/**
 * 重連機制
 */
void reconnect() {
  // Loop until we're reconnected
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    // Create a random client ID
    String clientId = "ESP8266Client-";
    clientId += String(random(0xffff), HEX);
    // Attempt to connect
    if (client.connect(clientId.c_str())) {
      Serial.println("connected");
      // Once connected, publish an announcement...
      client.publish("outTopic", "hello world");
      // ... and resubscribe
      client.subscribe("greenBottles/#");
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      // Wait 5 seconds before retrying
      delay(5000);
    }
  }
}

void setup() {
  pinMode(BUILTIN_LED, OUTPUT);     // Initialize the BUILTIN_LED pin as an output
  Serial.begin(115200);
  setup_wifi();
  client.setServer(mqtt_server, 1883);
  client.setCallback(callback);
}

void loop() {

  if (!client.connected()) {
    reconnect();
  }
  client.loop();
}

5.總結

    整體來講,MQTT協議簡單,很是容易上手使用,但願讀者也能就着協議理解一下源碼知識,敬請期待OneNet篇關於MQTT的使用。

相關文章
相關標籤/搜索