MQTT C Client實現消息推送(入門指南)【轉】

MQTT C Client實現消息推送(入門指南)

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是IBM開發的一個即時通信協議,經過MQTT協議,目前已經擴展出了數十個MQTT服務器端程序,能夠經過PHP,JAVA,Python,C,C#等系統語言來向MQTT發送相關消息。隨着移動互聯網的發展,MQTT因爲開放源代碼,耗電量小等特色,將會在移動消息推送領域會有更多的貢獻,在物聯網領域,傳感器與服務器的通訊,信息的收集,MQTT均可以做爲考慮的方案之一。在將來MQTT會進入到咱們生活的各各方面。The Paho MQTT C Client is a fully fledged MQTT client written in ANSI standard C. It avoids C++ in order to be as portable as possible. A C++ layer over this library is also available in Paho.git


目錄:github

 

 

何爲MQTT?

MQTT主要用於服務端對客戶端進行消息推送,根據這個具體要求,很容易知道它包括兩個部分:客戶端、服務端。編程

MQTT消息推送是基於主題topic模式的,能夠分開來講:服務器

  • 客戶端發佈一條消息時,必須指定消息主題。(如,topic=」天氣」,payload=」北京今天霧霾好大啊~~嗚嗚」),其中topic就是主題,payload是發送的具體內容。
  • 服務端推送消息,也是基於主題的。當服務器發現有主題(如,topic=「天氣」)時,就會給全部訂閱該主題的客戶端推送payload內容。 
    • 這裏須要個前提,就是有客戶端訂閱topic=」天氣」這個主題;
    • 一旦客戶端訂閱該主題,服務端就會每收到該主題的消息,都會推送給訂閱該主題的客戶端。若是客戶端不須要關注該主題了,也就是說不想接受到這樣的推送消息了,只要取消otpic=」天氣」的主題訂閱便可。

MQTT協議是爲大量計算能力有限,且工做在低帶寬、不可靠的網絡的遠程傳感器和控制設備通信而設計的協議,它具備如下主要的幾項特性:markdown

  1. 使用發佈/訂閱消息模式,提供一對多的消息發佈,解除應用程序耦合;
  2. 對負載內容屏蔽的消息傳輸;
  3. 使用 TCP/IP 提供網絡鏈接;
  4. 有三種消息發佈服務質量: 
    • 「至多一次」,消息發佈徹底依賴底層 TCP/IP 網絡。會發生消息丟失或重複。這一級別可用於以下狀況,環境傳感器數據,丟失一次讀記錄無所謂,由於不久後還會有第二次發送。
    • 「至少一次」,確保消息到達,但消息重複可能會發生。
    • 「只有一次」,確保消息到達一次。這一級別可用於以下狀況,在計費系統中,消息重複或丟失會致使不正確的結果。(在實際編程中,只須要設置QoS值便可實現以上幾種不一樣消息發佈服務質量模式)
  5. 小型傳輸,開銷很小(固定長度的頭部是 2 字節),協議交換最小化,以下降網絡流量;
  6. 使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制;

生成dll庫?混合編程?

在開始開發以前須要作一些準備工做,MQTT已經把全部的APIs封裝好了,咱們可使用它的dll庫,也能夠直接導入源碼進行混合編程,通常要求不高的話(由於不太懂得話,最好不要修改源碼)能夠直接將源碼生成dll,而後使用便可,下文就是使用該方式:網絡

git clone https://github.com/eclipse/paho.mqtt.c.gitsession

從這裏得到C Client源碼以後,能夠直接使用VS打開(我是VS2013):eclipse

這裏寫圖片描述

對於上圖的說明,下載源碼後,打開將是以上界面,包括十來個工程,這裏講解幾個:異步

  • paho-mqtt3a : 通常實際開發中就是使用這個,a表示的是異步消息推送(asynchronous)。
  • paho-mqtt3as : as表示的是 異步+加密(asynchronous+OpenSSL)。
  • paho-mqtt3c : c 表示的應該是同步(Synchronize),通常性能較差,是發送+等待模式。
  • paho-mqtt3cs : 同上,增長了一個OpenSSL而已。

這裏根據自身的須要選擇不一樣的項目生成DLL便可,右擊單個項目->生成。因爲你電腦中可能沒有OPenSSL環境,若是點擊VS工具欄中的生成解決方案,十有八九會失敗,由於它會生成全部項目的解決方案,其實你根本用不着這麼多。async

另外,上圖中沒法打開包括文件VersionInfo.h,你只須要在src文件夾中找到VersionInfo.h.in文件,去掉.in後綴->從新生成便可。

MQTT C Client實戰

瞭解更多能夠閱讀《MQTT C Client for Posix and Windows》一文,下面根據官網資料,摘錄了幾個C語言實現MQTT的小DEMO。

MQTT使用起來也十分容易,基本上就那四五個函數:MQTTClient_create(建立客戶端)、MQTTClient_connect(鏈接服務端)、MQTTClient_publishMessage(客戶端->服務端發送消息)、MQTTClient_subscribe(客戶端訂閱某個主題)等等。其中,不少異步回調函數,須要本身去實現,如,

