在對MQTT的學習過程當中 一下的內容對我提供了幫助
https://www.runoob.com/w3cnote/mqtt-intro.html 對MQTT的入門級介紹 很基礎講解了什麼是MQTT
https://mosquitto.org/api/files/mosquitto-h.html 這個網站記載了幾乎全部的mosquitto的接口 你想知道的函數接口他都有 函數簡介可能看不出來什麼 點進去看一下詳細解釋仍是很清晰的html
如下的圖片可以更加直觀地說明MQTT協議的通訊方式api
根據這兩張圖片談一下我對MQTT的瞭解:
在MQTT協議中 運行了broker的纔算是真正的服務器 它掌控着全部消息的publish和subscribe 而他的客戶端程序能夠有一下三重狀況
1.運行了publisher的程序 作着publish的工做
2.運行了subscriber的程序 作着subscribe的工做
3.運行了subscriber和publisher的程序 既能夠publish也能夠subscribe 這並不意味着這樣的程序就是broker
這三種狀況你能夠根據本身實際的須要來肯定本身的客戶端能夠是哪種
例如你的客戶端僅僅是須要收到來自broker的topic爲"ycy"的消息而後解析處理 並不須要作任何的publish工做 那麼你就能夠選擇第2種客戶端服務器
個人程序的功能是從雲端獲得固定topic的消息以後就開始處理這個消息 處理完成後publish出去 因爲我在Linux機上 我必須作一個仿broker來實現從雲端獲得消息這一步網絡
首先個人預想是這樣的
session
可是這樣就會致使我每次publish出去數據 我本身又會收到一份 這樣的話我又會再次掃描這個數據 以肯定是否是須要解析處理 萬一的狀況剛好是約定的須要解析的字符(這就誤會大了)
因此 在我這裏沒有使用同一個topic 其實還有解決方法二(下文再說)less
解決方法二:
就是在my_message_callback這個函數中過濾一下 將本身publish的topic過濾一下(在我下面的代碼中並無實現 只是提供這樣的思路 strcmp函數能夠幫到不少 這個方法我另外試了一下 必定要注意多了一個'\n'字符 這是由於你輸入以後會按下回車發送 因此發送的字符中會帶有這個字符)函數
mqtt_server.coop
#include <stdio.h> #include <stdlib.h> #include <mosquitto.h> #include <string.h> #define HOST "localhost" #define PORT 1883 #define KEEP_ALIVE 60 #define MSG_MAX_SIZE 512 #define TOPIC_NUM 3 bool session = true; const static char* topic[TOPIC_NUM] = { "Gai爺:", "ycy ", "CCYY " }; void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) { if(message->payloadlen){ printf("%s %s", message->topic, (char *)message->payload); }else{ printf("%s (null)\n", message->topic); } fflush(stdout); } void my_connect_callback(struct mosquitto *mosq, void *userdata, int result) { int i; if(!result){ /* Subscribe to broker information topics on successful connect. */ mosquitto_subscribe(mosq, NULL, "CCYY ", 2); }else{ fprintf(stderr, "Connect failed\n"); } } void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos) { int i; printf("Subscribed (mid: %d): %d", mid, granted_qos[0]); for(i=1; i<qos_count; i++){ printf(", %d", granted_qos[i]); } printf("\n"); } void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str) { /* Pring all log messages regardless of level. */ printf("%s\n", str); } int main() { struct mosquitto *mosq = NULL; char buff[MSG_MAX_SIZE]; //libmosquitto 庫初始化 mosquitto_lib_init(); //建立mosquitto客戶端 mosq = mosquitto_new(NULL,session,NULL); if(!mosq){ printf("create client failed..\n"); mosquitto_lib_cleanup(); return 1; } //設置回調函數,須要時可以使用 mosquitto_log_callback_set(mosq, my_log_callback); mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); //鏈接服務器 if(mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)){ fprintf(stderr, "Unable to connect.\n"); return 1; } //開啓一個線程,在線程裏不停的調用 mosquitto_loop() 來處理網絡信息 int loop = mosquitto_loop_start(mosq); if(loop != MOSQ_ERR_SUCCESS) { printf("mosquitto loop error\n"); return 1; } while(fgets(buff, MSG_MAX_SIZE, stdin) != NULL) { /*發佈消息*/ mosquitto_publish(mosq,NULL,"ycy ",strlen(buff)+1,buff,0,0); memset(buff,0,sizeof(buff)); } mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return 0; }
mqtt_client.c學習
#include <stdio.h> #include <stdlib.h> #include <mosquitto.h> #include <string.h> #define HOST "localhost" #define PORT 1883 #define KEEP_ALIVE 60 #define MSG_MAX_SIZE 512 #define TOPIC_NUM 3 bool session = true; const static char* topic[TOPIC_NUM] = { "Gai爺:", "ycy ", "CCYY " }; void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) { if(message->payloadlen){ printf("%s %s", message->topic, (char *)message->payload); }else{ printf("%s (null)\n", message->topic); } fflush(stdout); } void my_connect_callback(struct mosquitto *mosq, void *userdata, int result) { int i; if(!result){ /* Subscribe to broker information topics on successful connect. */ mosquitto_subscribe(mosq, NULL, "ycy ", 2); }else{ fprintf(stderr, "Connect failed\n"); } } void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos) { int i; printf("Subscribed (mid: %d): %d", mid, granted_qos[0]); for(i=1; i<qos_count; i++){ printf(", %d", granted_qos[i]); } printf("\n"); } void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str) { /* Pring all log messages regardless of level. */ printf("%s\n", str); } int main() { struct mosquitto *mosq = NULL; char buff[MSG_MAX_SIZE]; //libmosquitto 庫初始化 mosquitto_lib_init(); //建立mosquitto客戶端 mosq = mosquitto_new(NULL,session,NULL); if(!mosq){ printf("create client failed..\n"); mosquitto_lib_cleanup(); return 1; } //設置回調函數,須要時可以使用 mosquitto_log_callback_set(mosq, my_log_callback); mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); //鏈接服務器 if(mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)){ fprintf(stderr, "Unable to connect.\n"); return 1; } //開啓一個線程,在線程裏不停的調用 mosquitto_loop() 來處理網絡信息 int loop = mosquitto_loop_start(mosq); if(loop != MOSQ_ERR_SUCCESS) { printf("mosquitto loop error\n"); return 1; } while(fgets(buff, MSG_MAX_SIZE, stdin) != NULL) { /*發佈消息*/ mosquitto_publish(mosq,NULL,"CCYY ",strlen(buff)+1,buff,0,0); memset(buff,0,sizeof(buff)); } mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return 0; }
makefile網站
#不是系統默認庫 要記得添加鏈接選項 all:Client @echo "" @echo "Start compiling......" @echo "" Client:Server gcc -o Client mqtt_client.c -lmosquitto -lpthread Server: gcc -o Server mqtt_server.c -lmosquitto -lpthread clean: -rm Server Client
程序運行截圖以下:
Server
Client
以上