【開源】一個高性能、高穩定性的跨平臺MQTT客戶端,歡迎star與fork

開源地址

github.com/jiejieTop/m…node

mqttclient

一個高性能、高穩定性的跨平臺MQTT客戶端linux

一個高性能、高穩定性的跨平臺MQTT客戶端,基於socket API之上開發,能夠在嵌入式設備(FreeRTOS/LiteOS/RT-Thread/TencentOS tiny)、Linux、Windows、Mac上使用,擁有很是簡潔的API接口,以極少的資源實現QOS2的服務質量,而且無縫銜接了mbedtls加密庫。git

優點:

  • 基於標準BSD socket之上開發,只要是兼容BSD socket的系統都可使用。github

  • 穩定:不管是掉線重連丟包重發,都是嚴格遵循MQTT協議標準執行,除此以外對大數據量的測試不管是收是發,都是很是穩定(一次發送135K數據,3秒一次),高頻測試也是很是穩定(7個主題同時收發,每秒一次,也就是1秒14個mqtt報文,服務質量QoS0、QoS一、QoS2都有)。由於做者以極少的資源設計了記錄機制,對採用QoS1服務質量的報文必須保證到達一次,當發佈的主題(qos一、qos2都適用)沒有被服務器收到時會自動重發,而對QoS2服務質量的報文保證有且只有處理一次(若是不相信它穩定性的同窗能夠本身去修改源碼,專門爲QoS2服務質量去作測試,故意不回覆PUBREC包,讓服務器重發QoS2報文,且看看客戶端是否有且只有處理一次),而對於掉線重連的穩定性,這種則是基本操做了,沒啥好說的,在自動重連後還會自動從新訂閱主題,保證主題不會丟失,所以在測試中穩定性極好。算法

  • 輕量級:整個代碼工程極其簡單,不使用mbedtls狀況下,佔用資源極少,做者曾使用esp8266模組與雲端通訊,整個工程代碼消耗的RAM不足15k(包括系統佔用的開銷,對數據的處理開銷,而這次仍是未優化的狀況下,還依舊完美保留了掉線重連的穩定性,可是對應qos一、qos2服務質量的報文則未作測試,由於STM32F103C8T6芯片資源實在是太少了,折騰不起)。編程

  • 無縫銜接mbedtls加密傳輸,讓網絡傳輸更加安全,並且接口層徹底不須要用戶理會,不管是否加密,mqttclient對用戶提供的API接口是沒有變化的,這就很好的兼容了一套代應用層的碼能夠加密傳輸也能夠不加密傳輸。api

  • 擁有極簡的API接口,總的來講,mqttclient的配置都有默認值,基本無需配置都能使用的,也能夠隨意配置,對配置都有健壯性檢測,這樣子設計的API接口也是很是簡單。安全

  • 有很是好的代碼風格與思想:整個代碼採用分層式設計,代碼實現採用異步處理的思想,下降耦合,提升性能,具體體如今什麼地方呢?很簡單,目前市面上不少MQTT客戶端發佈主題都是要阻塞等待ack,這是很是暴力的行爲,阻塞當前線程等待服務器的應答,那若是我想要發送數據怎麼辦,或者我要重複檢測數據怎麼辦,你可能會說,指定阻塞時間等待,那若是網絡延遲,ack遲遲不來,我就白等了嗎,對於qos一、qos2的服務質量怎麼辦,因此說這種仍是要異步處理的思想,我發佈主題,那我發佈出去就行了,不須要等待,對於qos一、qos2服務質量的MQTT報文,若是服務器沒收到,那我重發就能夠,這種重發也是異步的處理,徹底不會阻塞當前線程。bash

  • MQTT協議支持主題通配符「#」、「+」服務器

  • 訂閱的主題與消息處理徹底分離,讓編程邏輯更加簡單易用,用戶無需理會錯綜複雜的邏輯關係。

  • mqttclient內部已實現保活處理機制,無需用戶過多關心理會,用戶只需專心處理應用功能便可。

  • 無縫銜接salof:它是一個同步異步日誌輸出框架,在空閒時候輸出對應的日誌信息,也能夠將信息寫入flash中保存,方便調試。

  • 不對外產生依賴。

  • 使用 paho mqtt 庫