MQTTAsync_setCallbacks(mqtt->_client, mqtt->_client, connlost, msgarrvd, NULL);

MQTTAsync_setCallbacks中,

  • connlost函數指針,是當MQTT意外斷開連接時會回調的函數,由本身實現;
  • msgarrvd函數指針,是當服務器有消息推送回來時,客戶端在此處接受服務端消息內容。

另外,就是一些函數執行是否成功的回調函數,C語言封裝回調以後,就是這麼寫法,看起來有些變扭。有興趣的能夠看《淺談C/C++回調函數(Callback)& 函數指針》文章,再瞭解如下回調函數。

mqtt->_conn_opts.onSuccess = onConnect; mqtt->_conn_opts.onFailure = onConnectFailure;

 

最後,不得不說的就是,MQTT有些發送或者是訂閱的內容時(某些函數中),在編程最好將參數中傳進來的值在內存中拷貝一份再操做,筆者當時開發時,就是由於這樣的問題,折騰了較長時間,後來在wireshark中發現數據包中根本沒有內容,才知道是因爲函數參數是指針形式,直接在異步中使用可能會發生一些未知的錯誤。

Synchronous publication example

#include "stdio.h" #include "stdlib.h" #include "string.h" #include "MQTTClient.h" #define ADDRESS "tcp://localhost:1883" #define CLIENTID "ExampleClientPub" #define TOPIC "MQTT Examples" #define PAYLOAD "Hello World!" #define QOS 1 #define TIMEOUT 10000L int main(int argc, char* argv[]) { MQTTClient client; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_deliveryToken token; int rc; MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE); } pubmsg.payload = PAYLOAD; pubmsg.payloadlen = strlen(PAYLOAD); pubmsg.qos = QOS; pubmsg.retained = 0; MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token); printf("Waiting for up to %d seconds for publication of %s\n" "on topic %s for client with ClientID: %s\n", (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID); rc = MQTTClient_waitForCompletion(client, token, TIMEOUT); printf("Message with delivery token %d delivered\n", token); MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); return rc; }

 

 

Asynchronous publication example

#include "stdio.h" #include "stdlib.h" #include "string.h" #include "MQTTClient.h" #define ADDRESS "tcp://localhost:1883" #define CLIENTID "ExampleClientPub" #define TOPIC "MQTT Examples" #define PAYLOAD "Hello World!" #define QOS 1 #define TIMEOUT 10000L volatile MQTTClient_deliveryToken deliveredtoken; void delivered(void *context, MQTTClient_deliveryToken dt) { printf("Message with token value %d delivery confirmed\n", dt); deliveredtoken = dt; } int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) { int i; char* payloadptr; printf("Message arrived\n"); printf(" topic: %s\n", topicName); printf(" message: "); payloadptr = message->payload; for(i=0; i<message->payloadlen; i++) { putchar(*payloadptr++); } putchar('\n'); MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; } void connlost(void *context, char *cause) { printf("\nConnection lost\n"); printf(" cause: %s\n", cause); } int main(int argc, char* argv[]) { MQTTClient client; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_deliveryToken token; int rc; MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered); if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE); } pubmsg.payload = PAYLOAD; pubmsg.payloadlen = strlen(PAYLOAD); pubmsg.qos = QOS; pubmsg.retained = 0; deliveredtoken = 0; MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token); printf("Waiting for publication of %s\n" "on topic %s for client with ClientID: %s\n", PAYLOAD, TOPIC, CLIENTID); while(deliveredtoken != token); MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); return rc; }

 

 

Asynchronous subscription example

#include "stdio.h" #include "stdlib.h" #include "string.h" #include "MQTTClient.h" #define ADDRESS "tcp://localhost:1883" #define CLIENTID "ExampleClientSub" #define TOPIC "MQTT Examples" #define PAYLOAD "Hello World!" #define QOS 1 #define TIMEOUT 10000L volatile MQTTClient_deliveryToken deliveredtoken; void delivered(void *context, MQTTClient_deliveryToken dt) { printf("Message with token value %d delivery confirmed\n", dt); deliveredtoken = dt; } int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) { int i; char* payloadptr; printf("Message arrived\n"); printf(" topic: %s\n", topicName); printf(" message: "); payloadptr = message->payload; for(i=0; i<message->payloadlen; i++) { putchar(*payloadptr++); } putchar('\n'); MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; } void connlost(void *context, char *cause) { printf("\nConnection lost\n"); printf(" cause: %s\n", cause); } int main(int argc, char* argv[]) { MQTTClient client; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; int rc; int ch; MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered); if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE); } printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n" "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS); MQTTClient_subscribe(client, TOPIC, QOS); do { ch = getchar(); } while(ch!='Q' && ch != 'q'); MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); return rc; }
相關文章
相關標籤/搜索