在文章Paho - MQTT C Cient的實現中,我介紹瞭如何使用Paho開源項目建立MQTTClient_pulish客戶端。但只是簡單的介紹了使用方法,並且客戶端的結果與以前介紹的並不吻合,今天我就結合新的例子,給你們講解一下Paho使用MQTT客戶端的主要過程。
如同前面介紹的,MQTT客戶端分爲同步客戶端和異步客戶端。今天主要講解的是同步客戶端,結構仍是如同步客戶端中介紹的:windows
1.建立一個客戶端對象;
2.設置鏈接MQTT服務器的選項;
3.若是多線程(異步模式)操做被使用則設置回調函數(詳見 Asynchronous >vs synchronous client applications);
4.訂閱客戶端須要接收的任意話題;
5.重複如下操做直到結束:
a.發佈客戶端須要的任意信息;
b.處理全部接收到的信息;
6.斷開客戶端鏈接;
7.釋放客戶端使用的全部內存。服務器
好,直接上代碼,MQTT簡單的同步客戶端。session
#include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include "MQTTClient.h" #if !defined(WIN32) #include <unistd.h> #else #include <windows.h> #endif #define NUM_THREADS 2 #define ADDRESS "tcp://localhost:1883" //更改此處地址 #define CLIENTID "aaabbbccc_pub" //更改此處客戶端ID #define SUB_CLIENTID "aaabbbccc_sub" //更改此處客戶端ID #define TOPIC "topic01" //更改發送的話題 #define PAYLOAD "Hello Man, Can you see me ?!" // #define QOS 1 #define TIMEOUT 10000L #define USERNAME "test_user" #define PASSWORD "jim777" #define DISCONNECT "out" int CONNECT = 1; volatile MQTTClient_deliveryToken deliveredtoken; void delivered(void *context, MQTTClient_deliveryToken dt) { printf("Message with token value %d delivery confirmed\n", dt); deliveredtoken = dt; } int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) { int i; char* payloadptr; printf("Message arrived\n"); printf(" topic: %s\n", topicName); printf(" message: "); payloadptr = message->payload; if(strcmp(payloadptr, DISCONNECT) == 0){ printf(" \n out!!"); CONNECT = 0; } for(i=0; i<message->payloadlen; i++) { putchar(*payloadptr++); } printf("\n"); MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; } void connlost(void *context, char *cause) { printf("\nConnection lost\n"); printf(" cause: %s\n", cause); } void *subClient(void *threadid){ long tid; tid = (long)threadid; printf("Hello World! It's me, thread #%ld!\n", tid); MQTTClient client; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; int rc; int ch; MQTTClient_create(&client, ADDRESS, SUB_CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.username = USERNAME; conn_opts.password = PASSWORD; MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered); if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE); } printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n" "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS); MQTTClient_subscribe(client, TOPIC, QOS); do { ch = getchar(); } while(ch!='Q' && ch != 'q'); MQTTClient_unsubscribe(client, TOPIC); MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); pthread_exit(NULL); } void *pubClient(void *threadid){ long tid; tid = (long)threadid; int count = 0; printf("Hello World! It's me, thread #%ld!\n", tid); //聲明一個MQTTClient MQTTClient client; //初始化MQTT Client選項 MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; //#define MQTTClient_message_initializer { {'M', 'Q', 'T', 'M'}, 0, 0, NULL, 0, 0, 0, 0 } MQTTClient_message pubmsg = MQTTClient_message_initializer; //聲明消息token MQTTClient_deliveryToken token; int rc; //使用參數建立一個client,並將其賦值給以前聲明的client MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.username = USERNAME; conn_opts.password = PASSWORD; //使用MQTTClient_connect將client鏈接到服務器,使用指定的鏈接選項。成功則返回MQTTCLIENT_SUCCESS if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE); } pubmsg.payload = PAYLOAD; pubmsg.payloadlen = strlen(PAYLOAD); pubmsg.qos = QOS; pubmsg.retained = 0; while(CONNECT){ MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token); printf("Waiting for up to %d seconds for publication of %s\n" "on topic %s for client with ClientID: %s\n", (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID); rc = MQTTClient_waitForCompletion(client, token, TIMEOUT); printf("Message with delivery token %d delivered\n", token); usleep(3000000L); } MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); } int main(int argc, char* argv[]) { pthread_t threads[NUM_THREADS]; long t; pthread_create(&threads[0], NULL, subClient, (void *)0); pthread_create(&threads[1], NULL, pubClient, (void *)1); pthread_exit(NULL); }
在代碼中,我建立了兩個線程,分別用來處理訂閱客戶端和發佈客戶端。多線程
接下來我講解一下這個簡單的客戶端,其中,大致的流程以下:
大致的流程如圖所示,在客戶端啓動以後,會啓動線程,建立一個訂閱客戶端,它會監聽消息的到達,在消息到達以後會觸發相應的回調函數以對消息進行處理;後在啓動一個線程,建立一個發送客戶端,用來發送消息的,每次發送消息以前會判斷是否要掉線,如CONNECT=0則會掉線,不然發送消息給topic01。app
如下函數完成的是訂閱的功能。異步
void *subClient(void *threadid)
第一步:聲明客戶端,並經過函數給其賦值;tcp
MQTTClient client; MQTTClient_create(&client, ADDRESS, SUB_CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
第二步:設置鏈接MQTT服務器的選項;函數
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
第三步:設置回調函數;ui
MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered); //相應的回調函數connlost,msgarrvd,delivered個人代碼中都有
第四步:使用客戶端和鏈接選項鍊接服務器;編碼
MQTTClient_connect(client, &conn_opts))
第五步訂閱話題;
MQTTClient_subscribe(client, TOPIC, QOS);
第六步一直等待,知道輸入'Q' 或'q';
do { ch = getchar(); } while(ch!='Q' && ch != 'q');
第六步一直等待,直到輸入'Q' 或'q';
do { ch = getchar(); } while(ch!='Q' && ch != 'q');
第七步取消訂閱;
MQTTClient_unsubscribe(client, TOPIC);
第八步.斷開客戶端鏈接;
MQTTClient_disconnect(client, 10000);
第九步.釋放客戶端使用的全部內存;
MQTTClient_destroy(&client);
至此,訂閱客戶端就結束了。通常訂閱客戶端的大致結構都是這樣。不一樣的是回調函數的個性化上。
如下函數完成的是發送的功能。
void *pubClient(void *threadid)
第一步:聲明客戶端,並經過函數給其賦值;
MQTTClient client; MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
第二步:設置鏈接MQTT服務器的選項;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
第三步:使用客戶端和鏈接選項鍊接服務器;
MQTTClient_connect(client, &conn_opts)
第四步設置發送消息的屬性;
pubmsg.payload = PAYLOAD; pubmsg.payloadlen = strlen(PAYLOAD); pubmsg.qos = QOS; pubmsg.retained = 0;
第五步循環發送消息;
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
第六步一直等待,當CONNECT=0時退出該客戶端;
第七步.斷開客戶端鏈接;
MQTTClient_disconnect(client, 10000);
第八步.釋放客戶端使用的全部內存;
MQTTClient_destroy(&client);
至此,發送客戶端就結束了。通常的發送客戶端大致結構也如此,但異步客戶端可能有些許不一樣,無非就是設計回調函數,而後在鏈接,斷開鏈接等時可使用回調函數作一些操做而已,具體的能夠本身研究。
爲了讓你們可以更深刻了解,我把本身學到的一些函數和結構體大體在下面講解了一下。
MQTTClient
定義:typedef void* MQTTClient;
含義:表明MQTT客戶端的句柄。成功調用MQTTClient_create()後,能夠獲得有效的客戶端句柄。
MQTTClient_connectOptions
定義:
typedef struct { char struct_id[4];//結構體的識別序列,必須爲MQTC int struct_version;//結構體版本 /** 在0,1,2,3,4,5中取值: 0-表示沒有SSL選項且沒有serverURIs; 1-表示沒有serverURIs; 2-表示沒有MQTTVersion 3-表示沒有返回值; 4-表示沒有二進制密碼選項 */ int keepAliveInterval; /** 在這段時間內沒有數據相關的消息時,客戶端發送一個很是小的MQTT「ping」消息,服務器將會確認這個消息 */ int cleansession; /** 當cleansession爲true時,會話狀態信息在鏈接和斷開鏈接時被丟棄。 將cleansession設置爲false將保留會話狀態信息 */ int reliable; /* 將該值設置爲true意味着必須完成發佈的消息(已收到確認),才能發送另外一個消息 */ MQTTClient_willOptions* will; /* 若是程序不使用最後的意願和遺囑功能,請將此指針設置爲NULL。 */ const char* username;//用戶名 const char* password;//密碼 int connectTimeout;//容許嘗試鏈接的過期時間 int retryInterval;//嘗試重連的時間 MQTTClient_SSLOptions* ssl; /* 若是程序不使用最後的ssl,請將此指針設置爲NULL。 */ int serverURIcount; char* const* serverURIs; /* 鏈接服務器的url,以protocol:// host:port爲格式 */ int MQTTVersion; /* MQTT的版本,MQTTVERSION_3_1(3),MQTTVERSION_3_1_1 (4) */ struct { const char* serverURI; int MQTTVersion; int sessionPresent; } returned; struct { int len; const void* data; } binarypwd; } MQTTClient_connectOptions;
含義:用來設置MQTTClient的鏈接選項的結構體。
MQTTClient_message
定義:
typedef struct { char struct_id[4];//結構體的識別序列,必須爲MQTM int struct_version;//結構體的版本,必須爲0 int payloadlen;//MQTT信息的長度 void* payload;//指向消息負載的指針 int qos;//服務質量 int retained;//保留標誌 int dup;dup//標誌指示這個消息是不是重複的。 只有在收到QoS1消息時纔有意義。 若是爲true,則客戶端應用程序應採起適當的措施來處理重複的消息。 int msgid;//消息標識符一般保留供MQTT客戶端和服務器內部使用。 } MQTTClient_message;
含義:表明MQTT信息的結構體。
MQTTClient_create
定義:
DLLExport int MQTTClient_create( MQTTClient * handle, const char * serverURI, const char * clientId, int persistence_type, void * persistence_context )
做用:該函數建立了一個用於鏈接到特定服務器,使用特定持久存儲的MQTT客戶端。
參數 含義 handle 指向MQTT客戶端句柄的指針。句柄被成功從函數中返回的客戶端引用所填充 serverURI 以空結尾的字符串,其指定客戶端將鏈接到的服務器。其格式爲protocol://host:port。如今的(protocol)協議必須是tcp或ssl,而host能夠指定爲IP地址或域名。例如, 要使用默認 MQTT 端口鏈接到本地計算機上運行的服務器, 請指定爲 tcp://localhost:1883。 clientId 客戶端標識符(clientId)是一個以空結尾的 UTF-8 編碼字符串,客戶端鏈接到服務器時將它傳遞過去。 persistence_type 客戶端所使用的持久類型。MQTTCLIENT_PERSISTENCE_NONE-使用內存持久化。若是客戶端運行的設備或系統出故障或關閉, 則任何正在運行的消息的當前狀態都將丟失, 甚至在 QoS1 和 QoS2 中也可能沒法傳遞某些消息; MQTTCLIENT_PERSISTENCE_DEFAULT-使用默認的持久化機制(文件系統)。正在運行消息的狀態被保存在持久存儲中,以便在乎外出現時對消息的丟失提供一些保護; MQTTCLIENT_PERSISTENCE_USER-使用程序指定的持久化實現。使用這種類型,應用程序可對持久化機制進行控制,應用程序必須實現MQTTClient_persistence 接口。 persistence_context 若是應用程序使用的是MQTTCLIENT_PERSISTENCE_NONE持久化,該參數不使用,並且值應該設置爲NULL。對於MQTTCLIENT_PERSISTENCE_DEFAULT持久化,應該設置持久化目錄的位置(若是設置爲NULL,則使用工做目錄做爲持久化目錄)。使用MQTTCLIENT_PERSISTENCE_USER持久化,則將此參數指向有效的MQTTClient_persistence結構。
MQTTClient_setCallbacks
定義:
DLLExport int MQTTClient_setCallbacks ( MQTTClient handle, void * context, MQTTClient_connectionLost * cl, MQTTClient_messageArrived * ma, MQTTClient_deliveryComplete * dc )
做用:該函數爲特定的客戶端建立回調函數。若是您的客戶端應用程序不使用特定的回調函數,請將相關參數設置爲NULL。 調用MQTTClient_setCallbacks()使客戶端進入多線程模式。 任何須要的消息確認和狀態通訊都在後臺處理,而不須要客戶端應用程序的任何干預。
注意:在調用該函數時,MQTT客戶端必須斷開鏈接。(即先要調用該函數在鏈接客戶端)。
| 參數 | 含義 |
| ---|-------------|
| handle | 指向MQTT客戶端句柄的指針。句柄被成功從函數中返回的客戶端引用所填充 |
| context| 指向任何應用程序特定上下文的指針。 上下文指針被傳遞給每一個回調函數,以提供對回調中的上下文信息的訪問。|
|cl|指向MQTTClient_connectionLost()回調函數的指針。 若是您的應用程序不處理斷開鏈接,您能夠將其設置爲NULL。|
|ma|指向MQTTClient_messageArrived()回調函數的指針。 當您調用MQTTClient_setCallbacks()時,必須指定此回調函數。|
|dc|指向MQTTClient_deliveryComplete()回調函數的指針。 若是您的應用程序同步發佈,或者您不想檢查是否成功發送,則能夠將其設置爲NULL。|
MQTTClient_connect
定義:
DLLExport int MQTTClient_connect ( MQTTClient handle, MQTTClient_connectOptions * options )
做用:此函數嘗試使用指定的選項將先前建立的客戶端鏈接到MQTT服務器。
參數 含義 handle 指向MQTT客戶端句柄的指針。句柄被成功從函數中返回的客戶端引用所填充 options 指向有效的MQTTClient_connectOptions結構的指針。
> | 返回值 |
---|---|
0 | 鏈接成功 |
1 | 拒絕鏈接:不可接受的協議版本。 |
2 | 拒絕鏈接:標識符被拒絕。 |
3 | 拒絕鏈接:服務器不可用。 |
4 | 拒絕鏈接:用戶名或密碼錯誤。 |
5 | 拒絕鏈接:未經受權。 |
6 | 保留給將來用。 |
MQTTClient_subscribe
定義:
DLLExport int MQTTClient_subscribe ( MQTTClient handle, const char * topic, int qos )
做用:此功能嘗試將客戶訂閱到單個主題,該主題可能包含通配符。 此函數還指定服務質量。
參數 含義 handle 指向MQTT客戶端句柄的指針。句柄被成功從函數中返回的客戶端引用所填充 topic 訂閱的主題,可以使用通配符。 qos 訂閱的請求服務質量
MQTTClient_publishMessage
定義:
DLLExport int MQTTClient_publishMessage ( MQTTClient handle, const char * topicName, MQTTClient_message * msg, MQTTClient_deliveryToken * dt )
做用:此功能嘗試將客戶訂閱到單個主題,該主題可能包含通配符。 此函數還指定服務質量。
參數 含義 handle 指向MQTT客戶端句柄的指針。句柄被成功從函數中返回的客戶端引用所填充 topicName 與信息相關的主題。 msg 指向有效的 MQTTClient_message 結構的指針, 其中包含要發佈消息的有效負載和屬性 dt 指向MQTTClient_deliveryToken的指針。當函數成功返回時,dt會被賦值爲表明消息的token。若是程序中沒有使用傳遞token,將其設置爲NULL。
MQTTClient_waitForCompletion
定義:
DLLExport int MQTTClient_waitForCompletion ( MQTTClient handle, MQTTClient_deliveryToken dt, unsigned long timeout )
做用:客戶端應用程序調用此函數來將主線程的執行與消息的完成發佈同步。 被調用時,MQTTClient_waitForCompletion()阻塞執行,直到消息成功傳遞或已超過指定的時間。
參數 含義 handle 指向MQTT客戶端句柄的指針。句柄被成功從函數中返回的客戶端引用所填充 dt 表明消息的MQTTClient_deliveryToken用來檢測是否成功傳遞。傳遞token由發佈函數MQTTClient_publish () 和 MQTTClient_publishMessage ()所產生。 timeout 等待的最大毫秒數。 返回值:
消息成功傳遞則返回MQTTCLIENT_SUCCESS(0) ,若是時間已過時或檢測token時出問題,則返回錯誤碼。
對paho客戶端的講解就到此結束了,若有不明白的,能夠給我留言,一塊兒討論,一塊兒進步。