nginx events (ngx_event_actions_t)

Event module startup flow:

  • After the configuration has been initialized in ngx_init_cycle()
  • worker process startup calls ngx_worker_process_cycle()
  • ngx_worker_process_init() invokes each module's init_process() hook (in the source only ngx_event_core_module implements it, as ngx_event_process_init(); the other modules leave it NULL)
    • ngx_event_core_module->ngx_event_process_init()
    • Initialize nginx's posted event queues ngx_posted_accept_events and ngx_posted_events
      • ngx_queue_init(&ngx_posted_accept_events);
      • ngx_queue_init(&ngx_posted_events);
    • Start the event engine (select, poll, epoll) by calling the init() action of the chosen NGX_EVENT_MODULE module, module->ctx->actions.init():
      • ngx_devpoll_module: ngx_devpoll_init()
      • ngx_epoll_module: ngx_epoll_init()
      • ngx_eventport_module: ngx_eventport_init()
      • ngx_iocp_module: ngx_iocp_init()
      • ngx_select_module: ngx_select_init()
      • win32 ngx_select_module: ngx_select_init()
    • Initialize the cycle's read/write event arrays
    • Initialize the connection pool
    • Walk the listening list, bind a free connection to each listening socket, and set the accept-type read handler: read->handler = ngx_event_accept (SOCK_STREAM) or ngx_event_recvmsg (datagram sockets)
    • With the accept mutex in use it cannot be taken here, so adding the accept events is deferred to ngx_process_events_and_timers
  • Once initialized, the worker process loops over nginx's event dispatcher ngx_process_events_and_timers() (a minimal sketch of this loop follows the list):
      1. Take the accept mutex: ngx_trylock_accept_mutex() -> ngx_enable_accept_events() -> ngx_add_event() registers the accept events with the I/O event engine
        • this goes through each I/O event engine's add action:
        • ngx_devpoll_module: ngx_devpoll_add_event()
        • ngx_epoll_module: ngx_epoll_add_event()
        • ngx_eventport_module: ngx_eventport_add_event()
        • ngx_iocp_module: ngx_iocp_add_event()
        • ngx_select_module: ngx_select_add_event()
        • win32 ngx_select_module: ngx_select_add_event()
      2. The ngx_process_events callback handles the engine's ready events, e.g. ngx_epoll_module's ngx_epoll_process_events()
        • when the engine reports read/write readiness on a registered socket, the readiness is recorded on the corresponding nginx read/write event and, under NGX_POST_EVENTS, the event is queued on the global posted queues
        • without NGX_POST_EVENTS the event's handler is invoked directly instead of being queued on ngx_posted_accept_events / ngx_posted_events
      3. ngx_event_process_posted: nginx's posted-event processor; it drains the ngx_posted_accept_events and ngx_posted_events queues (the http, mail, and stream modules set the read/write handlers that run here)
        • ngx_event_process_posted(cycle, &ngx_posted_accept_events);
          • runs ngx_event_accept() or ngx_event_recvmsg()
            • for the http module, listening->handler = ngx_http_init_connection
            • for the mail module, listening->handler = ngx_mail_init_connection
            • for the stream module, listening->handler = ngx_stream_init_connection
          • ngx_event_recvmsg() reads the datagram directly with recvmsg()
        • ngx_event_process_posted(cycle, &ngx_posted_events) then processes the remaining posted I/O events;
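The worker loop itself is tiny; everything above happens inside one call. A minimal sketch (simplified, not the actual ngx_worker_process_cycle() source) of what the worker does once ngx_event_process_init() has finished:

/* simplified worker event loop (sketch): the worker spins on
 * ngx_process_events_and_timers(), which (1) optionally takes the accept
 * mutex and registers accept events, (2) runs the engine-specific
 * ngx_process_events(), and (3) drains ngx_posted_accept_events, releases
 * the mutex, expires timers, and drains ngx_posted_events */
for ( ;; ) {
    ngx_process_events_and_timers(cycle);
}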

Structure: ngx_event_s

  • nginx works in terms of events, while epoll works in terms of sockets, so each socket is associated directly with its events
struct ngx_event_s {
    
    /* event context data; usually data points to the ngx_connection_t of the
     * connection. With asynchronous file I/O enabled it may point to an
     * ngx_event_aio_t instead.
     */
    void            *data;  

    /* flag: 1 means the event is writable; usually the corresponding TCP connection is in a state where network packets can be sent. */
    unsigned         write:1;   
    
    /* flag: 1 means new connections can be accepted via this event.
        Usually only the read event of each listening object (ngx_listening_t)
        in the cycle->listening array has accept set to 1.
    */
    unsigned         accept:1;  

    /* used to detect the stale events in kqueue, rtsig, and epoll
     * This flag tells stale events apart; it is used only by the event-driver
     * modules, event consumers need not care about it.
     * Why is it needed? While a batch of events is being processed, handling
       the earlier ones may close some connections, which can invalidate
       later, not-yet-processed events in the same batch; instance lets nginx
       detect and skip those stale events.
     */
    unsigned         instance:1;

    /* the event was passed or would be passed to a kernel;
     * in aio mode - operation was posted.
     * Flag: 1 means the event is active, 0 means it is not.
     * The two states map to different handling in the event-driver modules,
     * e.g. when adding, deleting, and processing events. Event consumers
     * normally do not change active directly.
     */
    unsigned         active:1;
    
     /* flag: 1 disables the event; meaningful only in the kqueue and rtsig
      * event-driver modules, it means nothing to epoll.
      */
    unsigned         disabled:1;

    /* the ready event; in aio mode 0 means that no operation can be posted
     * Flag: 1 means the event is ready, i.e. its handler is allowed to
     * process it. The HTTP framework checks ready frequently to decide
     * whether a request can be received or a response sent.
     */
    unsigned         ready:1;

    /* meaningful only to the kqueue, eventport, etc. modules; means nothing to the epoll driver on Linux. */
    unsigned         oneshot:1;

    /* aio operation is complete; used when handling asynchronous AIO events */
    unsigned         complete:1;
    
    /* flags: eof means the byte stream being processed has ended; error means event processing hit an error */
    unsigned         eof:1;
    unsigned         error:1;

    /* flags: timedout is 1 when the event has timed out, telling the handler to do timeout handling; timer_set is 1 while the event sits in the timer tree */
    unsigned         timedout:1;
    unsigned         timer_set:1;
    
     /* flag: 1 means handling of this event must be delayed; used only by the rate-limiting feature */
    unsigned         delayed:1;

    /* flag: 1 enables deferred accept: the connection is not surfaced right after the TCP three-way handshake but only once data actually arrives */
    unsigned         deferred_accept:1;

    /* the pending eof reported by kqueue, epoll or in aio chain operation */
    /* flag: 1 means waiting for the byte stream to end; only relevant to kqueue and aio */
    unsigned         pending_eof:1;
    //set while the event sits on a posted queue (accept/read/write)
    unsigned         posted:1;
    //the event's connection has been closed
    unsigned         closed:1;

    /* to test on worker exit */
    unsigned         channel:1;
    unsigned         resolver:1;

    unsigned         cancelable:1;

#if (NGX_HAVE_KQUEUE)
    unsigned         kq_vnode:1;

    /* the pending errno reported by kqueue */
    int              kq_errno;
#endif

    /*
     * kqueue only:
     *   accept:     number of sockets that wait to be accepted
     *   read:       bytes to read when event is ready
     *               or lowat when event is set with NGX_LOWAT_EVENT flag
     *   write:      available space in buffer when event is ready
     *               or lowat when event is set with NGX_LOWAT_EVENT flag
     *
     * epoll with EPOLLRDHUP:
     *   accept:     1 if accept many, 0 otherwise
     *   read:       1 if there can be data to read, 0 otherwise
     *
     * iocp: TODO
     *
     * otherwise:
     *   accept:     1 if accept many, 0 otherwise
     */

#if (NGX_HAVE_KQUEUE) || (NGX_HAVE_IOCP)
    int              available;
#else
    unsigned         available:1;
#endif
    
    /* the handler invoked when this event fires */
    ngx_event_handler_pt  handler;


#if (NGX_HAVE_IOCP)
    ngx_event_ovlp_t ovlp;
#endif

    ngx_uint_t       index;

    ngx_log_t       *log;

    ngx_rbtree_node_t   timer;

    /* the posted queue */
    ngx_queue_t      queue;

#if 0

    /* the threads support */

    /*
     * the event thread context, we store it here
     * if $(CC) does not understand __thread declaration
     * and pthread_getspecific() is too costly
     */

    void            *thr_ctx;

#if (NGX_EVENT_T_PADDING)

    /* event should not cross cache line in SMP */

    uint32_t         padding[NGX_EVENT_T_PADDING];
#endif
#endif
};
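The data pointer is the bridge between the two worlds described above. A hedged sketch (the handler name is hypothetical; the pairing itself is set up in ngx_event_process_init() and ngx_get_connection(), shown later) of how a handler gets from an event back to its socket:

/* example_handler is hypothetical; the ev->data convention is nginx's */
static void
example_handler(ngx_event_t *ev)
{
    ngx_connection_t  *c = ev->data;    /* event -> connection */

    /* c->read / c->write are the paired ngx_event_t objects,
     * and c->fd is the socket the engine (epoll) actually watches */
}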

Structure: ngx_event_aio_s

struct ngx_event_aio_s {
    void                      *data;
    ngx_event_handler_pt       handler;
    ngx_file_t                *file;

    ngx_fd_t                   fd;

#if (NGX_HAVE_AIO_SENDFILE || NGX_COMPAT)
    ssize_t                  (*preload_handler)(ngx_buf_t *file);
#endif

#if (NGX_HAVE_EVENTFD)
    int64_t                    res;
#endif

#if !(NGX_HAVE_EVENTFD) || (NGX_TEST_BUILD_EPOLL)
    ngx_err_t                  err;
    size_t                     nbytes;
#endif

    ngx_aiocb_t                aiocb;
    ngx_event_t                event;
};

Structure: ngx_event_actions_t: the abstraction that wraps each event engine's (e.g. epoll's) handler functions.

  • It translates engine (epoll) events into nginx events, and nginx events into engine events
typedef struct {
    /* Add/remove an event: registers the event with (or removes it from) the
       OS event mechanism (epoll, kqueue, ...), so that once it occurs it
       will (or will no longer) be picked up by process_events below.
    */
    ngx_int_t  (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
    ngx_int_t  (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    /* Enable/disable an event. The event framework currently never calls
       these; most event-driver modules implement them identically to add/del. */
    ngx_int_t  (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
    ngx_int_t  (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

     /* Add/remove a whole connection to/from the event mechanism, i.e. both its read and write events are added/removed at once */
    ngx_int_t  (*add_conn)(ngx_connection_t *c);
    ngx_int_t  (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);

    ngx_int_t  (*notify)(ngx_event_handler_pt handler);
    
    /* The core dispatch method: collects the engine's (epoll's) ready events
     * and turns them into nginx events, queueing them on the event queues.
     * In the normal worker loop events are handled by calling process_events;
     * it is invoked only from ngx_process_events_and_timers and is the heart
     * of event dispatching. */
    ngx_int_t  (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,
                                 ngx_uint_t flags);

    /* init and teardown of the event-driver module (e.g. epoll initialization) */
    ngx_int_t  (*init)(ngx_cycle_t *cycle, ngx_msec_t timer);
    void       (*done)(ngx_cycle_t *cycle);
} ngx_event_actions_t;
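The rest of the core never calls a driver's actions directly; the module selected at init time is reached through the global ngx_event_actions, which ngx_event.h wraps in macros (as in the nginx source):

#define ngx_process_events   ngx_event_actions.process_events
#define ngx_done_events      ngx_event_actions.done

#define ngx_add_event        ngx_event_actions.add
#define ngx_del_event        ngx_event_actions.del
#define ngx_add_conn         ngx_event_actions.add_conn
#define ngx_del_conn         ngx_event_actions.del_conn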
Event-module process startup: ngx_event_process_init:
  • Build the linked list of all connection structures and store it in cycle->free_connections
  • Take a free connection from cycle->free_connections and bind it to each socket in the listening array
  • Based on ls->type, set the accept-type read handler: read->handler = ngx_event_accept (SOCK_STREAM) or ngx_event_recvmsg (datagram)
  • With the accept mutex in use it cannot be taken here, so adding the events is deferred to ngx_process_events_and_timers:
    • there, ngx_add_event() registers the connection as an accept-type event with the selected I/O event engine (epoll, select, poll)
/**
    the event core module's process-init function
*/
static ngx_int_t
ngx_event_process_init(ngx_cycle_t *cycle)
{
    ngx_uint_t           m, i;
    ngx_event_t         *rev, *wev;
    ngx_listening_t     *ls;
    ngx_connection_t    *c, *next, *old;
    ngx_core_conf_t     *ccf;
    ngx_event_conf_t    *ecf;
    ngx_event_module_t  *module;

    ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
    ecf = ngx_event_get_conf(cycle->conf_ctx, ngx_event_core_module);

    if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) {
        ngx_use_accept_mutex = 1;
        ngx_accept_mutex_held = 0;
        ngx_accept_mutex_delay = ecf->accept_mutex_delay;

    } else {
        ngx_use_accept_mutex = 0;
    }

    //initialize nginx's posted event queues
    ngx_queue_init(&ngx_posted_accept_events);
    ngx_queue_init(&ngx_posted_events);

    if (ngx_event_timer_init(cycle->log) == NGX_ERROR) {
        return NGX_ERROR;
    }
    //initialize the event context of the chosen NGX_EVENT_MODULE module:
    // for the epoll engine this calls epoll_create
    for (m = 0; cycle->modules[m]; m++) {
        if (cycle->modules[m]->type != NGX_EVENT_MODULE) {
            continue;
        }

        if (cycle->modules[m]->ctx_index != ecf->use) {
            continue;
        }

        module = cycle->modules[m]->ctx;

        if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) {
            /* fatal */
            exit(2);
        }

        break;
    }


    if (ngx_timer_resolution && !(ngx_event_flags & NGX_USE_TIMER_EVENT)) {
        ngx_log_error(NGX_LOG_WARN, cycle->log, 0,
                      "the \"timer_resolution\" directive is not supported "
                      "with the configured event method, ignored");
        ngx_timer_resolution = 0;
    }


    cycle->connections =
        ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log);
    if (cycle->connections == NULL) {
        return NGX_ERROR;
    }

    c = cycle->connections;

    cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
                                   cycle->log);
    if (cycle->read_events == NULL) {
        return NGX_ERROR;
    }

    rev = cycle->read_events;
    for (i = 0; i < cycle->connection_n; i++) {
        rev[i].closed = 1;
        rev[i].instance = 1;
    }

    cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
                                    cycle->log);
    if (cycle->write_events == NULL) {
        return NGX_ERROR;
    }

    wev = cycle->write_events;
    for (i = 0; i < cycle->connection_n; i++) {
        wev[i].closed = 1;
    }

    i = cycle->connection_n;
    next = NULL;

    do {
        i--;

        c[i].data = next;
        c[i].read = &cycle->read_events[i];
        c[i].write = &cycle->write_events[i];
        c[i].fd = (ngx_socket_t) -1;

        next = &c[i];
    } while (i);

    cycle->free_connections = next;
    cycle->free_connection_n = cycle->connection_n;

    /* for each listening socket */
    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {


        c = ngx_get_connection(ls[i].fd, cycle->log);

        if (c == NULL) {
            return NGX_ERROR;
        }

        c->type = ls[i].type;
        c->log = &ls[i].log;

        c->listening = &ls[i];
        ls[i].connection = c;

        rev = c->read;

        rev->log = c->log;
        rev->accept = 1;
        if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
            if (ls[i].previous) {
                old = ls[i].previous->connection;
                if (ngx_del_event(old->read, NGX_READ_EVENT, NGX_CLOSE_EVENT)
                    == NGX_ERROR)
                {
                    return NGX_ERROR;
                }

                old->fd = (ngx_socket_t) -1;
            }
        }
        /* set the accept-type handler: ngx_event_accept for stream
           (TCP/UNIX-domain) sockets, ngx_event_recvmsg for datagram sockets */
        rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept
                                                : ngx_event_recvmsg;

        /* with accept_mutex in use, adding the event is deferred to
           ngx_process_events_and_timers() */
        if (ngx_use_accept_mutex) {
            continue;
        }

        /* no accept_mutex: register the listening socket with the
           I/O engine right away */
        if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }
    }

    return NGX_OK;
}
Worker event dispatch (nginx's event-processing core): ngx_process_events_and_timers
  • ngx_trylock_accept_mutex: accept events are registered only once the lock has been acquired

  • ngx_process_events: process the engine's ready events

  • release the accept_mutex lock

  • ngx_event_process_posted: process the posted event queues

  • ngx_process_events_and_timers():

/**
    nginx's event and timer dispatcher (the worker's per-iteration entry point)
*/

void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ngx_uint_t  flags;
    ngx_msec_t  timer, delta;

    if (ngx_timer_resolution) {
        timer = NGX_TIMER_INFINITE;
        flags = 0;

    } else {
        timer = ngx_event_find_timer();
        flags = NGX_UPDATE_TIME;
    }

    if (ngx_use_accept_mutex) {
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;

        } else {
            //try to take the accept mutex and register the listening sockets
            //with the I/O event engine as accept-type events
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }

            if (ngx_accept_mutex_held) {
                flags |= NGX_POST_EVENTS;

            } else {
                if (timer == NGX_TIMER_INFINITE
                    || timer > ngx_accept_mutex_delay)
                {
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }

    delta = ngx_current_msec;

    /* collect the ready read/write events; with NGX_POST_EVENTS they are
       queued on the posted queues for nginx to handle below */
    (void) ngx_process_events(cycle, timer, flags);

    delta = ngx_current_msec - delta;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "timer delta: %M", delta);

    //process the events on nginx's posted accept queue
    ngx_event_process_posted(cycle, &ngx_posted_accept_events);

    if (ngx_accept_mutex_held) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }

    if (delta) {
        ngx_event_expire_timers();
    }

    ngx_event_process_posted(cycle, &ngx_posted_events);
}
  • ngx_trylock_accept_mutex():
/**
    try to take the ngx_accept_mutex lock and enable the accept events
*/
ngx_int_t
ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
    if (ngx_shmtx_trylock(&ngx_accept_mutex)) {

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "accept mutex locked");

        if (ngx_accept_mutex_held && ngx_accept_events == 0) {
            return NGX_OK;
        }

        if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
            ngx_shmtx_unlock(&ngx_accept_mutex);
            return NGX_ERROR;
        }

        ngx_accept_events = 0;
        ngx_accept_mutex_held = 1;

        return NGX_OK;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "accept mutex lock failed: %ui", ngx_accept_mutex_held);

    if (ngx_accept_mutex_held) {
        if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }

        ngx_accept_mutex_held = 0;
    }

    return NGX_OK;
}
  • ngx_event_process_posted: nginx's own processor for the events on a posted queue
/**
    process the events in a posted event queue
*/
void
ngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted)
{
    ngx_queue_t  *q;
    ngx_event_t  *ev;

    while (!ngx_queue_empty(posted)) {

        q = ngx_queue_head(posted);
        ev = ngx_queue_data(q, ngx_event_t, queue);

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                      "posted event %p", ev);

        ngx_delete_posted_event(ev);

        ev->handler(ev);
    }
}
  • ngx_post_event: append an event to one of nginx's posted event queues
#define ngx_post_event(ev, q)                                                 \
                                                                              \
    if (!(ev)->posted) {                                                      \
        (ev)->posted = 1;                                                     \
        ngx_queue_insert_tail(q, &(ev)->queue);                               \
                                                                              \
        ngx_log_debug1(NGX_LOG_DEBUG_CORE, (ev)->log, 0, "post event %p", ev);\
                                                                              \
    } else  {                                                                 \
        ngx_log_debug1(NGX_LOG_DEBUG_CORE, (ev)->log, 0,                      \
                       "update posted event %p", ev);                         \
    }
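A hedged usage sketch of the pattern this macro enables (the handler name is hypothetical; the pattern itself is the one ngx_http_init_connection() uses below when the accept mutex is held): a handler can defer itself to later in the same ngx_process_events_and_timers() pass by posting its own event:

static void
example_deferred_handler(ngx_event_t *ev)    /* hypothetical */
{
    if (ngx_use_accept_mutex) {
        /* postpone: run only after the accept mutex has been released */
        ngx_post_event(ev, &ngx_posted_events);
        return;
    }

    /* ... otherwise handle the event immediately ... */
}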
  • ngx_enable_accept_events: registers the accept (listening) sockets with the I/O engine
static ngx_int_t
ngx_enable_accept_events(ngx_cycle_t *cycle)
{
    ngx_uint_t         i;
    ngx_listening_t   *ls;
    ngx_connection_t  *c;

    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {

        c = ls[i].connection;

        if (c == NULL || c->read->active) {
            continue;
        }
        /* via ngx_add_event (epoll: ngx_epoll_add_event), register the
           listening socket with the I/O event engine */
        if (ngx_add_event(c->read, NGX_READ_EVENT, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }
    }

    return NGX_OK;
}
  • When the I/O event engine reports an accept-type event, read->handler = ngx_event_accept runs:
/*
    nginx event handler for the engine's TCP accept-type events;
    dispatches the new connection to the listening handler installed
    by the other modules (http, stream, mail)
*/
void
ngx_event_accept(ngx_event_t *ev)
{
    socklen_t          socklen;
    ngx_err_t          err;
    ngx_log_t         *log;
    ngx_uint_t         level;
    ngx_socket_t       s;
    ngx_event_t       *rev, *wev;
    ngx_sockaddr_t     sa;
    ngx_listening_t   *ls;
    ngx_connection_t  *c, *lc;
    ngx_event_conf_t  *ecf;

    if (ev->timedout) {
        if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {
            return;
        }

        ev->timedout = 0;
    }

    ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);

    if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
        ev->available = ecf->multi_accept;
    }

    lc = ev->data;
    ls = lc->listening;
    ev->ready = 0;

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "accept on %V, ready: %d", &ls->addr_text, ev->available);

    do {
        socklen = sizeof(ngx_sockaddr_t);
        // accept the new connection
        s = accept(lc->fd, &sa.sockaddr, &socklen);

        if (s == (ngx_socket_t) -1) {
            err = ngx_socket_errno;

            if (err == NGX_EAGAIN) {
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err,
                               "accept() not ready");
                return;
            }

            level = NGX_LOG_ALERT;

            if (err == NGX_ECONNABORTED) {
                level = NGX_LOG_ERR;

            } else if (err == NGX_EMFILE || err == NGX_ENFILE) {
                level = NGX_LOG_CRIT;
            }

            ngx_log_error(level, ev->log, err, "accept() failed");
            if (err == NGX_ECONNABORTED) {
                if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
                    ev->available--;
                }

                if (ev->available) {
                    continue;
                }
            }

            if (err == NGX_EMFILE || err == NGX_ENFILE) {
                if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle, 1)
                    != NGX_OK)
                {
                    return;
                }

                if (ngx_use_accept_mutex) {
                    if (ngx_accept_mutex_held) {
                        ngx_shmtx_unlock(&ngx_accept_mutex);
                        ngx_accept_mutex_held = 0;
                    }

                    ngx_accept_disabled = 1;

                } else {
                    ngx_add_timer(ev, ecf->accept_mutex_delay);
                }
            }

            return;
        }

        ngx_accept_disabled = ngx_cycle->connection_n / 8
                              - ngx_cycle->free_connection_n;

        c = ngx_get_connection(s, ev->log);

        if (c == NULL) {
            if (ngx_close_socket(s) == -1) {
                ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                              ngx_close_socket_n " failed");
            }

            return;
        }

        c->type = SOCK_STREAM;

        c->pool = ngx_create_pool(ls->pool_size, ev->log);
        if (c->pool == NULL) {
            ngx_close_accepted_connection(c);
            return;
        }

        if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) {
            socklen = sizeof(ngx_sockaddr_t);
        }

        c->sockaddr = ngx_palloc(c->pool, socklen);
        if (c->sockaddr == NULL) {
            ngx_close_accepted_connection(c);
            return;
        }

        ngx_memcpy(c->sockaddr, &sa, socklen);

        log = ngx_palloc(c->pool, sizeof(ngx_log_t));
        if (log == NULL) {
            ngx_close_accepted_connection(c);
            return;
        }

        /* set a blocking mode for iocp and non-blocking mode for others */

        if (ngx_inherited_nonblocking) {
            if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
                if (ngx_blocking(s) == -1) {
                    ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                                  ngx_blocking_n " failed");
                    ngx_close_accepted_connection(c);
                    return;
                }
            }

        } else {
            if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
                if (ngx_nonblocking(s) == -1) {
                    ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                                  ngx_nonblocking_n " failed");
                    ngx_close_accepted_connection(c);
                    return;
                }
            }
        }

        *log = ls->log;

        c->recv = ngx_recv;
        c->send = ngx_send;
        c->recv_chain = ngx_recv_chain;
        c->send_chain = ngx_send_chain;

        c->log = log;
        c->pool->log = log;

        c->socklen = socklen;
        c->listening = ls;
        c->local_sockaddr = ls->sockaddr;
        c->local_socklen = ls->socklen;

        rev = c->read;
        wev = c->write;

        wev->ready = 1;

        if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
            rev->ready = 1;
        }

        if (ev->deferred_accept) {
            rev->ready = 1;
        }

        rev->log = log;
        wev->log = log;

        /*
         * TODO: MT: - ngx_atomic_fetch_add()
         *             or protection by critical section or light mutex
         *
         * TODO: MP: - allocated in a shared memory
         *           - ngx_atomic_fetch_add()
         *             or protection by critical section or light mutex
         */

        c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
        if (ls->addr_ntop) {
            c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
            if (c->addr_text.data == NULL) {
                ngx_close_accepted_connection(c);
                return;
            }

            c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
                                             c->addr_text.data,
                                             ls->addr_text_max_len, 0);
            if (c->addr_text.len == 0) {
                ngx_close_accepted_connection(c);
                return;
            }
        }

        if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
            if (ngx_add_conn(c) == NGX_ERROR) {
                ngx_close_accepted_connection(c);
                return;
            }
        }

        log->data = NULL;
        log->handler = NULL;
        
        /* invoke the listening handler installed by the other modules
           (http, stream, mail) */
        ls->handler(c);

        if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
            ev->available--;
        }

    } while (ev->available);
}
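One line in the loop above deserves a worked example: ngx_accept_disabled = connection_n / 8 - free_connection_n is the workers' crude load balancer. It turns positive once more than 7/8 of the connection pool is in use, and ngx_process_events_and_timers() then decrements it once per iteration instead of contending for the accept mutex (the ngx_accept_disabled > 0 branch shown earlier). With worker_connections 1024 the threshold is 1024 / 8 = 128: a worker that drops to 100 free connections gets ngx_accept_disabled = 28 and sits out the next 28 accept rounds, letting less loaded workers pick up the new connections.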

epoll I/O engine module

The NGX_EVENT_MODULE event actions:
  • Characteristics:

    • the event-handling function set lives in the event module's ctx (the actions member of ngx_event_module_t)
    • the actions in use are exposed through a global variable: ngx_event_actions, of type ngx_event_actions_t
    • whenever a module's actions.init() runs, it assigns ngx_event_actions = that module's ctx->actions
  • For example, with the epoll module enabled: every I/O operation is wrapped in an nginx event, which the worker then processes in its loop

ngx_event_module_t  ngx_epoll_module_ctx = {
    &epoll_name,
    ngx_epoll_create_conf,               /* create configuration */
    ngx_epoll_init_conf,                 /* init configuration */
    //the event action callbacks epoll publishes to the nginx core
    {
        ngx_epoll_add_event,             /* add an event */
        ngx_epoll_del_event,             /* delete an event */
        ngx_epoll_add_event,             /* enable an event */
        ngx_epoll_del_event,             /* disable an event */
        ngx_epoll_add_connection,        /* add a connection */
        ngx_epoll_del_connection,        /* delete a connection */
        ngx_epoll_process_events,        /* process the events */
        ngx_epoll_init,                  /* init the events */
        ngx_epoll_done,                  /* done the events */
    }
};
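Which engine, and therefore which actions table, gets selected is plain configuration. For reference, a typical events block touching the settings this section keeps referring to (ecf->use, ecf->accept_mutex, ecf->accept_mutex_delay, ecf->multi_accept):

events {
    use epoll;                 # ecf->use: selects ngx_epoll_module
    worker_connections 1024;   # sizes cycle->connection_n
    accept_mutex on;           # enables ngx_use_accept_mutex in workers
    accept_mutex_delay 500ms;  # ngx_accept_mutex_delay
    multi_accept off;          # ev->available in ngx_event_accept
}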
Functions the epoll module provides:
  • ngx_epoll_init(): the engine startup function; initializes the epoll module
/**
    epoll's actions.init() for NGX_EVENT_MODULE
*/
static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ngx_epoll_conf_t  *epcf;
    //fetch the epoll module's configuration
    epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

    if (ep == -1) {
        ep = epoll_create(cycle->connection_n / 2); //create the epoll instance

        if (ep == -1) {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                          "epoll_create() failed");
            return NGX_ERROR;
        }
    }

    if (nevents < epcf->events) {
        if (event_list) {
            ngx_free(event_list);
        }

        event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
                               cycle->log);
        if (event_list == NULL) {
            return NGX_ERROR;
        }
    }

    nevents = epcf->events;

    ngx_io = ngx_os_io;

    ngx_event_actions = ngx_epoll_module_ctx.actions;   //publish epoll's actions as the global ngx_event_actions

    return NGX_OK;
}
  • ngx_epoll_add_event: registers a socket with the I/O event engine
/**
    add an nginx event (i.e. its socket) to epoll
*/
static ngx_int_t
ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
    int                  op;
    uint32_t             events, prev;
    ngx_event_t         *e;
    ngx_connection_t    *c;
    struct epoll_event   ee;

    c = ev->data;

    events = (uint32_t) event;

    if (event == NGX_READ_EVENT) {
        e = c->write;
        prev = EPOLLOUT;    //the paired write event; EPOLLOUT = watch for writability
    } else {
        e = c->read;
        prev = EPOLLIN;     //the paired read event; EPOLLIN = watch for readability
    }
    if (e->active) {
        op = EPOLL_CTL_MOD; //modify the events already registered for this fd
        events |= prev;

    } else {
        op = EPOLL_CTL_ADD; //register a new fd with the epoll instance
    }
    ee.events = events | (uint32_t) flags;
    ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "epoll add event: fd:%d op:%d ev:%08XD",
                   c->fd, op, ee.events);
    /* register/update the socket in the I/O engine */
    if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                      "epoll_ctl(%d, %d) failed", op, c->fd);
        return NGX_ERROR;
    }
    ev->active = 1;
    return NGX_OK;
}
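Note the ee.data.ptr line above: it packs ev->instance into bit 0 of the connection pointer (connections are word-aligned, so bit 0 of a valid pointer is always zero). ngx_epoll_process_events(), shown next, unpacks it like this:

/* decode, as done in ngx_epoll_process_events() below */
c = event_list[i].data.ptr;
instance = (uintptr_t) c & 1;                               /* the flag    */
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);  /* the pointer */

Comparing this instance with rev->instance is how stale events from descriptors closed earlier in the same batch are detected (the instance flag of ngx_event_s above).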
  • ngx_epoll_process_events: the epoll module's I/O processing function; it queues the ready I/O events on nginx's event queues
/**
    the I/O engine's event processing function
**/
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
    int                events;
    uint32_t           revents;
    ngx_int_t          instance, i;
    ngx_uint_t         level;
    ngx_err_t          err;
    ngx_event_t       *rev, *wev;
    ngx_queue_t       *queue;
    ngx_connection_t  *c;

    /* NGX_TIMER_INFINITE == INFTIM */

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll timer: %M", timer);
    //fetch the events epoll has collected
    events = epoll_wait(ep, event_list, (int) nevents, timer);

    err = (events == -1) ? ngx_errno : 0;

    if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
        ngx_time_update();
    }

    if (err) {
        if (err == NGX_EINTR) {

            if (ngx_event_timer_alarm) {
                ngx_event_timer_alarm = 0;
                return NGX_OK;
            }

            level = NGX_LOG_INFO;

        } else {
            level = NGX_LOG_ALERT;
        }

        ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
        return NGX_ERROR;
    }

    if (events == 0) {
        if (timer != NGX_TIMER_INFINITE) {
            return NGX_OK;
        }

        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "epoll_wait() returned no events without timeout");
        return NGX_ERROR;
    }
    //loop over the returned events
    for (i = 0; i < events; i++) {
        c = event_list[i].data.ptr;

        instance = (uintptr_t) c & 1;
        c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

        rev = c->read;

        if (c->fd == -1 || rev->instance != instance) {

            /*
             * the stale event from a file descriptor
             * that was just closed in this iteration
             */

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll: stale event %p", c);
            continue;
        }

        revents = event_list[i].events;

        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "epoll: fd:%d ev:%04XD d:%p",
                       c->fd, revents, event_list[i].data.ptr);
        // error events on the descriptor
        if (revents & (EPOLLERR|EPOLLHUP)) {
            ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll_wait() error on fd:%d ev:%04XD",
                           c->fd, revents);

            /*
             * if the error events were returned, add EPOLLIN and EPOLLOUT
             * to handle the events at least in one active handler
             */

            revents |= EPOLLIN|EPOLLOUT;
        }

        //read event
        if ((revents & EPOLLIN) && rev->active) {

            rev->ready = 1;

            if (flags & NGX_POST_EVENTS) {
                queue = rev->accept ? &ngx_posted_accept_events
                                    : &ngx_posted_events;

                /* queue the ready read event on ngx_posted_accept_events
                   or ngx_posted_events */
                ngx_post_event(rev, queue);
            } else {
                /* without NGX_POST_EVENTS, invoke the read handler directly */
                rev->handler(rev);
            }
        }

        wev = c->write;
        //write event
        if ((revents & EPOLLOUT) && wev->active) {

            if (c->fd == -1 || wev->instance != instance) {

                /*
                 * the stale event from a file descriptor
                 * that was just closed in this iteration
                 */

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "epoll: stale event %p", c);
                continue;
            }

            wev->ready = 1;
            if (flags & NGX_POST_EVENTS) {
                /* queue the ready write event on ngx_posted_events */
                ngx_post_event(wev, &ngx_posted_events);

            } else {
                wev->handler(wev);
            }
        }
    }

    return NGX_OK;
}

http module

  • In the worker loop, when the I/O event module's ngx_process_events() sees a ready accept-type read event, ngx_event_accept() runs (directly, or via the posted queue when NGX_POST_EVENTS is set)
  • which then invokes listening->handler = ngx_http_init_connection
ngx_http_init_connection: the accept handler for http connections
  • initializes the ngx_http_connection_t
  • sets the read/write handlers
  • registers the connection socket with the I/O event engine
/**
    http connection handler
    1. initialize the ngx_http_connection_t
    2. set the read/write handlers
    3. wrap the connection in an nginx event and queue it, or register it
       with the I/O event engine
*/
void
ngx_http_init_connection(ngx_connection_t *c)
{
    ngx_uint_t              i;
    ngx_event_t            *rev;
    struct sockaddr_in     *sin;
    ngx_http_port_t        *port;
    ngx_http_in_addr_t     *addr;
    ngx_http_log_ctx_t     *ctx;
    ngx_http_connection_t  *hc;
#if (NGX_HAVE_INET6)
    struct sockaddr_in6    *sin6;
    ngx_http_in6_addr_t    *addr6;
#endif
    //allocate the http connection object
    hc = ngx_pcalloc(c->pool, sizeof(ngx_http_connection_t));
    if (hc == NULL) {
        ngx_http_close_connection(c);
        return;
    }
    //attach it to the connection
    c->data = hc;

    /* find the server configuration for the address:port */
    //the listening port object
    port = c->listening->servers;

    if (port->naddrs > 1) {

        /*
         * there are several addresses on this port and one of them
         * is an "*:port" wildcard so getsockname() in ngx_http_server_addr()
         * is required to determine a server address
         */

        if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) {
            ngx_http_close_connection(c);
            return;
        }

        switch (c->local_sockaddr->sa_family) {

#if (NGX_HAVE_INET6)
        case AF_INET6:
            sin6 = (struct sockaddr_in6 *) c->local_sockaddr;

            addr6 = port->addrs;

            /* the last address is "*" */

            for (i = 0; i < port->naddrs - 1; i++) {
                if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) {
                    break;
                }
            }

            hc->addr_conf = &addr6[i].conf;

            break;
#endif

        default: /* AF_INET */
            sin = (struct sockaddr_in *) c->local_sockaddr;
            //the IPv4 address list
            addr = port->addrs;

            /* the last address is "*" */

            for (i = 0; i < port->naddrs - 1; i++) {
                if (addr[i].addr == sin->sin_addr.s_addr) {
                    break;
                }
            }

            hc->addr_conf = &addr[i].conf;

            break;
        }

    } else {

        switch (c->local_sockaddr->sa_family) {

#if (NGX_HAVE_INET6)
        case AF_INET6:
            addr6 = port->addrs;
            hc->addr_conf = &addr6[0].conf;
            break;
#endif

        default: /* AF_INET */
            addr = port->addrs;
            hc->addr_conf = &addr[0].conf;
            break;
        }
    }

    /* the default server configuration for the address:port */
    hc->conf_ctx = hc->addr_conf->default_server->ctx;  //the default server's configuration context

    ctx = ngx_palloc(c->pool, sizeof(ngx_http_log_ctx_t));
    if (ctx == NULL) {
        ngx_http_close_connection(c);
        return;
    }

    ctx->connection = c;
    ctx->request = NULL;
    ctx->current_request = NULL;

    c->log->connection = c->number;
    c->log->handler = ngx_http_log_error;
    c->log->data = ctx;
    c->log->action = "waiting for request";

    c->log_error = NGX_ERROR_INFO;

    rev = c->read;
    rev->handler = ngx_http_wait_request_handler;   //read event handler
    c->write->handler = ngx_http_empty_handler;     //write event handler

#if (NGX_HTTP_V2)
    if (hc->addr_conf->http2) {
        rev->handler = ngx_http_v2_init;
    }
#endif

#if (NGX_HTTP_SSL)
    {
    ngx_http_ssl_srv_conf_t  *sscf;

    sscf = ngx_http_get_module_srv_conf(hc->conf_ctx, ngx_http_ssl_module);

    if (sscf->enable || hc->addr_conf->ssl) {

        c->log->action = "SSL handshaking";

        if (hc->addr_conf->ssl && sscf->ssl.ctx == NULL) {
            ngx_log_error(NGX_LOG_ERR, c->log, 0,
                          "no \"ssl_certificate\" is defined "
                          "in server listening on SSL port");
            ngx_http_close_connection(c);
            return;
        }

        hc->ssl = 1;

        rev->handler = ngx_http_ssl_handshake;  //SSL handshake handler
    }
    }
#endif

    if (hc->addr_conf->proxy_protocol) {
        hc->proxy_protocol = 1;
        c->log->action = "reading PROXY protocol";
    }

    if (rev->ready) {
        /* the deferred accept(), iocp */

        if (ngx_use_accept_mutex) {
            /* post the read event so it runs after the accept mutex
               has been released */
            ngx_post_event(rev, &ngx_posted_events);
            return;
        }

        /* invoke the read handler directly */
        rev->handler(rev);
        return;
    }
    //arm the read event's timer (post_accept_timeout)
    ngx_add_timer(rev, c->listening->post_accept_timeout);
    ngx_reusable_connection(c, 1);
    
    /* register the connection's read event with the I/O event engine */
    if (ngx_handle_read_event(rev, 0) != NGX_OK) {
        ngx_http_close_connection(c);
        return;
    }
}
ngx_http_wait_request_handler: the read handler for an http connection
  • fetches the ngx_http_connection_t
  • sizes the client header buffer
  • tests whether the connection is usable by reading from it
  • if the PROXY protocol is in use and only its header has arrived, reposts the event on nginx's event queue
  • resets the read handler to ngx_http_process_request_line
  • parses what has been read
/**
    the http connection's read event
    1. fetch the ngx_http_connection_t
    2. size the buffer
    3. test whether the connection is usable
    4. if the PROXY protocol is in use, repost the event on nginx's event queue
    5. reset the read handler to ngx_http_process_request_line
    6. parse what has been read
*/
static void
ngx_http_wait_request_handler(ngx_event_t *rev)
{
    u_char                    *p;
    size_t                     size;
    ssize_t                    n;
    ngx_buf_t                 *b;
    ngx_connection_t          *c;
    ngx_http_connection_t     *hc;
    ngx_http_core_srv_conf_t  *cscf;
    //the connection arrives via the event's data pointer
    c = rev->data;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http wait request handler");

    if (rev->timedout) {
        ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");
        ngx_http_close_connection(c);
        return;
    }

    if (c->close) {
        ngx_http_close_connection(c);
        return;
    }
    //fetch the ngx_http_connection_t
    hc = c->data;
    cscf = ngx_http_get_module_srv_conf(hc->conf_ctx, ngx_http_core_module);
    //client header buffer size
    size = cscf->client_header_buffer_size;

    b = c->buffer;

    if (b == NULL) {
        b = ngx_create_temp_buf(c->pool, size);
        if (b == NULL) {
            ngx_http_close_connection(c);
            return;
        }

        c->buffer = b;

    } else if (b->start == NULL) {

        b->start = ngx_palloc(c->pool, size);
        if (b->start == NULL) {
            ngx_http_close_connection(c);
            return;
        }

        b->pos = b->start;
        b->last = b->start;
        b->end = b->last + size;
    }
    //read data: this doubles as a test of whether the connection is usable
    n = c->recv(c, b->last, size);
    //nothing to read yet
    if (n == NGX_AGAIN) {

        if (!rev->timer_set) {
            ngx_add_timer(rev, c->listening->post_accept_timeout);
            ngx_reusable_connection(c, 1);
        }

        if (ngx_handle_read_event(rev, 0) != NGX_OK) {
            ngx_http_close_connection(c);
            return;
        }

        /*
         * We are trying to not hold c->buffer's memory for an idle connection.
         */

        if (ngx_pfree(c->pool, b->start) == NGX_OK) {
            b->start = NULL;
        }

        return;
    }

    if (n == NGX_ERROR) {
        ngx_http_close_connection(c);
        return;
    }

    if (n == 0) {
        ngx_log_error(NGX_LOG_INFO, c->log, 0,
                      "client closed connection");
        ngx_http_close_connection(c);
        return;
    }

    b->last += n;
    //PROXY protocol handling
    if (hc->proxy_protocol) {
        hc->proxy_protocol = 0;
        //parse the PROXY protocol header
        p = ngx_proxy_protocol_read(c, b->pos, b->last);

        if (p == NULL) {
            ngx_http_close_connection(c);
            return;
        }

        b->pos = p;

        if (b->pos == b->last) {
            c->log->action = "waiting for request";
            b->pos = b->start;
            b->last = b->start;
            /* repost this connection's read event on nginx's event queue */
            ngx_post_event(rev, &ngx_posted_events);
            return;
        }
    }

    c->log->action = "reading client request line";

    ngx_reusable_connection(c, 0);

    c->data = ngx_http_create_request(c);
    if (c->data == NULL) {
        ngx_http_close_connection(c);
        return;
    }
    /* reset the read event's handler */
    rev->handler = ngx_http_process_request_line;
    /* parse what has been read */
    ngx_http_process_request_line(rev);
}
ngx_http_empty_handler: the write handler for http connections (a do-nothing placeholder)