總體框架

擁有很是明確的分層框架。

總體框架

目前已實現了Linux、TencentOS tiny、FreeRTOS、RT-Thread平臺(已作成軟件包,名字爲kawaii-mqtt),除此以外TencentOS tiny的AT框架亦可使用(RAM消耗不足15K),而且穩定性極好!

平臺 代碼位置
Linux github.com/jiejieTop/m…
TencentOS tiny github.com/Tencent/Ten…
TencentOS tiny AT 框架 github.com/jiejieTop/g…
RT-Thread github.com/jiejieTop/k…
FreeRTOS github.com/jiejieTop/f…

版本

發佈版本 描述
[v1.0.0] 初次發佈,完成基本框架及其穩定性驗證
[v1.0.1] 修復主動與服務器斷開鏈接時的邏輯處理
[v1.0.2] 添加新特性——攔截器,修復一些小bug
[v1.0.3] 避免形成全局污染修改了log、list相關函數的命名

問題

歡迎以 GitHub Issues 的形式提交問題和bug報告

版權和許可

mqttclient 遵循 Apache License v2.0 開源協議。鼓勵代碼共享和尊重原做者的著做權,能夠自由的使用、修改源代碼,也能夠將修改後的代碼做爲開源或閉源軟件發佈,但必須保留原做者版權聲明

linux平臺下測試使用

安裝cmake:

sudo apt-get install cmake
複製代碼

配置

mqttclient/test/test.c文件中修改如下內容:

init_params.connect_params.network_params.network_ssl_params.ca_crt = test_ca_get();    /* CA證書 */
    init_params.connect_params.network_params.addr = "xxxxxxx";                             /* 服務器域名 */
    init_params.connect_params.network_params.port = "8883";                                /* 服務器端口號 */
    init_params.connect_params.user_name = "xxxxxxx";                                       /* 用戶名 */
    init_params.connect_params.password = "xxxxxxx";                                        /* 密碼 */
    init_params.connect_params.client_id = "xxxxxxx";                                       /* 客戶端id */
複製代碼

mbedtls

默認打開mbedtls。

salof 全稱是:Synchronous Asynchronous Log Output Framework(同步異步日誌輸出框架),它是一個同步異步日誌輸出框架,在空閒時候輸出對應的日誌信息,而且該庫與mqttclient無縫銜接。

配置對應的日誌輸出級別:

#define BASE_LEVEL (0)
#define ASSERT_LEVEL (BASE_LEVEL + 1) /* 日誌輸出級別:斷言級別(很是高優先級) */
#define ERR_LEVEL (ASSERT_LEVEL + 1) /* 日誌輸出級別:錯誤級別(高優先級) */
#define WARN_LEVEL (ERR_LEVEL + 1) /* 日誌輸出級別:警告級別(中優先級) */
#define INFO_LEVEL (WARN_LEVEL + 1) /* 日誌輸出級別:信息級別(低優先級) */
#define DEBUG_LEVEL (INFO_LEVEL + 1) /* 日誌輸出級別:調試級別(更低優先級) */

#define LOG_LEVEL WARN_LEVEL /* 日誌輸出級別 */
複製代碼

日誌其餘選項:

  • 終端帶顏色
  • 時間戳
  • 標籤

mqttclient的配置

配置mqtt等待應答列表的最大值,對於qos1 qos2服務質量有要求的能夠將其設置大一點,固然也必須資源跟得上,它主要是保證qos1 qos2的mqtt報文能準確到達服務器。

#define MQTT_ACK_HANDLER_NUM_MAX 64
複製代碼

選擇MQTT協議的版本,默認爲4,表示使用MQTT 3.1.1版本,而3則表示爲MQTT 3.1版本。

#define MQTT_VERSION 4 // 4 is mqtt 3.1.1
複製代碼

設置默認的保活時間,它主要是保證MQTT客戶端與服務器的保持活性鏈接,單位爲 秒 ,好比MQTT客戶端與服務器100S沒有發送數據了,有沒有接收到數據,此時MQTT客戶端會發送一個ping包,確認一下這個會話是否存在,若是收到服務器的應答,那麼說明這個會話仍是存在的,能夠隨時收發數據,而若是不存在了,就清除會話。

