Mosquitto 是一個IBM 開源pub/sub訂閱發佈協議 MQTT 的一個單機版實現(目前也只有單機版),MQTT主打輕便,比較適用於移動設備等上面,花費流量少,解析代價低。相對於XMPP等來講,簡單許多。php
MQTT採用二進制協議,而不是XMPP的XML協議,因此通常消息甚至只須要花費2個字節的大小就能夠交換信息了,對於移動開發比較有優點。html
IBM雖然開源了其MQTT消息協議,可是卻沒有開源其RSMB服務端程序,不過還好目前有比較穩定的實現可用,本文的Mosquitto是其中比較活躍的實現之一,具體在 這裏 有目前的實現列表可供選擇。web
趁着大腦尚未進入睡眠狀態記錄一下剛纔看代碼學到的東西。我下載的版本是1.2.2版,在 這裏 能夠找到 下載連接 。redis
關於 MQTT 3.1協議 自己比較簡單,42頁的PDF介紹完了,相比XMPP那長長的文檔,謝天謝地了。因爲剛看,因此不少細節都沒有深刻進去,這裏只是記錄個大概,後續有時間慢慢補好坑吧。數組
整體來講,mosquitto實現有以下幾個特色:網絡
總之,是一個比較簡單單能夠適用於通常的服務中提供pub/sub功能支持,但若是放到大量併發的系統中,能夠優化的地方還有不少。關於mosquitto的性能,暫時沒有找到官方的評測,不過在 郵件組裏面找到的一些討論 彷佛顯示其性能上限爲20W鏈接時在線的狀態,固然具體取決於業務邏輯,交互是否不少等。不過這樣的成績仍是不錯的。一臺機器能夠起多個實例的嘛。session
mosquitto.c文件main開頭調用_mosquitto_net_init初始化SSL加密的庫,而後調用mqtt3_config_init初始化配置的各個數據結構爲默認值。配置文件的解析由mqtt3_config_parse_args牽頭完成,具體配置文件解析就很少寫了,fgets一行行的讀取配置,而後設置到config全局變量中。其中包括對於監聽地址等的讀取。數據結構
而後保存pid進程號。mqtt3_db_open打開db文件多線程
int main(int argc, char *argv[]) { memset(&int_db, 0, sizeof(struct mosquitto_db)); _mosquitto_net_init(); mqtt3_config_init(&config); rc = mqtt3_config_parse_args(&config, argc, argv);//k: init && load config file, set struct members
配置讀取完後,就能夠打開監聽端口了,使用mqtt3_socket_listen打開監聽端口,並將SOCK套接字放在局部變量listensock裏面,以便後面統一使用。併發
listener_max = -1; listensock_index = 0; for(i=0; i<config.listener_count; i++){ if(mqtt3_socket_listen(&config.listeners[i])){ _mosquitto_free(int_db.contexts); mqtt3_db_close(&int_db); if(config.pid_file){ remove(config.pid_file); } return 1; } listensock_count += config.listeners[i].sock_count; listensock = _mosquitto_realloc(listensock, sizeof(int)*listensock_count); if(!listensock){ _mosquitto_free(int_db.contexts); mqtt3_db_close(&int_db); if(config.pid_file){ remove(config.pid_file); } return 1; } for(j=0; j<config.listeners[i].sock_count; j++){ if(config.listeners[i].socks[j] == INVALID_SOCKET){ _mosquitto_free(int_db.contexts); mqtt3_db_close(&int_db); if(config.pid_file){ remove(config.pid_file); } return 1; } listensock[listensock_index] = config.listeners[i].socks[j]; if(listensock[listensock_index] > listener_max){ listener_max = listensock[listensock_index]; } listensock_index++; } }
關於mqtt3_socket_listen函數也比較經典,socket(),bind(), listen()的流程,不一樣的是使用了新版的套接字信息獲取函數getaddrinfo,該函數支持IPV4和IPV6,對應用層透明,不須要處理這些信息。
mqtt3_socket_listen(struct _mqtt3_listener *listener) { snprintf(service, 10, "%d", listener->port); memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_family = PF_UNSPEC; hints.ai_flags = AI_PASSIVE; hints.ai_socktype = SOCK_STREAM; //致使下面返回多個鏈表節的的因素可能有: //hostname參數關聯的地址有多個,那麼每一個返回一個節點;好比host爲域名的時候,nslookup返回幾個ip就有幾個 //service參數指定的服務會吃多個套接字接口類型,那麼也返回多個 if(getaddrinfo(listener->host, service, &hints, &ainfo)) return INVALID_SOCKET; listener->sock_count = 0; listener->socks = NULL; for(rp = ainfo; rp; rp = rp->ai_next){ //···· sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); if(sock == -1){ strerror_r(errno, err, 256); _mosquitto_log_printf(NULL, MOSQ_LOG_WARNING, "Warning: %s", err); continue; } listener->sock_count++; listener->socks = _mosquitto_realloc(listener->socks, sizeof(int)*listener->sock_count); if(!listener->socks){ _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); return MOSQ_ERR_NOMEM; } listener->socks[listener->sock_count-1] = sock; /* Set non-blocking */ opt = fcntl(sock, F_GETFL, 0); if(bind(sock, rp->ai_addr, rp->ai_addrlen) == -1){ strerror_r(errno, err, 256); _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: %s", err); COMPAT_CLOSE(sock); return 1; } if(listen(sock, 100) == -1){ strerror_r(errno, err, 256); _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: %s", err); COMPAT_CLOSE(sock); return 1; } } freeaddrinfo(ainfo); }
打開監聽套接字後,就能夠進入消息事件循環,標準網絡服務程序的必須過程。這個由main函數調用mosquitto_main_loop啓動。mosquitto_main_loop函數主體也是一個大循環,在循環裏面進行超時檢測,事件處理,網絡讀寫等等。因爲使用poll模型,因此就須要在進行poll()等待以前準備須要監聽的套接字數組列表pollfds,效率低的地方就在這裏。
對於監聽套接字,簡單將其加入pollfds裏面,註冊POLLIN可讀事件便可。若是對於其餘跟客戶端等的鏈接,就須要多作一步操做了。若是是橋接模式,進行相應的處理,這裏暫時不介紹橋接模式,橋接模式是爲了分佈式部署加入的非標準協議,目前只有IBM rsmb和mosquitto實現了。
對於跟客戶端的鏈接,mosquitto會在poll等待以前調用mqtt3_db_message_write嘗試發送一次未發送的數據給對方,避免沒必要要的等待可能。
int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock_count, int listener_max) { memset(pollfds, -1, sizeof(struct pollfd)*pollfd_count); pollfd_index = 0; for(i=0; i<listensock_count; i++){//註冊監聽sock的pollfd可讀事件。也就是新鏈接事件 pollfds[pollfd_index].fd = listensock[i]; pollfds[pollfd_index].events = POLLIN; pollfds[pollfd_index].revents = 0; pollfd_index++; } time_count = 0; for(i=0; i<db->context_count; i++){//遍歷每個客戶端鏈接,嘗試將其加入poll數組中 if(db->contexts[i]){ //···· /* Local bridges never time out in this fashion. */ if(!(db->contexts[i]->keepalive) || db->contexts[i]->bridge || now - db->contexts[i]->last_msg_in < (time_t)(db->contexts[i]->keepalive)*3/2){ //在進入poll等待以前,先嚐試將未發送的數據發送出去 if(mqtt3_db_message_write(db->contexts[i]) == MOSQ_ERR_SUCCESS){ pollfds[pollfd_index].fd = db->contexts[i]->sock; pollfds[pollfd_index].events = POLLIN | POLLRDHUP; pollfds[pollfd_index].revents = 0; if(db->contexts[i]->current_out_packet){ pollfds[pollfd_index].events |= POLLOUT; } db->contexts[i]->pollfd_index = pollfd_index; pollfd_index++; }else{//嘗試發送失敗,鏈接出問題了 mqtt3_context_disconnect(db, db->contexts[i]); } }else{//超過1.5倍的時間,超時關閉鏈接 if(db->config->connection_messages == true){ _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s has exceeded timeout, disconnecting.", db->co ntexts[i]->id); } /* Client has exceeded keepalive*1.5 */ mqtt3_context_disconnect(db, db->contexts[i]);//關閉鏈接,清空數據,後續還能夠用.sock=INVALID_SOCKET } }else{ #endif if(db->contexts[i]->clean_session == true){ //這個鏈接上次因爲什麼緣由,掛了,設置了clean session,因此這裏直接完全清空其結構 mqtt3_context_cleanup(db, db->contexts[i], true); db->contexts[i] = NULL; }else if(db->config->persistent_client_expiration > 0){ //協議規定persistent_client的狀態必須永久保存,這裏避免鏈接永遠沒法刪除,增長這個超時選項。 //也就是若是一個客戶端斷開鏈接一段時間了,那麼咱們會主動幹掉他 /* This is a persistent client, check to see if the * last time it connected was longer than * persistent_client_expiration seconds ago. If so, * expire it and clean up. */ if(now > db->contexts[i]->disconnect_t+db->config->persistent_client_expiration){ _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", db- >contexts[i]->id); #ifdef WITH_SYS_TREE g_clients_expired++; #endif db->contexts[i]->clean_session = true; mqtt3_context_cleanup(db, db->contexts[i], true); db->contexts[i] = NULL; } } #ifdef WITH_BRIDGE }
而後先使用mqtt3_db_message_timeout_check檢測一下超時沒有收到客戶端回包確認的消息,mosquitto對於超時的消息處理,是會進行重發的。不過理論上,TCP是不須要重發的,具體見這裏: MQTT消息推送協議應用數據包超時是否須要重發? 不過,因爲mosquitto對於客戶端斷開鏈接的處理比較弱,鏈接從新創建後,使用的相關數據結構仍是相同的,所以重發其實也能夠,只是這個時候的重發,其實是在一個鏈接上沒有收到ACK回包,而後後續創建的新鏈接上進行重傳。不是在一個鏈接上重傳。可是這樣其實也有不少弊端,好比客戶端必須支持消息的持久化記錄,不然容易雙方對不上話的狀況。
int mqtt3_db_message_timeout_check(struct mosquitto_db *db, unsigned int timeout) {//循環遍歷每個鏈接的每一個消息msg,看起是否超時,若是超時,將消息狀態改成上一個狀態,從然後續觸發重發 int i; time_t threshold; enum mosquitto_msg_state new_state = mosq_ms_invalid; struct mosquitto *context; struct mosquitto_client_msg *msg; threshold = mosquitto_time() - timeout; for(i=0; i<db->context_count; i++){//遍歷每個鏈接, context = db->contexts[i]; if(!context) continue; msg = context->msgs; while(msg){//遍歷每一個msg消息,看看其狀態,若是超時了,那麼從上一個消息開始重發.其實不須要重發http://chenzhenianqing.cn/ar ticles/977.html //固然若是這個是複用了以前斷開過的鏈接,那就須要重發。可是,這個時候其實能夠重發整個消息的。否則容易出問題,客戶端難> 度大 if(msg->timestamp < threshold && msg->state != mosq_ms_queued){ switch(msg->state){ case mosq_ms_wait_for_puback: new_state = mosq_ms_publish_qos1; break; case mosq_ms_wait_for_pubrec: new_state = mosq_ms_publish_qos2; break; case mosq_ms_wait_for_pubrel: new_state = mosq_ms_send_pubrec; break; case mosq_ms_wait_for_pubcomp: new_state = mosq_ms_resend_pubrel; break; default: break; } if(new_state != mosq_ms_invalid){ msg->timestamp = mosquitto_time();//設置當前時間,下次依據來判斷超時
超時提早檢測完成後就能夠進入poll等待了。等待完成後,對於有可讀事件的鏈接,調用loop_handle_reads_writes進行事件讀寫處理,對於監聽端口的事件,使用mqtt3_socket_accept去接受新鏈接。
loop_handle_reads_writes新事件處理函數比較簡單,主體仍是循環判斷可讀可寫事件,進行相應的處理。具體很少介紹了,須要關注的是因爲是異步讀寫,因此須要記錄上次讀寫狀態,以便下次進入上下午繼續讀取數據。可寫事件由_mosquitto_packet_write完成,可讀事件由_mosquitto_packet_read完成。
新客戶端鏈接的事件則由qtt3_socket_accept完成,其會將新鏈接放在db->contexts[i]數組的某個空位置,每次都會遍歷尋找一個空的槽位放新鏈接。這裏有個小優化其實就是用hints的機制,記錄上次的查找位置,避免屢次重複的從前面找到後面。
鏈接讀寫事件處理完成後,mosquitto會檢測是否須要從新reload部分配置文件。這個由SIGHUP的信號觸發。
限於篇幅,具體的邏輯請求處理下次再介紹了。
mosquitto是一個簡單可依賴的開源MQTT實現,能支持10W左右的同時在線(未親測),單機版本,但經過bridge橋接模式支持部分分佈式,但有限;協議自己很是適合在移動設備上使用,耗電少,處理快,屬於header上帶有消息體長度的協議,這個在異步網絡事件代碼編寫時是碼農最愛的,哈哈。
對於後續的提升優化的地方,簡單記錄幾點:
參考: