事件處理框架所要解決的問題是如何收集,管理,分發事件。這裏所說的事件,主要以網絡事件和定時器事件爲主,而網絡事件中又以TCP網絡事件爲主。因爲網絡事件與網卡中斷處理程序,內核提供的系統調用密切相關,因此網絡事件的驅動取決於不一樣的操做系統平臺,在同一操做系統中也受制於不一樣的操做系統內核版本。所以不一樣操做系統有不一樣的事件驅動機制。nginx
基於模塊化的設計思想,nginx對於事件處理分不一樣的模塊來完成。首先是ngx_events_module,它是NGX_CORE_MODULE類型的模塊,主要負責配置文件events塊配置項的解析;其次是ngx_event_core_module,它是NGX_EVENTS_MODULE類型的模塊,這個模塊會決定使用哪一種事件驅動機制,而且怎樣調用事件驅動完成事件的管理;最後是ngx_epoll_module,ngx_kqueue_module,ngx_poll_module等一系列模塊,這些模塊實現了具體的事件驅動機制。服務器
在模塊接口ngx_module_t中,有一個指向模塊上下文的指針,不一樣的模塊採用不一樣的結構體。網絡
對於NGX_EVENT_MODULE類型的模塊,其上下文結構體爲ngx_event_module_t:數據結構
typedef struct { // 事件模塊的名稱 ngx_str_t * name; // 用於建立保存配置項參數的結構體 void * (*create_conf)(ngx_cycle_t * cycle); // 結合配置文件初始化配置項參數 char * (*init_conf)(ngx_cycle_t * cycle, void * conf); // 事件驅動機制的核心方法抽象 ngx_event_actions_t actions; } ngx_event_module_t; typedef struct { // 添加事件方法 負責把一個感興趣的事件添加到事件驅動中 ngx_int_t (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); // 刪除事件方法 ngx_int_t (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); // 啓用一個事件 ngx_int_t (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); // 禁用一個事件 ngx_int_t (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); // 向事件驅動中添加一個新的鏈接,該鏈接的讀寫事件均會被添加到事件驅動中 ngx_int_t (*add_conn)(ngx_connection_t * c); // 從事件驅動中刪除一個新的鏈接 ngx_int_t (*del_conn)(ngx_connection_t * c, ngx_uint_t flags); ngx_int_t (*notify)(ngx_event_handler_pt handler); // 處理事件方法 ngx_int_t (*process_event)(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags); // 事件驅動初始化方法 ngx_int_t (*init)(ngx_cycle_t *cycle, ngx_msec_t timer); // 退出事件驅動前調用的方法 void (*done)(ngx_cycle_t *cycle); } ngx_event_actions_t;
ngx_epoll_module,ngx_kqueue_module,ngx_poll_module等模塊均會實現ngx_event_actions_t中定義的方法以此實現對應的事件驅動機制。另外,在事件模塊初始化的過程當中會對全局變量ngx_event_actions賦值,後續經過ngx_event_actions完成事件的管理,這樣邏輯上就有了較明顯的分層。框架
extern ngx_event_actions_t ngx_event_actions; #define ngx_process_events ngx_event_actions.process_events #define ngx_done_events ngx_event_actions.done #define ngx_add_event ngx_event_actions.add #define ngx_del_event ngx_event_actions.del #define ngx_add_conn ngx_event_actions.add_conn #define ngx_del_conn ngx_event_actions.del_conn #define ngx_notify ngx_event_actions.notify
事件模塊的運行流程:socket
即調用ngx_event_core_module模塊的ngx_event_module_init方法,在該方法中會初始化一些與系統相關的信息,例如進程打開的最大文件數,原子鎖(文件鎖)的初始化等。tcp
即調用ngx_event_core_module模塊的ngx_event_process_init方法,在該方法中會初始化用於多進程偵聽的鎖,初始化選用的事件驅動機制、初始化鏈接池、讀寫事件、最後將偵聽套接字做爲可讀事件添加到事件驅動中。模塊化
經過ngx_event_actions開始事件循環。函數
每個事件都由ngx_event_t結構體來表示。該結構體中最核心的部分就是handler回調方法,它由每個事件消費模塊實現,以此決定這個事件究竟如何被處理。ui
typedef struct ngx_event_s ngx_event_t; struct ngx_event_s { // 事件相關的對象, 一般指向ngx_connection_t鏈接對象 void * data; // 事件的處理方法, 每一個事件消費模塊都會從新實現它 ngx_event_handler_pt handler; ... };
在nginx中,定義了基本的數據結構ngx_connection_t來表示鏈接。這個鏈接能夠是被動鏈接:即客戶端主動發起的,nginx服務器被動接受的tcp鏈接;也能夠是主動鏈接:即nginx主動向上游服務器創建的鏈接,並以此鏈接與上游服務器通訊。主動鏈接由結構體ngx_peer_connection來表示,它是以ngx_connection_t結構體爲基礎來實現的。
typedef struct ngx_connection_s ngx_connection_t; struct ngx_connection_s { // 做爲空閒鏈接時, 指向鏈接池中下一個空閒鏈接 // 做爲非空閒鏈接時, 其意義由使用鏈接的模塊定義, 例如http模塊將data指向一個http請求(ngx_http_request_t) void * data; // 鏈接對應的讀事件 ngx_event_t * read; // 鏈接對應的寫事件 ngx_event_t * write; // 鏈接對應的偵聽對象 ngx_listening_t * listening; // 鏈接的套接字句柄 ngx_socket_t fd; ... };
每一個nginx須要偵聽的端口都由結構體ngx_listening_t來表示,該結構體中包含了socket套接字,偵聽的ip地址,端口,以及偵聽端口上成功創建新鏈接後的回調處理方法。
typedef struct ngx_listening_s ngx_listening_t; struct ngx_listening_s { // 偵聽套接字 ngx_socket_t fd; // 偵聽地址 struct sockaddr * sockaddr; // 新鏈接成功創建後的處理方法 ngx_connection_handler_pt handler; // 對應的鏈接對象 ngx_connection_t * connection; ... };
在初始化過程當中,每一個鏈接都會自動對應到一個讀事件和寫事件;將偵聽套接字添加到事件驅動過程當中,爲每一個偵聽套接字分配一個鏈接,並對分配到的鏈接的讀事件的處理函數賦值;當新鏈接創建後回調偵聽的處理方法,在該方法中會修改新鏈接讀寫事件的處理方法,因爲偵聽是在不一樣的模塊中被初始化的,有了新鏈接的回調處理方法,不一樣的模塊就能方便的集成事件處理框架。例如http模塊在解析配置構造ngx_listening_t結構體時,將其回調處理方法設置爲ngx_http_init_connection,當有新鏈接成功創建時,該函數被回調,而且根據配置與實際狀況將新鏈接的讀寫事件處理方法修改成ngx_http_wait_request_handler,ngx_http_empty_handler或其餘http的處理方法。
相關源碼:
static ngx_int_t ngx_event_process_init( ngx_cycle_t * cycle ) { ... // 爲鏈接分配內存空間 cycle->connections = ngx_alloc(sizeof(ngx_connection_t)*cycle->connection_n, cycle->log); if( cycle->connections == NULL ) { return NGX_ERROR; } c = cycle->connections; // 爲讀事件分配內存空間並初始化 cycle->read_events = ngx_alloc(sizeof(ngx_event_t)*cycle->connection_n, cycle->log); if( cycle->read_events == NULL ) { return NGX_ERROR; } rev = cycle->read_events; for( i = 0; i < cycle->connection_n; i++ ) { rev[i].closed = 1; rev[i].instance = 1; } // 爲寫事件分配內存空間並初始化 cycle->write_events = ngx_alloc(sizeof(ngx_event_t)*cycle->connection_n, cycle->log); if( cycle->write_events == NULL ) { return NGX_ERROR; } wev = cycle->write_events; for( i = 0; i < cycle->connection_n; i++ ) { wev[i].closed = 1; wev[i].instance = 1; } // 每一個鏈接的讀事件和寫事件 i = cycle->connection_n; next = NULL; do { i--; c[i].data = next; c[i].read = &cycle->read_events[i]; c[i].write = &cycle->write_events[i]; c[i].fd = (ngx_socket_t)-1; } while(i); // 爲每一個偵聽分配一個鏈接, 用於事件管理 ls = cycle->listening.elts; for( i = 0 ; i < cycle->listening.nelts; i++ ) { c = ngx_get_connection(ls[i].fd, cycle->log); if( c == NULL ) { return NGX_ERROR; } c->type = ls[i].type; c->log = &ls[i].log; c->listening = &ls[i]; ls[i].connection = c; rev = c->read; rev->log = c->log; rev->accept = 1; rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept : ngx_event_recvmsg; if(ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) { return NGX_ERROR; } } return NGX_OK; }
參考:
《深刻理解nginx》
《nginx開發從入門到精通》