#define MQTT_KEEP_ALIVE_INTERVAL 100 // unit: second
複製代碼

默認的命令超時,它主要是用於socket讀寫超時,在MQTT初始化時能夠指定:

#define MQTT_DEFAULT_CMD_TIMEOUT 4000
複製代碼

默認主題的長度,主題是支持通配符的,若是主題太長則會被截斷:

#define MQTT_TOPIC_LEN_MAX 64
複製代碼

默認的算法數據緩衝區的大小,若是要發送大量數據則修改大一些,在MQTT初始化時能夠指定:

#define MQTT_DEFAULT_BUF_SIZE 1024
複製代碼

線程相關的配置,如線程棧,線程優先級,線程時間片等: 在linux環境下能夠是不須要理會這些參數的,而在RTOS平臺則須要配置,若是不使用mbedtls,線程棧2048字節已足夠,而使用mbedtls加密後,須要配置4096字節以上。

#define MQTT_THREAD_STACK_SIZE 2048 // 線程棧
#define MQTT_THREAD_PRIO 5 // 線程優先級
#define MQTT_THREAD_TICK 50 // 線程時間片
複製代碼

默認的重連時間間隔,當發生掉線時,會以這個時間間隔嘗試重連:

#define MQTT_RECONNECT_DEFAULT_DURATION 1000
複製代碼

其餘不須要怎麼配置的東西:

#define MQTT_MAX_PACKET_ID (0xFFFF - 1) // mqtt報文id
#define MQTT_MAX_CMD_TIMEOUT 20000 //最大的命令超時參數
#define MQTT_MIN_CMD_TIMEOUT 1000 //最小的命令超時參數
複製代碼

ps:以上參數基本不須要怎麼配置的,直接用便可~

編譯 & 運行

./build.sh
複製代碼

運行build.sh腳本後會在 ./build/bin/目錄下生成可執行文件mqtt-client,直接運行便可。

編譯成動態庫libmqttclient.so

./make-libmqttclient.sh
複製代碼

運行make-libmqttclient.sh腳本後會在 ./libmqttclient/lib目錄下生成一個動態庫文件libmqttclient.so,並安裝到系統的/usr/lib目錄下,相關頭文件已經拷貝到./libmqttclient/include目錄下,編譯應用程序的時候只須要連接動態庫便可-lmqttclient,動態庫的配置文件根據./test/mqtt_config.h配置的。

設計思想

  • 總體採用分層式設計,代碼實現採用異步設計方式,下降耦合。
  • 消息的處理使用回調的方式處理:用戶指定[訂閱的主題]與指定[消息的處理函數]
  • 不對外產生依賴

API

mqttclient擁有很是簡潔的api接口

int mqtt_keep_alive(mqtt_client_t* c);
int mqtt_init(mqtt_client_t* c, client_init_params_t* init);
int mqtt_release(mqtt_client_t* c);
int mqtt_connect(mqtt_client_t* c);
int mqtt_disconnect(mqtt_client_t* c);
int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t msg_handler);
int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter);
int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg);
int mqtt_list_subscribe_topic(mqtt_client_t* c);
int mqtt_set_interceptor_handler(mqtt_client_t* c, interceptor_handler_t handler);
複製代碼

核心

mqtt_client_t 結構

typedef struct mqtt_client {
    unsigned short              packet_id;
    unsigned char               ping_outstanding;
    unsigned char               ack_handler_number;
    unsigned char               *read_buf;
    unsigned char               *write_buf;
    unsigned int                cmd_timeout;
    unsigned int                read_buf_size;
    unsigned int                write_buf_size;
    unsigned int                reconnect_try_duration;
    void                        *reconnect_date;
    reconnect_handler_t         reconnect_handler;
    client_state_t              client_state;
    platform_mutex_t            write_lock;
    platform_mutex_t            global_lock;
    mqtt_list_t                 msg_handler_list;
    mqtt_list_t                 ack_handler_list;
    network_t                   *network;
    platform_thread_t           *thread;
    platform_timer_t            reconnect_timer;
    platform_timer_t            last_sent;
    platform_timer_t            last_received;
    connect_params_t            *connect_params;
    interceptor_handler_t       interceptor_handler;
} mqtt_client_t;
複製代碼

