Source Code Analysis of the Open-Source MQTT Broker Mosquitto (Access Control)

1. Overall Flow Overview

After downloading the source from GitHub, the broker code lives in src, and it also uses some functions from the lib library. Having a rough understanding of how the project works is the basis for analyzing mosquitto's access control. There are already plenty of Chinese blog posts introducing it, for example 逍遙子's; although fairly old, the meanings of the main structures have not changed. Understanding those structures first is a great help when digging further into the source: for example, struct mosquitto represents a client, and mosquitto_db represents the repository inside the broker that stores everything.
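
For orientation, here is an abridged sketch of those two structures, trimmed to the fields that actually show up in the main loop quoted below. The field list and comments are paraphrased from the 1.5.x sources; see mosquitto_broker_internal.h and mosquitto_internal.h for the real definitions.

/* Abridged sketch (mosquitto 1.5.x); many fields omitted. */
struct mosquitto {                         /* one client, connected or persisted        */
    mosq_sock_t sock;                      /* socket fd, INVALID_SOCKET when offline    */
    char *id;                              /* MQTT client id                            */
    uint16_t keepalive;
    enum mosquitto_client_state state;     /* e.g. mosq_cs_connect_pending              */
    time_t last_msg_in;
    struct mosquitto__packet *current_out_packet;
    UT_hash_handle hh_id;                  /* uthash handle: lookup by client id        */
    UT_hash_handle hh_sock;                /* uthash handle: lookup by socket           */
    /* ... inflight/queued message lists, bridge state, TLS state, ...                  */
};

struct mosquitto_db {                      /* the broker's in-memory "database"         */
    struct mosquitto__config *config;
    struct mosquitto *contexts_by_id;      /* hash table of all known clients           */
    struct mosquitto *contexts_by_sock;    /* hash table of currently connected clients */
    struct mosquitto__subhier *subs;       /* root of the subscription tree             */
    struct mosquitto_msg_store *msg_store; /* stored messages, incl. retained ones      */
    /* ...                                                                               */
};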

Since the broker is written in C, start by finding the main function; the server starts from main in /src/mosquitto.c. Note that while reading you will run into many macro definitions (such as WIN32); pick the platform you are familiar with and fold the other branches away to avoid confusion. After main initializes the subscription tree and loads the security configuration, it enters the mosquitto_main_loop main loop; that function first sets up the epoll mechanism to listen for socket reads and then enters the real core loop, while(run){}, which is where the server's actual logic begins.
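
The startup path, condensed into a call-graph outline (function names are those of the 1.5.x tree; arguments and error handling are omitted, so treat this as an outline rather than compilable code):

/* Startup path in src/mosquitto.c (1.5.x), heavily condensed:
 *
 *   main()
 *     -> config__init() / config__parse_args()     read mosquitto.conf and the command line
 *     -> db__open()                                 create mosquitto_db, the subscription-tree
 *                                                   root, restore persisted state
 *     -> mosquitto_security_module_init()
 *        mosquitto_security_init()                  load the auth plugin / password and ACL files
 *     -> net__socket_listen()                       one listening socket per configured listener
 *     -> mosquitto_main_loop()                      the while(run) loop quoted below
 */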

The top-to-bottom flow can be summarized as follows:

  1. Check for and free disused client context structures;
  2. Then iterate over the clients (i.e. their contexts) via the hash table, send the packets queued in each context, and disconnect the ones that have timed out.
  3. Handle socket events in a loop via epoll_wait: net__socket_accept accepts a client connection and at the same time creates that client's context; loop_handle_reads_writes sends or receives packets according to the read/write events.
  4. packet__read reads the complete packet contents and then calls handle__packet, which dispatches on the packet's protocol type. Note in particular that the broker uses the functions in read_handle.c under the src folder, not the ones in lib used by the client library; automatic code navigation will sometimes jump to the wrong one. The switch/case inside handle__packet makes it easy to follow each packet type in more detail.

   

    while(run){//enter the main (endless) loop
        context__free_disused(db);
#ifdef WITH_SYS_TREE
        if(db->config->sys_interval > 0){
            sys_tree__update(db, db->config->sys_interval, start_time);
        }
#endif

#ifndef WITH_EPOLL
        memset(pollfds, -1, sizeof(struct pollfd)*pollfd_max);

        pollfd_index = 0;
        for(i=0; i<listensock_count; i++){
            pollfds[pollfd_index].fd = listensock[i];
            pollfds[pollfd_index].events = POLLIN;
            pollfds[pollfd_index].revents = 0;
            pollfd_index++;
        }
#endif

        now_time = time(NULL);

        time_count = 0;
        HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){//iterate over the hash table of connected clients
            if(time_count > 0){
                time_count--;
            }else{
                time_count = 1000;
                now = mosquitto_time();
            }
            context->pollfd_index = -1;

            if(context->sock != INVALID_SOCKET){
#ifdef WITH_BRIDGE
                if(context->bridge){
                    mosquitto__check_keepalive(db, context);
                    if(context->bridge->round_robin == false
                            && context->bridge->cur_address != 0
                            && context->bridge->primary_retry
                            && now > context->bridge->primary_retry){

                        if(context->bridge->primary_retry_sock == INVALID_SOCKET){
                            rc = net__try_connect(context, context->bridge->addresses[0].address,
                                    context->bridge->addresses[0].port,
                                    &context->bridge->primary_retry_sock, NULL, false);

                            if(rc == 0){
                                COMPAT_CLOSE(context->bridge->primary_retry_sock);
                                context->bridge->primary_retry_sock = INVALID_SOCKET;
                                context->bridge->primary_retry = 0;
                                net__socket_close(db, context);
                                context->bridge->cur_address = 0;
                            }
                        }else{
                            len = sizeof(int);
                            if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
                                if(err == 0){
                                    COMPAT_CLOSE(context->bridge->primary_retry_sock);
                                    context->bridge->primary_retry_sock = INVALID_SOCKET;
                                    context->bridge->primary_retry = 0;
                                    net__socket_close(db, context);
                                    context->bridge->cur_address = context->bridge->address_count-1;
                                }else{
                                    COMPAT_CLOSE(context->bridge->primary_retry_sock);
                                    context->bridge->primary_retry_sock = INVALID_SOCKET;
                                    context->bridge->primary_retry = now+5;
                                }
                            }else{
                                COMPAT_CLOSE(context->bridge->primary_retry_sock);
                                context->bridge->primary_retry_sock = INVALID_SOCKET;
                                context->bridge->primary_retry = now+5;
                            }
                        }
                    }
                }
#endif

                /* Local bridges never time out in this fashion. */
                if(!(context->keepalive)
                        || context->bridge
                        || now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){
                    //the client is considered online: send it the inflight packets queued for it
                    if(db__message_write(db, context) == MOSQ_ERR_SUCCESS){
#ifdef WITH_EPOLL
                        if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
                            if(!(context->events & EPOLLOUT)) {
                                ev.data.fd = context->sock;
                                ev.events = EPOLLIN | EPOLLOUT;
                                if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
                                    if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
                                            log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLOUT: %s", strerror(errno));
                                    }
                                }
                                context->events = EPOLLIN | EPOLLOUT;
                            }
                            context->ws_want_write = false;
                        }
                        else{
                            if(context->events & EPOLLOUT) {
                                ev.data.fd = context->sock;
                                ev.events = EPOLLIN;
                                if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
                                    if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
                                            log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLIN: %s", strerror(errno));
                                    }
                                }
                                context->events = EPOLLIN;
                            }
                        }
#else
                        pollfds[pollfd_index].fd = context->sock;
                        pollfds[pollfd_index].events = POLLIN;
                        pollfds[pollfd_index].revents = 0;
                        if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
                            pollfds[pollfd_index].events |= POLLOUT;
                            context->ws_want_write = false;
                        }
                        context->pollfd_index = pollfd_index;
                        pollfd_index++;
#endif
                    }else{
                        do_disconnect(db, context);
                    }
                }else{//the client has timed out
                    if(db->config->connection_messages == true){
                        if(context->id){
                            id = context->id;
                        }else{
                            id = "<unknown>";
                        }
                        log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s has exceeded timeout, disconnecting.", id);
                    }
                    /* Client has exceeded keepalive*1.5 */
                    do_disconnect(db, context);
                }
            }
        }

#ifdef WITH_BRIDGE
        time_count = 0;
        for(i=0; i<db->bridge_count; i++){
            if(!db->bridges[i]) continue;

            context = db->bridges[i];

            if(context->sock == INVALID_SOCKET){
                if(time_count > 0){
                    time_count--;
                }else{
                    time_count = 1000;
                    now = mosquitto_time();
                }
                /* Want to try to restart the bridge connection */
                if(!context->bridge->restart_t){
                    context->bridge->restart_t = now+context->bridge->restart_timeout;
                    context->bridge->cur_address++;
                    if(context->bridge->cur_address == context->bridge->address_count){
                        context->bridge->cur_address = 0;
                    }
                }else{
                    if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect)
                            || (context->bridge->start_type == bst_automatic && now > context->bridge->restart_t)){

#if defined(__GLIBC__) && defined(WITH_ADNS)
                        if(context->adns){
                            /* Connection attempted, waiting on DNS lookup */
                            rc = gai_error(context->adns);
                            if(rc == EAI_INPROGRESS){
                                /* Just keep on waiting */
                            }else if(rc == 0){
                                rc = bridge__connect_step2(db, context);
                                if(rc == MOSQ_ERR_SUCCESS){
#ifdef WITH_EPOLL
                                    ev.data.fd = context->sock;
                                    ev.events = EPOLLIN;
                                    if(context->current_out_packet){
                                        ev.events |= EPOLLOUT;
                                    }
                                    if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
                                        if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
                                                log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno));
                                        }
                                    }else{
                                        context->events = ev.events;
                                    }
#else
                                    pollfds[pollfd_index].fd = context->sock;
                                    pollfds[pollfd_index].events = POLLIN;
                                    pollfds[pollfd_index].revents = 0;
                                    if(context->current_out_packet){
                                        pollfds[pollfd_index].events |= POLLOUT;
                                    }
                                    context->pollfd_index = pollfd_index;
                                    pollfd_index++;
#endif
                                }else if(rc == MOSQ_ERR_CONN_PENDING){
                                    context->bridge->restart_t = 0;
                                }else{
                                    context->bridge->cur_address++;
                                    if(context->bridge->cur_address == context->bridge->address_count){
                                        context->bridge->cur_address = 0;
                                    }
                                    context->bridge->restart_t = 0;
                                }
                            }else{
                                /* Need to retry */
                                if(context->adns->ar_result){
                                    freeaddrinfo(context->adns->ar_result);
                                }
                                mosquitto__free(context->adns);
                                context->adns = NULL;
                                context->bridge->restart_t = 0;
                            }
                        }else{
                            rc = bridge__connect_step1(db, context);
                            if(rc){
                                context->bridge->cur_address++;
                                if(context->bridge->cur_address == context->bridge->address_count){
                                    context->bridge->cur_address = 0;
                                }
                            }else{
                                /* Short wait for ADNS lookup */
                                context->bridge->restart_t = 1;
                            }
                        }
#else
                        {
                            rc = bridge__connect(db, context);
                            context->bridge->restart_t = 0;
                            if(rc == MOSQ_ERR_SUCCESS){
                                if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
                                    context->bridge->primary_retry = now + 5;
                                }
#ifdef WITH_EPOLL
                                ev.data.fd = context->sock;
                                ev.events = EPOLLIN;
                                if(context->current_out_packet){
                                    ev.events |= EPOLLOUT;
                                }
                                if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
                                    if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
                                            log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno));
                                    }
                                }else{
                                    context->events = ev.events;
                                }
#else
                                pollfds[pollfd_index].fd = context->sock;
                                pollfds[pollfd_index].events = POLLIN;
                                pollfds[pollfd_index].revents = 0;
                                if(context->current_out_packet){
                                    pollfds[pollfd_index].events |= POLLOUT;
                                }
                                context->pollfd_index = pollfd_index;
                                pollfd_index++;
#endif
                            }else{
                                context->bridge->cur_address++;
                                if(context->bridge->cur_address == context->bridge->address_count){
                                    context->bridge->cur_address = 0;
                                }
                            }
                        }
#endif
                    }
                }
            }
        }
#endif
        now_time = time(NULL);
        if(db->config->persistent_client_expiration > 0 && now_time > expiration_check_time){
            HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){
                if(context->sock == INVALID_SOCKET && context->clean_session == 0){
                    /* This is a persistent client, check to see if the
                     * last time it connected was longer than
                     * persistent_client_expiration seconds ago. If so,
                     * expire it and clean up.
                     */
                    if(now_time > context->disconnect_t+db->config->persistent_client_expiration){
                        if(context->id){
                            id = context->id;
                        }else{
                            id = "<unknown>";
                        }
                        log__printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", id);
                        G_CLIENTS_EXPIRED_INC();
                        context->clean_session = true;
                        context->state = mosq_cs_expiring;
                        do_disconnect(db, context);
                    }
                }
            }
            expiration_check_time = time(NULL) + 3600;
        }

#ifndef WIN32
        sigprocmask(SIG_SETMASK, &sigblock, &origsig);
#ifdef WITH_EPOLL
        //wait for socket events
        fdcount = epoll_wait(db->epollfd, events, MAX_EVENTS, 100);
#else
        fdcount = poll(pollfds, pollfd_index, 100);
#endif
        sigprocmask(SIG_SETMASK, &origsig, NULL);
#else
        fdcount = WSAPoll(pollfds, pollfd_index, 100);
#endif
#ifdef WITH_EPOLL
        switch(fdcount){
        case -1:
            if(errno != EINTR){
                log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll waiting: %s.", strerror(errno));
            }
            break;
        case 0:
            break;
        default:
            //handle the socket events one by one
            for(i=0; i<fdcount; i++){
                for(j=0; j<listensock_count; j++){
                    if (events[i].data.fd == listensock[j]) {
                        if (events[i].events & (EPOLLIN | EPOLLPRI)){
                            //accept the client connection; net__socket_accept also creates the client's context
                            while((ev.data.fd = net__socket_accept(db, listensock[j])) != -1){
                                ev.events = EPOLLIN;
                                if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1) {
                                    log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: %s", strerror(errno));
                                }
                                context = NULL;
                                HASH_FIND(hh_sock, db->contexts_by_sock, &(ev.data.fd), sizeof(mosq_sock_t), context);
                                if(!context) {
                                    log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: no context");
                                }
                                context->events = EPOLLIN;
                            }
                        }
                        break;
                    }
                }
                if (j == listensock_count) {
                    loop_handle_reads_writes(db, events[i].data.fd, events[i].events);
                }
            }
        }
#else
        if(fdcount == -1){
            log__printf(NULL, MOSQ_LOG_ERR, "Error in poll: %s.", strerror(errno));
        }else{
            loop_handle_reads_writes(db, pollfds);

            for(i=0; i<listensock_count; i++){
                if(pollfds[i].revents & (POLLIN | POLLPRI)){
                    while(net__socket_accept(db, listensock[i]) != -1){
                    }
                }
            }
        }
#endif
#ifdef WITH_PERSISTENCE
        if(db->config->persistence && db->config->autosave_interval){
            if(db->config->autosave_on_changes){
                if(db->persistence_changes >= db->config->autosave_interval){
                    persist__backup(db, false);
                    db->persistence_changes = 0;
                }
            }else{
                if(last_backup + db->config->autosave_interval < mosquitto_time()){
                    persist__backup(db, false);
                    last_backup = mosquitto_time();
                }
            }
        }
#endif

#ifdef WITH_PERSISTENCE
        if(flag_db_backup){
            persist__backup(db, false);
            flag_db_backup = false;
        }
#endif
        if(flag_reload){
            log__printf(NULL, MOSQ_LOG_INFO, "Reloading config.");
            config__read(db, db->config, true);
            mosquitto_security_cleanup(db, true);
            mosquitto_security_init(db, true);
            mosquitto_security_apply(db);
            log__close(db->config);
            log__init(db->config);
            flag_reload = false;
        }
        if(flag_tree_print){
            sub__tree_print(db->subs, 0);
            flag_tree_print = false;
        }
#ifdef WITH_WEBSOCKETS
        for(i=0; i<db->config->listener_count; i++){
            /* Extremely hacky, should be using the lws provided external poll
             * interface, but their interface has changed recently and ours
             * will soon, so for now websockets clients are second class
             * citizens. */
            if(db->config->listeners[i].ws_context){
                libwebsocket_service(db->config->listeners[i].ws_context, 0);
            }
        }
        if(db->config->have_websockets_listener){
            temp__expire_websockets_clients(db);
        }
#endif
    }//end while(run)

 

2. Mosquitto's Native Permission Features

The only place these permission macros are documented is mosquitto_plugin.h:

/*
 * Function: mosquitto_auth_acl_check
 *
 * Called by the broker when topic access must be checked. access will be one
 * of:
 *  MOSQ_ACL_SUBSCRIBE when a client is asking to subscribe to a topic string.
 *                     This differs from MOSQ_ACL_READ in that it allows you to
 *                     deny access to topic strings rather than by pattern. For
 *                     example, you may use MOSQ_ACL_SUBSCRIBE to deny
 *                     subscriptions to '#', but allow all topics in
 *                     MOSQ_ACL_READ. This allows clients to subscribe to any
 *                     topic they want, but not discover what topics are in use
 *                     on the server.
 *  MOSQ_ACL_READ      when a message is about to be sent to a client (i.e. whether
 *                     it can read that topic or not).
 *  MOSQ_ACL_WRITE     when a message has been received from a client (i.e. whether
 *                     it can write to that topic or not).
 *

The explanation that follows in the header describes at which points the implementation must perform these checks. The function that performs the check is

int mosquitto_acl_check(struct mosquitto_db *db, struct mosquitto *context, const char *topic, long payloadlen, void* payload, int qos, bool retain, int access)

Here context is the client being checked; topic, payload, retain, etc. are attributes of the current message; and access is the specific permission to check. From the design of this function's parameters you can guess that the check is driven by the client's context, i.e. by a client event (otherwise, how would the broker know which context to pass in? Normally it is whichever context the action belongs to). But can every message be traced back to a corresponding client context? Read on.

  • The WRITE permission is checked when the broker receives a message from a client (see the sketch after this list). Note in particular that the last-will message is stored in the client's context, so the broker only sends the last will, based on that context, when do_disconnect is called. By this definition, however, retained messages clearly fall outside this permission's reach: the broker may have stored such a message long ago, and the publishing client's context may have been cleaned up long since. That said, the mosquitto project has since added the ability to restrict retain at publish time; the discussion can be found here. There are also many mailing-list threads on the permission design (on the proposal of the subscribe permission: 1 2; apparently the author felt that checking at message-delivery time was simple to implement because wildcards need not be considered, and revocation was not considered at the time; the SUBSCRIBE permission was added later to block wildcard subscriptions and also to improve efficiency). Watching these first-rate developers discuss the design is instructive in itself: you can see directly how the project's permission model was built up step by step, and why. You can even find the author of a research paper discussing with the maintainer how to implement their scheme on top of mosquitto.
  • The SUBSCRIBE permission is checked when a client subscribes; the difference is that it can be used to deny subscribing to #. Evidently the author did not consider that relying on this permission alone makes dynamic revocation problematic.
  • The READ permission is checked when a message is about to be placed into a client context's send queue, including retained messages at subscribe time, i.e. whenever a message is about to be delivered. The nice property of this design is that it lets an administrator update the policy dynamically and revoke a client's permission to receive messages on a given topic.
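
As a concrete anchor for the WRITE case, the check site in handle_publish.c looks roughly like this (paraphrased from the 1.5.x source; the surrounding topic/payload parsing and logging are omitted, and the variable names are those of the enclosing handler):

/* src/handle_publish.c, inside handle__publish(), after the PUBLISH is parsed */
rc = mosquitto_acl_check(db, context, topic, payloadlen, payload,
                         qos, retain, MOSQ_ACL_WRITE);
if(rc == MOSQ_ERR_ACL_DENIED){
    /* denied: the message will not be stored or routed */
}else if(rc != MOSQ_ERR_SUCCESS){
    return rc;
}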

To see exactly where the author checks which permission, do a global search for the places where this function is called.
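
To make the access values quoted above from mosquitto_plugin.h more tangible, here is a minimal sketch of an auth plugin's ACL callback. It denies subscriptions to the bare '#' wildcard while allowing everything else, which is exactly the use case the header comment describes. The signature is the one of the version-4 plugin interface shipped with mosquitto 1.5 (double-check it against the mosquitto_plugin.h in your tree); the other mandatory plugin entry points (mosquitto_auth_plugin_version, init/cleanup, etc.) are omitted for brevity.

#include <string.h>

#include <mosquitto.h>
#include <mosquitto_plugin.h>

int mosquitto_auth_acl_check(void *user_data, int access,
        const struct mosquitto *client, const struct mosquitto_acl_msg *msg)
{
    (void)user_data;
    (void)client;

    /* Refuse blanket '#' subscriptions so clients cannot discover every topic
     * in use on the server. */
    if(access == MOSQ_ACL_SUBSCRIBE && msg->topic && !strcmp(msg->topic, "#")){
        return MOSQ_ERR_ACL_DENIED;
    }

    /* READ, WRITE and other SUBSCRIBE requests are allowed here; a real plugin
     * would apply its own rules instead. */
    return MOSQ_ERR_SUCCESS;
}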

 

3. Improving Mosquitto's Native Permissions

As mentioned in the previous section, because the permission-check function needs a context, and because retained messages are stored on the leaf nodes of the subscription tree, the WRITE permission check is effectively lost for retained messages. This section discusses how to add permission checking for retained messages. First, let's look at how the broker handles retained messages.

  • The broker receives and stores a retained message: retained messages arrive via PUBLISH (a last will can also be retained; it is passed in through a function). Following the PUBLISH handler, you can see that db__messages_easy_queue calls db__message_store, which saves the message and its attributes into stored, and then calls sub__messages_queue to attach the message to the corresponding node of the subscription tree. In the end, subs__process puts the retained message into the retained field of the node struct mosquitto__subhier *hier (see the sketch after this list).
  • The broker delivers a retained message: in handle__subscribe, after the permission check and after the subscription has been added to the tree (sub__add), the broker checks whether there are retained messages under that topic to send, calling sub__retain_queue and retain__search, and finally retain__process to deliver the message.
     
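
For reference, the storage step looks roughly like this (paraphrased from subs__process in src/subs.c of the 1.5.x tree; reference-count bookkeeping, statistics and error handling are trimmed):

/* src/subs.c, subs__process(): attach or clear the retained message on the node */
if(retain){
    if(hier->retained){
        db__msg_store_deref(db, &hier->retained);   /* drop the previously retained message */
    }
    if(stored->payloadlen){
        hier->retained = stored;                    /* remember the new retained message    */
        hier->retained->ref_count++;
    }else{
        hier->retained = NULL;                      /* a zero-length payload clears it      */
    }
}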

So the idea of the modification is: when the message is stored, i.e. in db__message_store, save the context of the retained message's source (so that mosquitto_acl_check can be reused); and when the message is about to be sent to a subscribing client, i.e. in retain__process, check the source's permission. This looks simple, but many other factors have to be considered; in particular, in C you have to manage memory initialization and release yourself, and one careless step means a segfault. The specific changes in detail (a consolidated sketch follows after this list):

  • 1. Modify the mosquitto struct in mosquitto_broker_internal.h: add a count of how many retained messages this client has registered, to make it easier to manage when this client's context may be destroyed. Increment it when a new retained message from the client enters the broker; decrement it when that retained message is replaced. Remember to initialize this value! The context is first created in context__init in context.c.

  • 2. As handle_publish.c shows, the broker stores the message using db__message_store in database.c. That function should be modified so that the source's context is stored into the mosquitto_msg_store.

  • 3. mosquitto_msg_store must accordingly gain a struct mosquitto pointer to hold that context. In subs__process in subs.c you can see that, if the message is retained, this structure is stored on the current topic node.

  • 4. In retain__process in subs.c, check the source's permission before handing the retained message to the client.

  • 5. In do_disconnect in loop.c, before calling context__add_to_disused, check whether any retained messages are still registered, i.e. check the count. A context may only be destroyed after do_disconnect has been called on it.

  • 6. Because the context might otherwise never be released through do_disconnect, when a stored message is deleted we must explicitly check whether the already-offline client still has retained messages: decrement the count, and if the count reaches 0, the session does not need to be resumed (context->clean_session == true — clients that keep their session and have no retained messages must not be affected), and the client is offline (context->state == mosq_cs_disconnected), then call the release function context__add_to_disused.

  • 7. Does this affect session resumption?

  • 8. Note that msg_store has its own maintenance: e.g. in database.c, when db__msg_store_deref decides the message must be freed and db__msg_store_remove is called, also drop the reference to the source context (because the stored message is being cleared at that point).

  • 10. db__message_store is called from many places across the project; examine carefully in which cases the context should actually be stored!

  • 11. Remember to initialize the stored message as well! Everything you add must be initialized and released.

  • 12. What is the impact on the system of keeping a context (and even its id) around just because it has retained messages? What happens when someone else wants to use the same id? Do we need to distinguish the online client with that id from the offline one?

  • 13. Be careful that macro definitions don't silently keep your added code out of the build.

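
Putting the steps above together, a hypothetical sketch of the core changes might look as follows. All added field and variable names (source_context, retained_msg_count) are illustrative only; they are not the names used in the actual 1.5.6 fix, and the bookkeeping from the list above (initialization, decrementing, freeing) is omitted.

/* mosquitto_broker_internal.h: remember who published a stored message and how
 * many retained messages still point at a client's context. */
struct mosquitto_msg_store{
    /* ... existing fields ... */
    struct mosquitto *source_context;   /* added: publishing client, may be NULL */
};
struct mosquitto{
    /* ... existing fields ... */
    int retained_msg_count;             /* added: retained msgs referencing this context */
};

/* database.c, db__message_store(): assuming the function gains an extra
 * 'struct mosquitto *source_context' parameter passed down from the PUBLISH
 * handler, record it with the stored message. */
stored->source_context = source_context;
if(source_context && retain){
    source_context->retained_msg_count++;
}

/* subs.c, retain__process(): before queueing the retained message for the new
 * subscriber, re-check that its publisher still has WRITE permission on the
 * topic (payload arguments abridged for the sketch). */
if(retained->source_context
        && mosquitto_acl_check(db, retained->source_context, retained->topic,
                retained->payloadlen, NULL /* payload */, retained->qos,
                true, MOSQ_ACL_WRITE) != MOSQ_ERR_SUCCESS){
    return MOSQ_ERR_SUCCESS;            /* silently skip delivery */
}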
 
Addendum: the Mosquitto maintainer accepted the vulnerabilities I reported, CVE-2018-12546 and CVE-2018-12550, and fixed them in Mosquitto 1.5.6 with an approach similar to the one described here. It is best to diff 1.5.5 against 1.5.6 to see exactly how he changed it; this post is only meant as a starting point. Please credit when reposting: By Ascii0x03, 2018-12-31.
https://www.cnblogs.com/ascii0x03/p/10074710.html