在ESP8266學習系列中,博主一直使用HTTP協議。HTTP鏈接屬於短鏈接,而在物聯網應用中,普遍應用的倒是MQTT協議。因此,本篇咱們將學習Arduino平臺上的MQTT實現庫 —— PubSubClient。git
MQTT協議(Message Queuing Telemetry Transport),翻譯過來就是遙信消息隊列傳輸,是IBM公司於1999年提出的,如今最新版本是3.1.1。MQTT是一個基於TCP的發佈訂閱協議,設計的初始目的是爲了極有限的內存設備和網絡帶寬很低的網絡不可靠的通訊,很是適合物聯網通訊。數組
MQTT屬於應用層協議,基於TCP協議,確保了可靠性。博主在這裏不會去詳細講述MQTT協議(網上講解MQTT協議內容不少,不須要重複),但願有興趣的讀者自行去閱讀,可參考 MQTT中文文檔。服務器
MQTT通訊模型以下:網絡
MQTT消息支持三種QOS等級:數據結構
MQTT控制報文由三部分組成:less
總體上說,MQTT總體控制報文協議就是:dom
固定報頭(必定有) + 可變報頭(部分有) + 有效載荷(部分有)ide
數據結構簡單,傳輸數據量小,這也是爲何能應用於物聯網應用的緣由之一。函數
注意點:工具
報文格式:
固定報頭 + 可變報頭 + 有效載荷
注意點:
報文格式:
固定報頭 + 可變報頭
注意點:
報文格式:
固定報頭 + 可變報頭 + 有效載荷
注意點:
報文格式:
固定報頭 + 可變報頭
注意點:
報文格式:
固定報頭 + 可變報頭
注意點:
報文格式:
固定報頭 + 可變報頭
注意點:
報文格式:
固定報頭 + 可變報頭
注意點:
報文格式:
固定報頭 + 可變報頭 + 有效載荷
注意點:
報文格式:
固定報頭 + 可變報頭 + 有效載荷
注意點:
報文格式:
固定報頭 + 可變報頭 + 有效載荷
注意點:
報文格式:
固定報頭 + 可變報頭
注意點:
報文格式:
固定報頭
注意點:
報文格式:
固定報頭
注意點:
報文格式:
固定報頭
老規矩,上一個百度腦圖:
函數1說明:
/** * 建立一個沒有初始化的PubSubClient對象 */ PubSubClient::PubSubClient() { this->_state = MQTT_DISCONNECTED; this->_client = NULL; this->stream = NULL; setCallback(NULL); }
在使用PubSubClient對象以前,必須配置完整的內容:
WiFiClient espClient; PubSubClient client; void setup() { client.setClient(espClient); client.setServer("broker.example.com",1883); // client is now configured for use }
函數2說明:
/** * 建立一個部分初始化的PubSubClient對象 * @param client client實例 */ PubSubClient::PubSubClient(Client& client) { this->_state = MQTT_DISCONNECTED; setClient(client); this->stream = NULL; }
在使用PubSubClient對象以前,必須配置完整的內容:
WiFiClient espClient; PubSubClient client(espClient); void setup() { client.setServer("broker.example.com",1883); // client is now configured for use }
函數3說明:
/** * 建立完整初始化的PubSubClient對象 * @param addr mqtt服務器ip地址 * @param post mqtt服務器端口 * @param client 客戶端實例 */ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { this->_state = MQTT_DISCONNECTED; setServer(addr, port); setClient(client); this->stream = NULL; } /** * 建立完整初始化的PubSubClient對象 * @param addr mqtt服務器ip地址 * @param post mqtt服務器端口 * @param client 客戶端實例 * @param stream 輸出流,會把收到的消息輸出到流中 */ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(addr,port); setClient(client); setStream(stream); } /** * 建立完整初始化的PubSubClient對象 * @param addr mqtt服務器ip地址 * @param post mqtt服務器端口 * @param MQTT_CALLBACK_SIGNATURE callback方法 * @param client 客戶端實例 */ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; setServer(addr, port); setCallback(callback); setClient(client); this->stream = NULL; } /** * 建立完整初始化的PubSubClient對象 * @param addr mqtt服務器ip地址 * @param post mqtt服務器端口 * @param MQTT_CALLBACK_SIGNATURE callback方法 * @param client 客戶端實例 * @param stream 輸出流,會把收到的消息輸出到流中 */ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(addr,port); setCallback(callback); setClient(client); setStream(stream); } /** * 建立完整初始化的PubSubClient對象 * @param ip mqtt服務器ip地址 * @param post mqtt服務器端口 * @param client 客戶端實例 */ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { this->_state = MQTT_DISCONNECTED; setServer(ip, port); setClient(client); this->stream = NULL; } /** * 建立完整初始化的PubSubClient對象 * @param ip mqtt服務器ip地址 * @param post mqtt服務器端口 * @param client 客戶端實例 * @param stream 輸出流,會把收到的消息輸出到流中 */ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(ip,port); setClient(client); setStream(stream); } /** * 建立完整初始化的PubSubClient對象 * @param ip mqtt服務器ip地址 * @param post mqtt服務器端口 * @param client 客戶端實例 * @param MQTT_CALLBACK_SIGNATURE callback方法 */ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; setServer(ip, port); setCallback(callback); setClient(client); this->stream = NULL; } /** * 建立完整初始化的PubSubClient對象 * @param ip mqtt服務器ip地址 * @param post mqtt服務器端口 * @param client 客戶端實例 * @param MQTT_CALLBACK_SIGNATURE callback方法 * @param stream 輸出流,會把收到的消息輸出到流中 */ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(ip,port); setCallback(callback); setClient(client); setStream(stream); } /** * 建立完整初始化的PubSubClient對象 * @param domain mqtt服務器域名 * @param post mqtt服務器端口 * @param client 客戶端實例 */ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { this->_state = MQTT_DISCONNECTED; setServer(domain,port); setClient(client); this->stream = NULL; } /** * 建立完整初始化的PubSubClient對象 * @param domain mqtt服務器域名 * @param post mqtt服務器端口 * @param client 客戶端實例 * @param stream 輸出流,會把收到的消息輸出到流中 */ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(domain,port); setClient(client); setStream(stream); } /** * 建立完整初始化的PubSubClient對象 * @param domain mqtt服務器域名 * @param post mqtt服務器端口 * @param client 客戶端實例 * @param MQTT_CALLBACK_SIGNATURE callback方法 */ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; setServer(domain,port); setCallback(callback); setClient(client); this->stream = NULL; } /** * 建立完整初始化的PubSubClient對象 * @param domain mqtt服務器域名 * @param post mqtt服務器端口 * @param client 客戶端實例 * @param MQTT_CALLBACK_SIGNATURE callback方法 * @param stream 輸出流,會把收到的消息輸出到流中 */ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(domain,port); setCallback(callback); setClient(client); setStream(stream); }
函數說明:
/** * 配置服務器 * @param ip MQTT服務器ip地址,數組 * @param port MQTT服務器端口 */ PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) { IPAddress addr(ip[0],ip[1],ip[2],ip[3]); return setServer(addr,port); } /** * 配置服務器 * @param ip MQTT服務器ip地址,IPAddress * @param port MQTT服務器端口 */ PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) { this->ip = ip; this->port = port; this->domain = NULL; return *this; } /** * 配置服務器 * @param domain MQTT服務器domain地址 * @param port MQTT服務器端口 */ PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) { this->domain = domain; this->port = port; return *this; }
注意:
函數說明:
/** * 配置客戶端 * @param client client實例,好比wificlient */ PubSubClient& PubSubClient::setClient(Client& client){ this->_client = &client; return *this; }
注意:
函數說明
/** * 配置流,可用於存儲消息內容 */ PubSubClient& PubSubClient::setStream(Stream& stream){ this->stream = &stream; return *this; }
注意:
函數說明
/** * 判斷client是否鏈接上服務器 * @return bool true表示鏈接上 */ boolean PubSubClient::connected() { boolean rc; if (_client == NULL ) { rc = false; } else { rc = (int)_client->connected(); if (!rc) { //判斷鏈接狀態 if (this->_state == MQTT_CONNECTED) { this->_state = MQTT_CONNECTION_LOST; _client->flush(); _client->stop(); } } } return rc; }
函數說明:
/** * 鏈接MQTT服務(CONNECT控制報文) * @param id client端標識符 * @return bool 是否鏈接成功 */ boolean PubSubClient::connect(const char *id) { return connect(id,NULL,NULL,0,0,0,0,1); } /** * 鏈接MQTT服務(CONNECT控制報文) * @param id client端標識符 * @param user 用戶帳號 * @param pass 用戶密碼 * @return bool 是否鏈接成功 */ boolean PubSubClient::connect(const char *id, const char *user, const char *pass) { return connect(id,user,pass,0,0,0,0,1); } /** * 鏈接MQTT服務(CONNECT控制報文) * @param id client端標識符 * @param willTopic 遺囑主題 * @param willQos 遺囑消息質量等級 * @param willRetain 是否保留信息 * @param willMessage 遺囑內容 * @return bool 是否鏈接成功 */ boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage,1); } /** * 鏈接MQTT服務(CONNECT控制報文) * @param id client端標識符 * @param user 用戶帳號 * @param pass 用戶密碼 * @param willTopic 遺囑主題 * @param willQos 遺囑消息質量等級 * @param willRetain 是否保留信息 * @param willMessage 遺囑內容 * @return bool 是否鏈接成功 */ boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { return connect(id,user,pass,willTopic,willQos,willRetain,willMessage,1); } /** * 鏈接MQTT服務(CONNECT控制報文) * @param id client端標識符 * @param user 用戶帳號 * @param pass 用戶密碼 * @param willTopic 遺囑主題 * @param willQos 遺囑消息質量等級 * @param willRetain 是否保留信息 * @param willMessage 遺囑內容 * @param cleanSession 是否清除會話 * @return bool 是否鏈接成功 * * @Note 注意結合CONNECT和CONNACK報文協議 */ boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession) { if (!connected()) { int result = 0; if (domain != NULL) { result = _client->connect(this->domain, this->port); } else { result = _client->connect(this->ip, this->port); } if (result == 1) { nextMsgId = 1; // Leave room in the buffer for header and variable length field //固定報頭 uint16_t length = MQTT_MAX_HEADER_SIZE; unsigned int j; //在固定CONNECT報文可變報頭包含四個字段,協議名、協議級別、鏈接標誌、保持鏈接: #if MQTT_VERSION == MQTT_VERSION_3_1 uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION}; #define MQTT_HEADER_VERSION_LENGTH 9 #elif MQTT_VERSION == MQTT_VERSION_3_1_1 uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION}; #define MQTT_HEADER_VERSION_LENGTH 7 #endif for (j = 0;j<MQTT_HEADER_VERSION_LENGTH;j++) { buffer[length++] = d[j]; } /******************** 鏈接標誌(Connect Flags) start *************************/ uint8_t v; //遺囑主題 if (willTopic) { v = 0x04|(willQos<<3)|(willRetain<<5); } else { v = 0x00; } //清除會話 if (cleanSession) { v = v|0x02; } //帳號密碼 if(user != NULL) { v = v|0x80; if(pass != NULL) { v = v|(0x80>>1); } } buffer[length++] = v; /***********************鏈接標誌(Connect Flags) end ******************************************************/ /******************** 保持鏈接(Keep Alive) start *************************/ buffer[length++] = ((MQTT_KEEPALIVE) >> 8); buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); /******************** 保持鏈接(Keep Alive) end *************************/ //有效載荷 CONNECT報文的有效載荷包含一個或多個以長度爲前綴的字段,可變報頭中的標誌決定是否包含這些字段。 //若是包含的話,必須按照這個順序出現:client標識符,遺囑主題,遺囑消息,用戶名,密碼。 /*********** client標識符 start ************/ CHECK_STRING_LENGTH(length,id) length = writeString(id,buffer,length); /*********** client標識符 end ************/ /********* 遺囑主題 遺囑消息 start ********/ if (willTopic) { CHECK_STRING_LENGTH(length,willTopic) length = writeString(willTopic,buffer,length); CHECK_STRING_LENGTH(length,willMessage) length = writeString(willMessage,buffer,length); } /********* 遺囑主題 遺囑消息 end ********/ /********* 用戶名 密碼 start ********/ if(user != NULL) { CHECK_STRING_LENGTH(length,user) length = writeString(user,buffer,length); if(pass != NULL) { CHECK_STRING_LENGTH(length,pass) length = writeString(pass,buffer,length); } } /********* 用戶名 密碼 end ********/ //拼裝 CONNECT消息 write(MQTTCONNECT,buffer,length-MQTT_MAX_HEADER_SIZE); lastInActivity = lastOutActivity = millis(); //等待MQTT服務器返回響應內容 while (!_client->available()) { unsigned long t = millis(); //判斷是否超時 if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) { _state = MQTT_CONNECTION_TIMEOUT; _client->stop(); return false; } } uint8_t llen; //讀取響應返回的內容 uint16_t len = readPacket(&llen); /*** 處理 CONNACK – 確認鏈接請求 報文 ***/ if (len == 4) { if (buffer[3] == 0) { lastInActivity = millis(); pingOutstanding = false; _state = MQTT_CONNECTED; return true; } else { _state = buffer[3]; } } _client->stop(); } else { _state = MQTT_CONNECT_FAILED; } return false; } return true; }
注意:
函數說明:
/** * 斷開鏈接 * 客戶端斷開鏈接(客戶端發給服務端的最後一個控制報文。表示客戶端正常斷開鏈接) * @Note 取消訂閱報文格式: 固定報頭(報文類型+剩餘長度) */ void PubSubClient::disconnect() { /*** 斷開鏈接 報文協議 ***/ buffer[0] = MQTTDISCONNECT; buffer[1] = 0; _client->write(buffer,2); _state = MQTT_DISCONNECTED; _client->flush(); _client->stop(); lastInActivity = lastOutActivity = millis(); }
函數說明:
/** * 訂閱主題 * @param topic 主題 * @return bool 是否訂閱成功 */ boolean PubSubClient::subscribe(const char* topic) { return subscribe(topic, 0); } /** * 訂閱主題(客戶端向服務端發送SUBSCRIBE報文用於建立一個或多個訂閱) * @param topic 主題 * @param qos 質量等級 * @return bool 是否訂閱成功 * * @Note 訂閱報文格式: 固定報頭 + 可變報頭(報文標識符)+ 有效載荷(主題過濾器) * 訂閱確認報文格式:固定報頭 可變報頭(報文標識符)+ 有效載荷(返回碼清單) */ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { if (qos > 1) { return false; } if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) { // Too long return false; } if (connected()) { // Leave room in the buffer for header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; } buffer[length++] = (nextMsgId >> 8); buffer[length++] = (nextMsgId & 0xFF); length = writeString((char*)topic, buffer,length); buffer[length++] = qos; return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE); } return false; }
函數說明:
/** * 取消訂閱主題(客戶端發送UNSUBSCRIBE報文給服務端,用於取消訂閱主題) * @param topic 具體主題 * @return bool 是否取消成功 * @Note 取消訂閱報文格式: 固定報頭(報文類型+剩餘長度) + 可變報頭(報文標識符)+ 有效載荷(主題過濾器列表) */ boolean PubSubClient::unsubscribe(const char* topic) { if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) { // Too long return false; } if (connected()) { uint16_t length = MQTT_MAX_HEADER_SIZE; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; } //可變報頭(報文標識符) buffer[length++] = (nextMsgId >> 8); buffer[length++] = (nextMsgId & 0xFF); length = writeString(topic, buffer,length); return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE); } return false; }
函數說明:
/** * 發佈對應主題消息 * @param topic 主題 * @param payload 有效負載 */ boolean PubSubClient::publish(const char* topic, const char* payload) { return publish(topic,(const uint8_t*)payload,strlen(payload),false); } /** * 發佈對應主題消息 * @param topic 主題 * @param payload 有效負載 * @param retained 是否保持 */ boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) { return publish(topic,(const uint8_t*)payload,strlen(payload),retained); } /** * 發佈對應主題消息 * @param topic 主題 * @param payload 有效負載 * @param plength 負載內容長度 */ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) { return publish(topic, payload, plength, false); } /** * 發佈對應主題消息 * @param topic 主題 * @param payload 有效負載 * @param plength 負載內容長度 * @param retained 是否保持 */ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { if (connected()) { if (MQTT_MAX_PACKET_SIZE < MQTT_MAX_HEADER_SIZE + 2+strlen(topic) + plength) { // Too long return false; } // Leave room in the buffer for header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; length = writeString(topic,buffer,length); uint16_t i; for (i=0;i<plength;i++) { buffer[length++] = payload[i]; } uint8_t header = MQTTPUBLISH; if (retained) { header |= 1; } return write(header,buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } /** * 發佈對應主題消息 * @param topic 主題 * @param payload 有效負載(F(xxx)) * @param retained 是否保持 */ boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) { return publish_P(topic, (const uint8_t*)payload, strlen(payload), retained); } /** * 發佈對應主題消息 * @param topic 主題 * @param payload 有效負載(F(xxx)) * @param plength 負載內容長度 * @param retained 是否保持 */ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { uint8_t llen = 0; uint8_t digit; unsigned int rc = 0; uint16_t tlen; unsigned int pos = 0; unsigned int i; uint8_t header; unsigned int len; if (!connected()) { return false; } tlen = strlen(topic); header = MQTTPUBLISH; if (retained) { header |= 1; } buffer[pos++] = header; len = plength + 2 + tlen; do { digit = len % 128; len = len / 128; if (len > 0) { digit |= 0x80; } buffer[pos++] = digit; llen++; } while(len>0); pos = writeString(topic,buffer,pos); rc += _client->write(buffer,pos); for (i=0;i<plength;i++) { rc += _client->write((char)pgm_read_byte_near(payload + i)); } lastOutActivity = millis(); return rc == tlen + 4 + plength; }
函數說明:
/** * 設置消息回調函數 * @param MQTT_CALLBACK_SIGNATURE */ PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) { this->callback = callback; return *this; }
注意:
#if defined(ESP8266) || defined(ESP32) #include <functional> #define MQTT_CALLBACK_SIGNATURE std::function<void(char*, uint8_t*, unsigned int)> callback #else #define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, unsigned int) #endif
函數說明:
/** * 處理消息以及保持心跳 */ boolean PubSubClient::loop() { if (connected()) { unsigned long t = millis(); if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { if (pingOutstanding) { this->_state = MQTT_CONNECTION_TIMEOUT; _client->stop(); return false; } else { /*** PINGREQ——心跳請求 ****/ buffer[0] = MQTTPINGREQ; buffer[1] = 0; _client->write(buffer,2); lastOutActivity = t; lastInActivity = t; pingOutstanding = true; } } if (_client->available()) { uint8_t llen; uint16_t len = readPacket(&llen); uint16_t msgId = 0; uint8_t *payload; if (len > 0) { lastInActivity = t; uint8_t type = buffer[0]&0xF0; if (type == MQTTPUBLISH) { //服務端發佈消息 if (callback) { uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */ memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */ buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */ char *topic = (char*) buffer+llen+2; // msgId only present for QOS>0 if ((buffer[0]&0x06) == MQTTQOS1) { msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; payload = buffer+llen+3+tl+2; callback(topic,payload,len-llen-3-tl-2); //客戶端發佈應答 buffer[0] = MQTTPUBACK; buffer[1] = 2; buffer[2] = (msgId >> 8); buffer[3] = (msgId & 0xFF); _client->write(buffer,4); lastOutActivity = t; } else { payload = buffer+llen+3+tl; callback(topic,payload,len-llen-3-tl); } } } else if (type == MQTTPINGREQ) { /**** PINGRESP-心跳響應 ****/ buffer[0] = MQTTPINGRESP; buffer[1] = 0; _client->write(buffer,2); } else if (type == MQTTPINGRESP) { /**** PINGRESP-心跳響應 ****/ pingOutstanding = false; } } else if (!connected()) { // readPacket has closed the connection return false; } } return true; } return false; }
上面的發佈消息方法都是小數據發佈,何爲小數據呢?直接看buffer大小吧:
// MQTT_MAX_PACKET_SIZE : Maximum packet size #ifndef MQTT_MAX_PACKET_SIZE #define MQTT_MAX_PACKET_SIZE 128 #endif
默認的buffer是128字節,固然博主不推薦你們發送大量數據。
若是要發佈稍微大一點的數據,就得用到三部曲方法。
函數說明:
/** * 發佈大數據(第1步) —— 只是固定報頭 + 剩餘長度 * @param topic 主題 * @param plength 負載內容長度 * @param retained 是否保持 * * @Note 這裏仍是用到buffer[128] */ boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) { if (connected()) { // Send the header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; length = writeString(topic,buffer,length); uint8_t header = MQTTPUBLISH; if (retained) { header |= 1; } size_t hlen = buildHeader(header, buffer, plength+length-MQTT_MAX_HEADER_SIZE); uint16_t rc = _client->write(buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen)); lastOutActivity = millis(); return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen))); } return false; }
函數說明:
/** * 發佈大數據(第2步) —— 有效負載 * @param buffer 內容 * @param size 負載內容長度 * * @Note 這裏仍是用到buffer[128] */ size_t PubSubClient::write(const uint8_t *buffer, size_t size) { lastOutActivity = millis(); return _client->write(buffer,size); }
函數說明:
/** * 發佈大數據(第3步) 感受沒什麼用的方法.... */ int PubSubClient::endPublish() { return 1; }
函數說明:
//狀態定義 // Possible values for client.state() #define MQTT_CONNECTION_TIMEOUT -4 #define MQTT_CONNECTION_LOST -3 #define MQTT_CONNECT_FAILED -2 #define MQTT_DISCONNECTED -1 #define MQTT_CONNECTED 0 #define MQTT_CONNECT_BAD_PROTOCOL 1 #define MQTT_CONNECT_BAD_CLIENT_ID 2 #define MQTT_CONNECT_UNAVAILABLE 3 #define MQTT_CONNECT_BAD_CREDENTIALS 4 #define MQTT_CONNECT_UNAUTHORIZED 5 /** * 獲取Mqtt客戶端當前狀態 */ int PubSubClient::state() { return this->_state; }
工具講完了,咱們直接看庫自帶的示例(後面博主會結合OneNet來使用MQTT 敬請期待)。
案例說明:
案例代碼:
#include <ESP8266WiFi.h> #include <PubSubClient.h> // Update these with values suitable for your network. const char* ssid = "........";//wifi帳號 const char* password = "........";//wifi祕密 const char* mqtt_server = "broker.mqtt-dashboard.com";//mqtt服務器 WiFiClient espClient; PubSubClient client(espClient); long lastMsg = 0; char msg[50]; int value = 0; void setup_wifi() { delay(10); // We start by connecting to a WiFi network Serial.println(); Serial.print("Connecting to "); Serial.println(ssid); WiFi.begin(ssid, password); while (WiFi.status() != WL_CONNECTED) { delay(500); Serial.print("."); } randomSeed(micros()); Serial.println(""); Serial.println("WiFi connected"); Serial.println("IP address: "); Serial.println(WiFi.localIP()); } /** * 消息回調 */ void callback(char* topic, byte* payload, unsigned int length) { Serial.print("Message arrived ["); Serial.print(topic); Serial.print("] "); for (int i = 0; i < length; i++) { Serial.print((char)payload[i]); } Serial.println(); // Switch on the LED if an 1 was received as first character if ((char)payload[0] == '1') { digitalWrite(BUILTIN_LED, LOW); // Turn the LED on (Note that LOW is the voltage level // but actually the LED is on; this is because // it is active low on the ESP-01) } else { digitalWrite(BUILTIN_LED, HIGH); // Turn the LED off by making the voltage HIGH } } /** * 斷開重連 */ void reconnect() { // Loop until we're reconnected while (!client.connected()) { Serial.print("Attempting MQTT connection..."); // Create a random client ID String clientId = "ESP8266Client-"; clientId += String(random(0xffff), HEX); // Attempt to connect if (client.connect(clientId.c_str())) { Serial.println("connected"); // Once connected, publish an announcement... client.publish("outTopic", "hello world"); // ... and resubscribe client.subscribe("inTopic"); } else { Serial.print("failed, rc="); Serial.print(client.state()); Serial.println(" try again in 5 seconds"); // Wait 5 seconds before retrying delay(5000); } } } void setup() { pinMode(BUILTIN_LED, OUTPUT); // Initialize the BUILTIN_LED pin as an output Serial.begin(115200); setup_wifi(); //配置mqtt服務器地址和端口 client.setServer(mqtt_server, 1883); //設置訂閱消息回調 client.setCallback(callback); } void loop() { //重連機制 if (!client.connected()) { reconnect(); } //不斷監聽信息 client.loop(); long now = millis(); if (now - lastMsg > 2000) { //每2s發佈一次信息 lastMsg = now; ++value; snprintf (msg, 50, "hello world #%ld", value); Serial.print("Publish message: "); Serial.println(msg); client.publish("outTopic", msg); } }
案例說明:
案例代碼:
#include <ESP8266WiFi.h> #include <PubSubClient.h> const char* ssid = "........"; const char* password = "........"; const char* mqtt_server = "broker.mqtt-dashboard.com"; WiFiClient espClient; void callback(char* topic, byte* payload, unsigned int length) { // handle message arrived } PubSubClient client(mqtt_server, 1883, callback, espClient); void setup_wifi() { delay(10); // We start by connecting to a WiFi network Serial.println(); Serial.print("Connecting to "); Serial.println(ssid); WiFi.begin(ssid, password); while (WiFi.status() != WL_CONNECTED) { delay(500); Serial.print("."); } Serial.println(""); Serial.println("WiFi connected"); Serial.println("IP address: "); Serial.println(WiFi.localIP()); } void setup() { setup_wifi(); // Note - the default maximum packet size is 128 bytes. If the // combined length of clientId, username and password exceed this, // you will need to increase the value of MQTT_MAX_PACKET_SIZE in // PubSubClient.h if (client.connect("arduinoClient", "testuser", "testpass")) { client.publish("outTopic","hello world"); client.subscribe("inTopic"); } } void loop() { client.loop(); }
案例說明:
案例代碼:
#include <ESP8266WiFi.h> #include <PubSubClient.h> // Update these with values suitable for your network. const char* ssid = "........"; const char* password = "........"; const char* mqtt_server = "broker.mqtt-dashboard.com"; WiFiClient espClient; PubSubClient client(espClient); void setup_wifi() { delay(10); // We start by connecting to a WiFi network Serial.println(); Serial.print("Connecting to "); Serial.println(ssid); WiFi.begin(ssid, password); while (WiFi.status() != WL_CONNECTED) { delay(500); Serial.print("."); } randomSeed(micros()); Serial.println(""); Serial.println("WiFi connected"); Serial.println("IP address: "); Serial.println(WiFi.localIP()); } void callback(char* topic, byte* payload, unsigned int length) { Serial.print("Message arrived ["); Serial.print(topic); Serial.print("] "); for (int i = 0; i < length; i++) { Serial.print((char)payload[i]); } Serial.println(); // Find out how many bottles we should generate lyrics for String topicStr(topic); int bottleCount = 0; // assume no bottles unless we correctly parse a value from the topic if (topicStr.indexOf('/') >= 0) { // The topic includes a '/', we'll try to read the number of bottles from just after that topicStr.remove(0, topicStr.indexOf('/')+1); // Now see if there's a number of bottles after the '/' bottleCount = topicStr.toInt(); } if (bottleCount > 0) { // Work out how big our resulting message will be int msgLen = 0; for (int i = bottleCount; i > 0; i--) { String numBottles(i); msgLen += 2*numBottles.length(); if (i == 1) { msgLen += 2*String(" green bottle, standing on the wall\n").length(); } else { msgLen += 2*String(" green bottles, standing on the wall\n").length(); } msgLen += String("And if one green bottle should accidentally fall\nThere'll be ").length(); switch (i) { case 1: msgLen += String("no green bottles, standing on the wall\n\n").length(); break; case 2: msgLen += String("1 green bottle, standing on the wall\n\n").length(); break; default: numBottles = i-1; msgLen += numBottles.length(); msgLen += String(" green bottles, standing on the wall\n\n").length(); break; }; } // 顯示開始發佈大數據 client.beginPublish("greenBottles/lyrics", msgLen, false); for (int i = bottleCount; i > 0; i--) { for (int j = 0; j < 2; j++) { client.print(i); if (i == 1) { client.print(" green bottle, standing on the wall\n"); } else { client.print(" green bottles, standing on the wall\n"); } } client.print("And if one green bottle should accidentally fall\nThere'll be "); switch (i) { case 1: client.print("no green bottles, standing on the wall\n\n"); break; case 2: client.print("1 green bottle, standing on the wall\n\n"); break; default: client.print(i-1); client.print(" green bottles, standing on the wall\n\n"); break; }; } // 發佈完畢 client.endPublish(); } } /** * 重連機制 */ void reconnect() { // Loop until we're reconnected while (!client.connected()) { Serial.print("Attempting MQTT connection..."); // Create a random client ID String clientId = "ESP8266Client-"; clientId += String(random(0xffff), HEX); // Attempt to connect if (client.connect(clientId.c_str())) { Serial.println("connected"); // Once connected, publish an announcement... client.publish("outTopic", "hello world"); // ... and resubscribe client.subscribe("greenBottles/#"); } else { Serial.print("failed, rc="); Serial.print(client.state()); Serial.println(" try again in 5 seconds"); // Wait 5 seconds before retrying delay(5000); } } } void setup() { pinMode(BUILTIN_LED, OUTPUT); // Initialize the BUILTIN_LED pin as an output Serial.begin(115200); setup_wifi(); client.setServer(mqtt_server, 1883); client.setCallback(callback); } void loop() { if (!client.connected()) { reconnect(); } client.loop(); }
整體來講,MQTT協議簡單,很是容易上手使用,但願讀者也能就着協議理解一下源碼知識,敬請期待OneNet篇關於MQTT的使用。