該結構主要維護如下內容:

  1. 讀寫數據緩衝區read_buf、write_buf
  2. 命令超時時間cmd_timeout(主要是讀寫阻塞時間、等待響應的時間、重連等待時間)
  3. 維護ack鏈表ack_handler_list,這是異步實現的核心,全部等待響應的報文都會被掛載到這個鏈表上
  4. 維護消息處理列表msg_handler_list,這是mqtt協議必須實現的內容,全部來自服務器的publish報文都會被處理(前提是訂閱了對應的消息)
  5. 維護一個網卡接口network
  6. 維護一個內部線程thread,全部來自服務器的mqtt包都會在這裏被處理!
  7. 兩個定時器,分別是掉線重連定時器與保活定時器reconnect_timer、last_sent、last_received
  8. 一些鏈接的參數connect_params

mqttclient實現

如下是整個框架的實現方式,方便你們更容易理解mqttclient的代碼與設計思想,讓你們可以修改源碼與使用,還能夠提交pr或者issues,開源的世界期待各位大神的參與,感謝!

除此以外如下代碼的記錄機制與其超時處理機制是很是好的編程思想,你們有興趣必定要看源代碼!

初始化

int mqtt_init(mqtt_client_t* c, client_init_params_t* init) 複製代碼

主要是配置mqtt_client_t結構的相關信息,若是沒有指定初始化參數,則系統會提供默認的參數。 但鏈接部分的參數則必須指定:

init_params.connect_params.network_params.addr = "[你的mqtt服務器IP地址或者是域名]";
    init_params.connect_params.network_params.port = 1883;	//端口號
    init_params.connect_params.user_name = "jiejietop";
    init_params.connect_params.password = "123456";
    init_params.connect_params.client_id = "clientid";

    mqtt_init(&client, &init_params);
複製代碼

鏈接服務器

int mqtt_connect(mqtt_client_t* c);
複製代碼

參數只有 mqtt_client_t 類型的指針,字符串類型的主題(支持通配符"#" "+"),主題的服務質量,以及收到報文的處理函數,如不指定則有默認處理函數。鏈接服務器則是使用非異步的方式設計,由於必須等待鏈接上服務器才能進行下一步操做。

過程以下:

  1. 調用底層的鏈接函數鏈接上服務器:
c->network->connect(c->network);
複製代碼
  1. 序列化mqttCONNECT報文而且發送
MQTTSerialize_connect(c->write_buf, c->write_buf_size, &connect_data)
mqtt_send_packet(c, len, &connect_timer)
複製代碼
  1. 等待來自服務器的CONNACK報文
mqtt_wait_packet(c, CONNACK, &connect_timer)
複製代碼
  1. 鏈接成功後建立一個內部線程mqtt_yield_thread,並在合適的時候啓動它:
platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, MQTT_THREAD_STACK_SIZE, MQTT_THREAD_PRIO, MQTT_THREAD_TICK)

if (NULL != c->thread) {
    mqtt_set_client_state(c, CLIENT_STATE_CONNECTED);
    platform_thread_startup(c->thread);
    platform_thread_start(c->thread);       /* start run mqtt thread */
}
複製代碼
  1. 而對於重連來講則不會從新建立線程,直接改變客戶端狀態爲鏈接狀態便可:
mqtt_set_client_state(c, CLIENT_STATE_CONNECTED);
複製代碼

訂閱報文

int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t handler) 複製代碼

訂閱報文使用異步設計來實現的: 過程以下:

  1. 序列化訂閱報文而且發送給服務器
MQTTSerialize_subscribe(c->write_buf, c->write_buf_size, 0, mqtt_get_next_packet_id(c), 1, &topic, (int*)&qos)
mqtt_send_packet(c, len, &timer)
複製代碼
  1. 建立對應的消息處理節點,這個消息節點在收到服務器的SUBACK訂閱應答報文後會掛載到消息處理列表msg_handler_list
mqtt_msg_handler_create(topic_filter, qos, handler)
複製代碼
  1. 在發送了報文給服務器那就要等待服務器的響應了,先記錄這個等待SUBACK
mqtt_ack_list_record(c, SUBACK, mqtt_get_next_packet_id(c), len, msg_handler)
複製代碼

取消訂閱

與訂閱報文的邏輯基本差很少的~

  1. 序列化訂閱報文而且發送給服務器
MQTTSerialize_unsubscribe(c->write_buf, c->write_buf_size, 0, packet_id, 1, &topic)
mqtt_send_packet(c, len, &timer)
複製代碼
  1. 建立對應的消息處理節點,這個消息節點在收到服務器的UNSUBACK取消訂閱應答報文後將消息處理列表msg_handler_list上的已經訂閱的主題消息節點銷燬
mqtt_msg_handler_create((const char*)topic_filter, QOS0, NULL)
複製代碼
  1. 在發送了報文給服務器那就要等待服務器的響應了,先記錄這個等待UNSUBACK
mqtt_ack_list_record(c, UNSUBACK, packet_id, len, msg_handler)
複製代碼

發佈報文

int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg) 複製代碼

參數只有 mqtt_client_t 類型的指針,字符串類型的主題(支持通配符),要發佈的消息(包括服務質量消息主體)。

mqtt_message_t msg;
    
    msg.qos = 2;
    msg.payload = (void *) buf;
    
	mqtt_publish(&client, "testtopic1", &msg);
複製代碼

核心思想都差很少,過程以下:

  1. 先序列化發佈報文,而後發送到服務器
MQTTSerialize_publish(c->write_buf, c->write_buf_size, 0, msg->qos, msg->retained, msg->id,
              topic, (unsigned char*)msg->payload, msg->payloadlen);
mqtt_send_packet(c, len, &timer)
複製代碼
  1. 對於QOS0的邏輯,不作任何處理,對於QOS1和QOS2的報文則須要記錄下來,在沒收到服務器應答的時候進行重發
if (QOS1 == msg->qos) {
        rc = mqtt_ack_list_record(c, PUBACK, mqtt_get_next_packet_id(c), len, NULL);
    } else if (QOS2 == msg->qos) {
        rc = mqtt_ack_list_record(c, PUBREC, mqtt_get_next_packet_id(c), len, NULL);
    }
複製代碼
  1. 還有很是重要的一點,重發報文的MQTT報文頭部須要設置DUP標誌位,這是MQTT協議的標準,所以,在重發的時候做者直接操做了報文的DUP標誌位,由於修改DUP標誌位的函數我沒有從MQTT庫中找到,因此我封裝了一個函數,這與LwIP中的交叉存取思想是一個道理,它假設我知道MQTT報文的全部操做,因此我能夠操做它,這樣子能夠提升不少效率:
mqtt_set_publish_dup(c,1);  /* may resend this data, set the udp flag in advance */
複製代碼

內部線程

static void mqtt_yield_thread(void *arg) 複製代碼

主要是對mqtt_yield函數的返回值作處理,好比在disconnect的時候銷燬這個線程。

核心的處理函數

  1. 數據包的處理mqtt_packet_handle
static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer) 複製代碼

對不一樣的包使用不同的處理:

switch (packet_type) {
        case 0: /* timed out reading packet */
            break;

        case CONNACK:
            break;

        case PUBACK:
        case PUBCOMP:
            rc = mqtt_puback_and_pubcomp_packet_handle(c, timer);
            break;

        case SUBACK:
            rc = mqtt_suback_packet_handle(c, timer);
            break;
            
        case UNSUBACK:
            rc = mqtt_unsuback_packet_handle(c, timer);
            break;

        case PUBLISH:
            rc = mqtt_publish_packet_handle(c, timer);
            break;

        case PUBREC:
        case PUBREL:
            rc = mqtt_pubrec_and_pubrel_packet_handle(c, timer);
            break;

        case PINGRESP:
            c->ping_outstanding = 0;
            break;

        default:
            goto exit;
    }
複製代碼

而且作保活的處理:

mqtt_keep_alive(c)
複製代碼

當發生超時後

if (platform_timer_is_expired(&c->last_sent) || platform_timer_is_expired(&c->last_received)) 
複製代碼

序列號一個心跳包而且發送給服務器

MQTTSerialize_pingreq(c->write_buf, c->write_buf_size);
mqtt_send_packet(c, len, &timer);
複製代碼

