nginx 事件模塊簡單剖析

咱們這裏以單進程啓動爲例
nginx.c中的main 函數調用ngx_single_process_cyclenginx

這個函數回循環調用數據結構

ngx_process_cycle.c 中的多線程

for ( ;; ) {
  ....
  ngx_process_events_and_timers
  ....
}

事件循環的核心函數是 ngx_process_events_and_timers 。這個函數主要乾了四件 事情:搶佔 accept mutex,等待並分發事件,處理 accept 事件,處理其餘io事件框架

咱們這裏只介紹等待分發事件
ngx_event.c 中的函數

(void) ngx_process_events(cycle, timer, flags);

這裏開始 wait並分發事件, 咱們來能夠來看一下這個函數
能夠看到在 ngx_event.h 中的一個宏post

#define ngx_process_events   ngx_event_actions.process_events

咱們來看一下 ngx_event_actions 這個數據結構ui

typedef struct {
    /*
    添加事件方法,它將負責把1個感興趣的事件添加到操做系統提供的事件驅動機制(如epoll,kqueue等)中,
    這樣,在事件發生以後,將能夠在調用下面的process_envets時獲取這個事件。
    */
    ngx_int_t  (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
    /*
    刪除事件方法,它將一個已經存在於事件驅動機制中的事件一出,這樣之後即便這個事件發生,調用process_events方法時也沒法再獲取這個事件
    */
    ngx_int_t  (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    /*
    啓用一個事件,目前事件框架不會調用這個方法,大部分事件驅動模塊對於該方法的實現都是與上面的add方法徹底一致的
    */
    ngx_int_t  (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
    /*
    禁用一個事件,目前事件框架不會調用這個方法,大部分事件驅動模塊對於該方法的實現都是與上面的del方法一致
    */
    ngx_int_t  (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    /*
    向事件驅動機制中添加一個新的鏈接,這意味着鏈接上的讀寫事件都添加到事件驅動機制中了
    */
    ngx_int_t  (*add_conn)(ngx_connection_t *c);
    // 從事件驅動機制中一出一個連續的讀寫事件
    ngx_int_t  (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);

    // 僅在多線程環境下會被調用,目前,nginx在產品環境下還不會以多線程方式運行。
    ngx_int_t  (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t nowait);
    // 在正常的工做循環中,將經過調用process_events方法來處理事件。
    // 這個方法僅在ngx_process_events_and_timers方法中調用,它是處理,分發事件的核心
    ngx_int_t  (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,
                   ngx_uint_t flags);

    // 初始化事件驅動模塊的方法
    ngx_int_t  (*init)(ngx_cycle_t *cycle, ngx_msec_t timer);
    // 退出事件驅動模塊前調用的方法。
    void       (*done)(ngx_cycle_t *cycle);
} ngx_event_actions_t;


extern ngx_event_actions_t   ngx_event_actions;

這個數據結構中定義了不少函數指針,this

這裏咱們的配置是操作系統

events { 
    use epoll; 
    worker_connections 1024;     #因此nginx支持的總鏈接數就等於worker_processes * worker_connections 
}

使用的是 epoll 事件模塊,在epoll 模塊初始化的時候調用線程

ngx_epoll_module.c 中的ngx_epoll_init的函數

其中給ngx_event_actions賦值

ngx_event_actions = ngx_epoll_module_ctx.actions

咱們來看下 ngx_epoll_module_ctx
結構類型是


typedef struct { // 事件模塊的名稱 ngx_str_t *name; // 在解析配置項前,這個回調方法用於建立存儲配置項參數的結構體 void *(*create_conf)(ngx_cycle_t *cycle); // 在解析配置項完成後,init_conf方法會被調用,用於綜合處理當前事件模塊感興趣的所有配置項。 char *(*init_conf)(ngx_cycle_t *cycle, void *conf); // 對於事件驅動機制,每一個事件模塊須要實現的10個抽象方法 ngx_event_actions_t actions; } ngx_event_module_t;

初始化

//epoll是個event模塊
ngx_event_module_t  ngx_epoll_module_ctx = {
    &epoll_name,
    ngx_epoll_create_conf,               /* create configuration */
    ngx_epoll_init_conf,                 /* init configuration */

    {
        ngx_epoll_add_event,             /* add an event */
        ngx_epoll_del_event,             /* delete an event */
        ngx_epoll_add_event,             /* enable an event */
        ngx_epoll_del_event,             /* disable an event */
        ngx_epoll_add_connection,        /* add an connection */
        ngx_epoll_del_connection,        /* delete an connection */
        NULL,                            /* process the changes */
        ngx_epoll_process_events,        /* process the events */
        ngx_epoll_init,                  /* init the events */
        ngx_epoll_done,                  /* done the events */
    }
};

這些事件處理函數都在 ngx_epoll_module.c 這個文件中,你們能夠看一下源碼

綜上,根據咱們的配置, ngx_event.c 中的 ngx_process_events
實際調用的是 ngx_epoll_module.c 中的 ngx_epoll_process_events

這個函數有點長,咱們找些關鍵的點看一下,

//一開始就是等待事件,最長等待時間爲timer;nginx爲事件專門用紅黑樹維護了一個計時器
    events = epoll_wait(ep, event_list, (int) nevents, timer);

全部收集到的事件都放在了event_list 中,咱們來看一下這個event_list

static struct epoll_event  *event_list;

struct epoll_event {
    uint32_t      events;
    epoll_data_t  data;
};


typedef union epoll_data {
    void         *ptr;
    int           fd;
    uint32_t      u32;
    uint64_t      u64;
} epoll_data_t;

下面就開始對這些事件就行處理, 先加鎖,

ngx_mutex_lock(ngx_posted_events_mutex);
    //循環開始處理收到的全部事件
    for (i = 0; i < events; i++) {
        c = event_list[i].data.ptr;

        instance = (uintptr_t) c & 1;
        c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

        rev = c->read;

        if (c->fd == -1 || rev->instance != instance) {

            /*
             * the stale event from a file descriptor
             * that was just closed in this iteration
             */

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll: stale event %p", c);
            continue;
        }
        //取得發生一個事件
        revents = event_list[i].events;

        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "epoll: fd:%d ev:%04XD d:%p",
                       c->fd, revents, event_list[i].data.ptr);
        //記錄wait的錯誤返回狀態
        if (revents & (EPOLLERR|EPOLLHUP)) {
            ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll_wait() error on fd:%d ev:%04XD",
                           c->fd, revents);
        }

#if 0
        if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                          "strange epoll_wait() events fd:%d ev:%04XD",
                          c->fd, revents);
        }
#endif
        //該事件是一個讀事件,並該鏈接上註冊的讀事件是active的
        if ((revents & (EPOLLERR|EPOLLHUP))
             && (revents & (EPOLLIN|EPOLLOUT)) == 0)
        {
            /*
             * if the error events were returned without EPOLLIN or EPOLLOUT,
             * then add these flags to handle the events at least in one
             * active handler
             */

            revents |= EPOLLIN|EPOLLOUT;
        }

        if ((revents & EPOLLIN) && rev->active) {

            if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
                rev->posted_ready = 1;

            } else {
                rev->ready = 1;
            }
            //事件放入到相應的隊列中
            if (flags & NGX_POST_EVENTS) {
                queue = (ngx_event_t **) (rev->accept ?
                               &ngx_posted_accept_events : &ngx_posted_events);

                ngx_locked_post_event(rev, queue);

            } else {
                rev->handler(rev);
            }
        }

        wev = c->write;

        if ((revents & EPOLLOUT) && wev->active) {

            if (c->fd == -1 || wev->instance != instance) {

                /*
                 * the stale event from a file descriptor
                 * that was just closed in this iteration
                 */

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "epoll: stale event %p", c);
                continue;
            }

            if (flags & NGX_POST_THREAD_EVENTS) {
                wev->posted_ready = 1;

            } else {
                wev->ready = 1;
            }

            if (flags & NGX_POST_EVENTS) {
                ngx_locked_post_event(wev, &ngx_posted_events);

            } else {
                wev->handler(wev);
            }
        }
    }

    ngx_mutex_unlock(ngx_posted_events_mutex);

先加鎖,對event_list 中的事件循環處理,
在每一個循環中,
先獲取這個事件所在的鏈接, 而後判斷是讀事件仍是寫事件, 而後調用註冊在這個事件上的的回調函數。 讀事件的回調函數是
ngx_http_init_connection 這樣就進入了 HTTP框架處理流程

相關文章
相關標籤/搜索