當再次發生超時後,表示與服務器的鏈接已斷開,須要重連的操做,設置客戶端狀態爲斷開鏈接

mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);
複製代碼
  1. ack鏈表的掃描,當收到服務器的報文時,對ack列表進行掃描操做
mqtt_ack_list_scan(c);
複製代碼

當超時後就銷燬ack鏈表節點:

mqtt_ack_handler_destroy(ack_handler);
複製代碼

固然下面這幾種報文則須要重發操做:(PUBACK 、PUBREC、 PUBREL 、PUBCOMP,保證QOS1 QOS2的服務質量)

if ((ack_handler->type ==  PUBACK) || (ack_handler->type ==  PUBREC) || (ack_handler->type ==  PUBREL) || (ack_handler->type ==  PUBCOMP))
	mqtt_ack_handler_resend(c, ack_handler);
複製代碼
  1. 保持活性的時間過去了,可能掉線了,須要重連操做
mqtt_try_reconnect(c);
複製代碼

重連成功後嘗試從新訂閱報文,保證恢復原始狀態~

mqtt_try_resubscribe(c)
複製代碼

發佈應答與發佈完成報文的處理

static int mqtt_puback_and_pubcomp_packet_handle(mqtt_client_t *c, platform_timer_t *timer) 複製代碼
  1. 反序列化報文
MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size)
複製代碼
  1. 取消對應的ack記錄
mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);
複製代碼

訂閱應答報文的處理

static int mqtt_suback_packet_handle(mqtt_client_t *c, platform_timer_t *timer) 複製代碼
  1. 反序列化報文
MQTTDeserialize_suback(&packet_id, 1, &count, (int*)&granted_qos, c->read_buf, c->read_buf_size)
複製代碼
  1. 取消對應的ack記錄
mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);
複製代碼
  1. 安裝對應的訂閱消息處理函數,若是是已存在的則不會安裝
mqtt_msg_handlers_install(c, msg_handler);
複製代碼

取消訂閱應答報文的處理

static int mqtt_unsuback_packet_handle(mqtt_client_t *c, platform_timer_t *timer) 複製代碼
  1. 反序列化報文
MQTTDeserialize_unsuback(&packet_id, c->read_buf, c->read_buf_size)
複製代碼
  1. 取消對應的ack記錄,而且獲取到已經訂閱的消息處理節點
mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler)
複製代碼
  1. 銷燬對應的訂閱消息處理函數
mqtt_msg_handler_destory(msg_handler);
複製代碼

來自服務器的發佈報文的處理

static int mqtt_publish_packet_handle(mqtt_client_t *c, platform_timer_t *timer) 複製代碼
  1. 反序列化報文
MQTTDeserialize_publish(&msg.dup, &qos, &msg.retained, &msg.id, &topic_name,
        (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->read_buf, c->read_buf_size)
複製代碼
  1. 對於QOS0、QOS1的報文,直接去處理消息
mqtt_deliver_message(c, &topic_name, &msg);
複製代碼
  1. 對於QOS1的報文,還須要發送一個PUBACK應答報文給服務器
MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBACK, 0, msg.id);
複製代碼
  1. 而對於QOS2的報文則須要發送PUBREC報文給服務器,除此以外還須要記錄PUBREL到ack鏈表上,等待服務器的發佈釋放報文,最後再去處理這個消息
MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBREC, 0, msg.id);
mqtt_ack_list_record(c, PUBREL, msg.id + 1, len, NULL)
mqtt_deliver_message(c, &topic_name, &msg);
複製代碼

說明:一旦註冊到ack列表上的報文,當具備重複的報文是不會從新被註冊的,它會經過mqtt_ack_list_node_is_exist函數判斷這個節點是否存在,主要是依賴等待響應的消息類型與msgid。

發佈收到與發佈釋放報文的處理

static int mqtt_pubrec_and_pubrel_packet_handle(mqtt_client_t *c, platform_timer_t *timer) 複製代碼
  1. 反序列化報文
MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size)
複製代碼
  1. 產生一個對應的應答報文
mqtt_publish_ack_packet(c, packet_id, packet_type);
複製代碼
  1. 取消對應的ack記錄
mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler)
複製代碼

開源地址

github.com/jiejieTop/m…

相關文章
相關標籤/搜索