struct ngx_event_s { /* 事件上下文數據,一般data都是指向ngx_connection_t鏈接對象。 * 開啓文件異步I/O時,它可能會指向ngx_event_aio_t結構體。 */ void *data; /* 標誌位,爲1時表示事件是可寫的。一般它表示對應的TCP鏈接可寫,也就是鏈接處於能夠發送網絡包的狀態。*/ unsigned write:1; /* 標誌位,爲1時表示爲此事件能夠創建新的鏈接。 一般在ngx_cycle_t中的listening動態數組中,每個監聽對象ngx_listening_t,對應的讀事件中的accept標誌位纔會是1。 */ unsigned accept:1; /* used to detect the stale events in kqueue, rtsig, and epoll * 這個標誌位用於區分當前事件是否過時,它僅僅是給事件驅動模塊使用的,而事件消費模塊可不用關心。 * 爲何須要這個標誌位呢?當開始處理一批事件時,處理前面的事件可能會關閉一些鏈接, 而這些鏈接有可能影響這批事件中還未處理到的後面的事件,這時可經過instance來避免處理後面的過時事件。 */ unsigned instance:1; /* the event was passed or would be passed to a kernel; * in aio mode - operation was posted. * 標誌位,爲1表示當前事件是活躍的,爲0表示事件是不活躍的。 * 這個狀態對應着事件驅動模塊處理方式的不一樣。例如,在添加事件,刪除事件和處理事件時, * active標誌位的不一樣都會對應着不一樣的處理方式。在使用事件時,通常不會直接改變active標誌位。 */ unsigned active:1; /* 標誌位,爲1表示禁用事件,僅在kqueue或者rtsig事件驅動模塊中有效,對於epoll事件驅動模塊則沒有意義。 */ unsigned disabled:1; /* the ready event; in aio mode 0 means that no operation can be posted * 標誌位,爲1表示當前事件準備就緒,也就是說,容許這個事件的handler處理這個事件。 * 在HTTP框架中,常常會檢查事件的ready標誌位,以肯定是否能夠接收請求或者發送相應。 */ unsigned ready:1; /* 該標誌位僅對kqueue,eventport等模塊有意義,而對於linux上的epoll事件驅動模塊則是無心義的。*/ unsigned oneshot:1; /* aio operation is complete 用於異步aio事件的處理*/ unsigned complete:1; /* 標誌位,eof表示當前處理的字符流已經結束,error表示事件處理過程出錯了*/ unsigned eof:1; unsigned error:1; /* 標誌位,爲1表示這個事件超時,用以提示handler作超時處理,它與timer_set都用了定時器 */ unsigned timedout:1; unsigned timer_set:1; /* 標誌位,delayed爲1表示須要延遲處理這個事件,它僅用於限速功能 */ unsigned delayed:1; /* 標誌位,爲1表示延遲創建TCP鏈接,也就是TCP三次握手後並不創建鏈接,而是等到真正收到數據包後才建鏈接 */ unsigned deferred_accept:1; /* the pending eof reported by kqueue, epoll or in aio chain operation */ /* 標誌位,爲1表示等待字符流結束,它只與kqueue和aio事件驅動機制有關 */ unsigned pending_eof:1; //接受、讀、寫 unsigned posted:1; //關閉 unsigned closed:1; /* to test on worker exit */ unsigned channel:1; unsigned resolver:1; unsigned cancelable:1; #if (NGX_HAVE_KQUEUE) unsigned kq_vnode:1; /* the pending errno reported by kqueue */ int kq_errno; #endif /* * kqueue only: * accept: number of sockets that wait to be accepted * read: bytes to read when event is ready * or lowat when event is set with NGX_LOWAT_EVENT flag * write: available space in buffer when event is ready * or lowat when event is set with NGX_LOWAT_EVENT flag * * epoll with EPOLLRDHUP: * accept: 1 if accept many, 0 otherwise * read: 1 if there can be data to read, 0 otherwise * * iocp: TODO * * otherwise: * accept: 1 if accept many, 0 otherwise */ #if (NGX_HAVE_KQUEUE) || (NGX_HAVE_IOCP) int available; #else unsigned available:1; #endif /* 這個事件發生時的處理方法 */ ngx_event_handler_pt handler; #if (NGX_HAVE_IOCP) ngx_event_ovlp_t ovlp; #endif ngx_uint_t index; ngx_log_t *log; ngx_rbtree_node_t timer; /* the posted queue */ ngx_queue_t queue; #if 0 /* the threads support */ /* * the event thread context, we store it here * if $(CC) does not understand __thread declaration * and pthread_getspecific() is too costly */ void *thr_ctx; #if (NGX_EVENT_T_PADDING) /* event should not cross cache line in SMP */ uint32_t padding[NGX_EVENT_T_PADDING]; #endif #endif };
struct ngx_event_aio_s { void *data; ngx_event_handler_pt handler; ngx_file_t *file; ngx_fd_t fd; #if (NGX_HAVE_AIO_SENDFILE || NGX_COMPAT) ssize_t (*preload_handler)(ngx_buf_t *file); #endif #if (NGX_HAVE_EVENTFD) int64_t res; #endif #if !(NGX_HAVE_EVENTFD) || (NGX_TEST_BUILD_EPOLL) ngx_err_t err; size_t nbytes; #endif ngx_aiocb_t aiocb; ngx_event_t event; };
typedef struct { /* 添加/移出事件方法,負責把事件添加/移出到操做系統提供的事件驅動機制(如epoll,kqueue等)中, 這樣在事件發生以後,將能夠/沒法調用下面的process_envets時獲取這個事件。 */ ngx_int_t (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); ngx_int_t (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); /* 啓用/禁用一個事件,目前事件框架不會調用,大部分事件驅動模塊對該方法的實現都與add/del徹底一致 * ngx_int_t (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); ngx_int_t (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); /* 向事件驅動機制中添加/移除一個新的鏈接,這意味着鏈接上的讀寫事件都添加到/移出事件驅動機制中了 */ ngx_int_t (*add_conn)(ngx_connection_t *c); ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags); ngx_int_t (*notify)(ngx_event_handler_pt handler); 'nginx的核心事件,將epoll事件封裝到nginx事件列表中' '啓動nginx的事件列表' /* 在正常的工做循環中,將經過調用process_events方法來處理事件。 * 這個方法僅在ngx_process_events_and_timers方法中調用,它是處理分發事件的核心。*/ ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags); /* 初始化和退出事件驅動模塊的方法:初始化epoll */ ngx_int_t (*init)(ngx_cycle_t *cycle, ngx_msec_t timer); void (*done)(ngx_cycle_t *cycle); } ngx_event_actions_t;
/** 模塊的進程啓動函數 */ static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) { ngx_uint_t m, i; ngx_event_t *rev, *wev; ngx_listening_t *ls; ngx_connection_t *c, *next, *old; ngx_core_conf_t *ccf; ngx_event_conf_t *ecf; ngx_event_module_t *module; ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module); ecf = ngx_event_get_conf(cycle->conf_ctx, ngx_event_core_module); if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) { ngx_use_accept_mutex = 1; ngx_accept_mutex_held = 0; ngx_accept_mutex_delay = ecf->accept_mutex_delay; } else { ngx_use_accept_mutex = 0; } //初始化nginx的事件列表 ngx_queue_init(&ngx_posted_accept_events); ngx_queue_init(&ngx_posted_events); if (ngx_event_timer_init(cycle->log) NGX_ERROR) { return NGX_ERROR; } //初始化 NGX_EVENT_MODULE類型模塊的事件上下文: // epoll事件引擎:epoll_create for (m = 0; cycle->modules[m]; m++) { if (cycle->modules[m]->type != NGX_EVENT_MODULE) { continue; } if (cycle->modules[m]->ctx_index != ecf->use) { continue; } module = cycle->modules[m]->ctx; if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) { /* fatal */ exit(2); } break; } if (ngx_timer_resolution && !(ngx_event_flags & NGX_USE_TIMER_EVENT)) { ngx_log_error(NGX_LOG_WARN, cycle->log, 0, "the \"timer_resolution\" directive is not supported " "with the configured event method, ignored"); ngx_timer_resolution = 0; } cycle->connections = ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log); if (cycle->connections NULL) { return NGX_ERROR; } c = cycle->connections; cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log); if (cycle->read_events NULL) { return NGX_ERROR; } rev = cycle->read_events; for (i = 0; i < cycle->connection_n; i++) { rev[i].closed = 1; rev[i].instance = 1; } cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log); if (cycle->write_events NULL) { return NGX_ERROR; } wev = cycle->write_events; for (i = 0; i < cycle->connection_n; i++) { wev[i].closed = 1; } i = cycle->connection_n; next = NULL; do { i--; c[i].data = next; c[i].read = &cycle->read_events[i]; c[i].write = &cycle->write_events[i]; c[i].fd = (ngx_socket_t) -1; next = &c[i]; } while (i); cycle->free_connections = next; cycle->free_connection_n = cycle->connection_n; /* for each listening socket */ ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) { c = ngx_get_connection(ls[i].fd, cycle->log); if (c NULL) { return NGX_ERROR; } c->type = ls[i].type; c->log = &ls[i].log; c->listening = &ls[i]; ls[i].connection = c; rev = c->read; rev->log = c->log; rev->accept = 1; if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) { if (ls[i].previous) { old = ls[i].previous->connection; if (ngx_del_event(old->read, NGX_READ_EVENT, NGX_CLOSE_EVENT) NGX_ERROR) { return NGX_ERROR; } old->fd = (ngx_socket_t) -1; } } '初始化tcp/UNIX域的accept事件' rev->handler = (c->type SOCK_STREAM) ? ngx_event_accept : ngx_event_recvmsg; '獲取ngx_use_accept_mutex' if (ngx_use_accept_mutex) { continue; } '獲取ngx_use_accept_mutex成功後,將該套接字註冊到I/O引擎上' if (ngx_add_event(rev, NGX_READ_EVENT, 0) NGX_ERROR) { return NGX_ERROR; } return NGX_OK; }
ngx_trylock_accept_mutex 當獲取到標誌位後才進行** accept 事件註冊**node
ngx_process_events 處理事件linux
釋放 accept_mutex 鎖nginx
ngx_event_process_posted 處理 posted 隊列的事件數組
ngx_process_events_and_timers()網絡
/** nginx的事件初始器 */ void ngx_process_events_and_timers(ngx_cycle_t *cycle) { ngx_uint_t flags; ngx_msec_t timer, delta; if (ngx_timer_resolution) { timer = NGX_TIMER_INFINITE; flags = 0; } else { timer = ngx_event_find_timer(); flags = NGX_UPDATE_TIME; } if (ngx_use_accept_mutex) { if (ngx_accept_disabled > 0) { ngx_accept_disabled--; } else { //獲取ngx_use_accept_mutex,將套接字註冊到I/O事件引擎上,事件類型accept if (ngx_trylock_accept_mutex(cycle) NGX_ERROR) { return; } if (ngx_accept_mutex_held) { flags |= NGX_POST_EVENTS; } else { if (timer NGX_TIMER_INFINITE || timer > ngx_accept_mutex_delay) { timer = ngx_accept_mutex_delay; } } } } delta = ngx_current_msec; '調用處理好的 read、write放到ngx_posted_events中,等待nginx處理' (void) ngx_process_events(cycle, timer, flags); delta = ngx_current_msec - delta; ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "timer delta: %M", delta); //處理nginx事件隊列中的事件 ngx_event_process_posted(cycle, &ngx_posted_accept_events); if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); } if (delta) { ngx_event_expire_timers(); } ngx_event_process_posted(cycle, &ngx_posted_events); }
/** 獲取ngx_accept_mutex鎖,添加事件 */ ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle) { if (ngx_shmtx_trylock(&ngx_accept_mutex)) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex locked"); if (ngx_accept_mutex_held && ngx_accept_events 0) { return NGX_OK; } if (ngx_enable_accept_events(cycle) NGX_ERROR) { ngx_shmtx_unlock(&ngx_accept_mutex); return NGX_ERROR; } ngx_accept_events = 0; ngx_accept_mutex_held = 1; return NGX_OK; } ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex lock failed: %ui", ngx_accept_mutex_held); if (ngx_accept_mutex_held) { if (ngx_disable_accept_events(cycle, 0) NGX_ERROR) { return NGX_ERROR; } ngx_accept_mutex_held = 0; } return NGX_OK; }
/** 處理事件隊列中的事件 */ void ngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted) { ngx_queue_t *q; ngx_event_t *ev; while (!ngx_queue_empty(posted)) { q = ngx_queue_head(posted); ev = ngx_queue_data(q, ngx_event_t, queue); ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "posted event %p", ev); ngx_delete_posted_event(ev); ev->handler(ev); } }
#define ngx_post_event(ev, q) \ \ if (!(ev)->posted) { \ (ev)->posted = 1; \ ngx_queue_insert_tail(q, &(ev)->queue); \ \ ngx_log_debug1(NGX_LOG_DEBUG_CORE, (ev)->log, 0, "post event %p", ev);\ \ } else { \ ngx_log_debug1(NGX_LOG_DEBUG_CORE, (ev)->log, 0, \ "update posted event %p", ev); \ }
static ngx_int_t ngx_enable_accept_events(ngx_cycle_t *cycle) { ngx_uint_t i; ngx_listening_t *ls; ngx_connection_t *c; ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) { c = ls[i].connection; if (c NULL || c->read->active) { continue; } '調用epoll的 ngx_epoll_add_event,添加套接字到I/O事件引擎上' if (ngx_add_event(c->read, NGX_READ_EVENT, 0) NGX_ERROR) { return NGX_ERROR; } } return NGX_OK; }
/* nginx事件:用來處理I/O引擎中tcp的accept類型事件 根據其它模塊(http、stream、mail)設置的 */ void ngx_event_accept(ngx_event_t *ev) { socklen_t socklen; ngx_err_t err; ngx_log_t *log; ngx_uint_t level; ngx_socket_t s; ngx_event_t *rev, *wev; ngx_sockaddr_t sa; ngx_listening_t *ls; ngx_connection_t *c, *lc; ngx_event_conf_t *ecf; if (ev->timedout) { if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) { return; } ev->timedout = 0; } ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module); if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) { ev->available = ecf->multi_accept; } lc = ev->data; ls = lc->listening; ev->ready = 0; ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0, "accept on %V, ready: %d", &ls->addr_text, ev->available); do { socklen = sizeof(ngx_sockaddr_t); // 接受 s = accept(lc->fd, &sa.sockaddr, &socklen); if (s (ngx_socket_t) -1) { err = ngx_socket_errno; if (err NGX_EAGAIN) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err, "accept() not ready"); return; } level = NGX_LOG_ALERT; if (err NGX_ECONNABORTED) { level = NGX_LOG_ERR; } else if (err NGX_EMFILE || err NGX_ENFILE) { level = NGX_LOG_CRIT; } ngx_log_error(level, ev->log, err, "accept() failed"); if (err NGX_ECONNABORTED) { if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { ev->available--; } if (ev->available) { continue; } } if (err NGX_EMFILE || err NGX_ENFILE) { if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle, 1) != NGX_OK) { return; } if (ngx_use_accept_mutex) { if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); ngx_accept_mutex_held = 0; } ngx_accept_disabled = 1; } else { ngx_add_timer(ev, ecf->accept_mutex_delay); } } return; } ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n; c = ngx_get_connection(s, ev->log); if (c NULL) { if (ngx_close_socket(s) -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_close_socket_n " failed"); } return; } c->type = SOCK_STREAM; c->pool = ngx_create_pool(ls->pool_size, ev->log); if (c->pool NULL) { ngx_close_accepted_connection(c); return; } if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) { socklen = sizeof(ngx_sockaddr_t); } c->sockaddr = ngx_palloc(c->pool, socklen); if (c->sockaddr NULL) { ngx_close_accepted_connection(c); return; } ngx_memcpy(c->sockaddr, &sa, socklen); log = ngx_palloc(c->pool, sizeof(ngx_log_t)); if (log NULL) { ngx_close_accepted_connection(c); return; } /* set a blocking mode for iocp and non-blocking mode for others */ if (ngx_inherited_nonblocking) { if (ngx_event_flags & NGX_USE_IOCP_EVENT) { if (ngx_blocking(s) -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_blocking_n " failed"); ngx_close_accepted_connection(c); return; } } } else { if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) { if (ngx_nonblocking(s) -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_nonblocking_n " failed"); ngx_close_accepted_connection(c); return; } } } *log = ls->log; c->recv = ngx_recv; c->send = ngx_send; c->recv_chain = ngx_recv_chain; c->send_chain = ngx_send_chain; c->log = log; c->pool->log = log; c->socklen = socklen; c->listening = ls; c->local_sockaddr = ls->sockaddr; c->local_socklen = ls->socklen; rev = c->read; wev = c->write; wev->ready = 1; if (ngx_event_flags & NGX_USE_IOCP_EVENT) { rev->ready = 1; } if (ev->deferred_accept) { rev->ready = 1; } rev->log = log; wev->log = log; /* * TODO: MT: - ngx_atomic_fetch_add() * or protection by critical section or light mutex * * TODO: MP: - allocated in a shared memory * - ngx_atomic_fetch_add() * or protection by critical section or light mutex */ c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); if (ls->addr_ntop) { c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len); if (c->addr_text.data NULL) { ngx_close_accepted_connection(c); return; } c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen, c->addr_text.data, ls->addr_text_max_len, 0); if (c->addr_text.len 0) { ngx_close_accepted_connection(c); return; } } if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) 0) { if (ngx_add_conn(c) NGX_ERROR) { ngx_close_accepted_connection(c); return; } } log->data = NULL; log->handler = NULL; '根據其它模塊(http、stream、mail)設置的' ls->handler(c); if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { ev->available--; } } while (ev->available); }
特色:框架
例如:啓用epoll模塊事件結構:將全部的I/O操做都封裝成一個nginx事件,而後在work中循環調用異步
ngx_event_module_t ngx_epoll_module_ctx = { &epoll_name, ngx_epoll_create_conf, /* create configuration */ ngx_epoll_init_conf, /* init configuration */ //nginx的事件處理函數 { ngx_epoll_add_event, /* add an event */ ngx_epoll_del_event, /* delete an event */ ngx_epoll_add_event, /* enable an event */ ngx_epoll_del_event, /* disable an event */ ngx_epoll_add_connection, /* add an connection */ ngx_epoll_del_connection, /* delete an connection */ ngx_epoll_process_events, /* process the events */ ngx_epoll_init, /* init the events */ ngx_epoll_done, /* done the events */ } };
/** NGX_EVENT_MOUDLE中epoll的actions->init()函數 */ static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer) { ngx_epoll_conf_t *epcf; //獲取epoll的配置信息 epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module); if (ep -1) { ep = epoll_create(cycle->connection_n / 2); //建立epoll if (ep -1) { ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, "epoll_create() failed"); return NGX_ERROR; } } if (nevents < epcf->events) { if (event_list) { ngx_free(event_list); } event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events, cycle->log); if (event_list NULL) { return NGX_ERROR; } } nevents = epcf->events; ngx_io = ngx_os_io; ngx_event_actions = ngx_epoll_module_ctx.actions; //模塊的總事件 return NGX_OK; }
/** 添加nginx事件(一個套接字)到epoll中 */ static ngx_int_t ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags) { int op; uint32_t events, prev; ngx_event_t *e; ngx_connection_t *c; struct epoll_event ee; c = ev->data; events = (uint32_t) event; if (event NGX_READ_EVENT) { e = c->write; prev = EPOLLOUT; //表示對應的文件描述符能夠寫; } else { e = c->read; } if (e->active) { op = EPOLL_CTL_MOD; //修改已經註冊的fd的監聽事件 events |= prev; } else { op = EPOLL_CTL_ADD; //註冊新的fd到epfd中; } ee.events = events | (uint32_t) flags; ee.data.ptr = (void *) ((uintptr_t) c | ev->instance); ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0, "epoll add event: fd:%d op:%d ev:%08XD", c->fd, op, ee.events); '將套接字添加到I/O引擎中' if (epoll_ctl(ep, op, c->fd, &ee) -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "epoll_ctl(%d, %d) failed", op, c->fd); return NGX_ERROR; } ev->active = 1; return NGX_OK; }
/** I/O引擎事件處理函數 **/ static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { int events; uint32_t revents; ngx_int_t instance, i; ngx_uint_t level; ngx_err_t err; ngx_event_t *rev, *wev; ngx_queue_t *queue; ngx_connection_t *c; /* NGX_TIMER_INFINITE INFTIM */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll timer: %M", timer); //獲取epoll處理好的事件 events = epoll_wait(ep, event_list, (int) nevents, timer); err = (events -1) ? ngx_errno : 0; if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) { ngx_time_update(); } if (err) { if (err NGX_EINTR) { if (ngx_event_timer_alarm) { ngx_event_timer_alarm = 0; return NGX_OK; } level = NGX_LOG_INFO; } else { level = NGX_LOG_ALERT; } ngx_log_error(level, cycle->log, err, "epoll_wait() failed"); return NGX_ERROR; } if (events 0) { if (timer != NGX_TIMER_INFINITE) { return NGX_OK; } ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "epoll_wait() returned no events without timeout"); return NGX_ERROR; } //循環處理 for (i = 0; i < events; i++) { c = event_list[i].data.ptr; instance = (uintptr_t) c & 1; c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1); rev = c->read; if (c->fd -1 || rev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } revents = event_list[i].events; ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: fd:%d ev:%04XD d:%p", c->fd, revents, event_list[i].data.ptr); // 事件的錯誤信息 if (revents & (EPOLLERR|EPOLLHUP)) { ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll_wait() error on fd:%d ev:%04XD", c->fd, revents); /* * if the error events were returned, add EPOLLIN and EPOLLOUT * to handle the events at least in one active handler */ revents |= EPOLLIN|EPOLLOUT; } //讀事件 if ((revents & EPOLLIN) && rev->active) { rev->ready = 1; if (flags & NGX_POST_EVENTS) { queue = rev->accept ? &ngx_posted_accept_events : &ngx_posted_events; '將準備好讀事件添加到ngx_posted_accept_events和ngx_posted_events' ngx_post_event(rev, queue); } else { '若是不是NGX_POST_EVENTS,直接調用gx_posted_accept_events()和ngx_posted_events()' rev->handler(rev); } } wev = c->write; //寫事件 if ((revents & EPOLLOUT) && wev->active) { if (c->fd -1 || wev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } wev->ready = 1; if (flags & NGX_POST_EVENTS) { '將準備好的寫事件添加到ngx_posted_events' ngx_post_event(wev, &ngx_posted_events); } else { wev->handler(wev); } } } return NGX_OK; }
/** http鏈接處理函數 1.初始化ngx_http_connection_t 2.設置read,write的處理函數 3.將這個鏈接封裝成一個nginx事件,添加到事件隊列中 */ void ngx_http_init_connection(ngx_connection_t *c) { ngx_uint_t i; ngx_event_t *rev; struct sockaddr_in *sin; ngx_http_port_t *port; ngx_http_in_addr_t *addr; ngx_http_log_ctx_t *ctx; ngx_http_connection_t *hc; #if (NGX_HAVE_INET6) struct sockaddr_in6 *sin6; ngx_http_in6_addr_t *addr6; #endif //初始化http鏈接 hc = ngx_pcalloc(c->pool, sizeof(ngx_http_connection_t)); if (hc NULL) { ngx_http_close_connection(c); return; } //分配鏈接 c->data = hc; /* find the server configuration for the address:port */ //監聽端口 port = c->listening->servers; if (port->naddrs > 1) { /* * there are several addresses on this port and one of them * is an "*:port" wildcard so getsockname() in ngx_http_server_addr() * is required to determine a server address */ if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) { ngx_http_close_connection(c); return; } switch (c->local_sockaddr->sa_family) { #if (NGX_HAVE_INET6) case AF_INET6: sin6 = (struct sockaddr_in6 *) c->local_sockaddr; addr6 = port->addrs; /* the last address is "*" */ for (i = 0; i < port->naddrs - 1; i++) { if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) 0) { break; } } hc->addr_conf = &addr6[i].conf; break; #endif default: /* AF_INET */ sin = (struct sockaddr_in *) c->local_sockaddr; //初始化地址 addr = port->addrs; /* the last address is "*" */ for (i = 0; i < port->naddrs - 1; i++) { if (addr[i].addr sin->sin_addr.s_addr) { break; } } hc->addr_conf = &addr[i].conf; break; } } else { switch (c->local_sockaddr->sa_family) { #if (NGX_HAVE_INET6) case AF_INET6: addr6 = port->addrs; hc->addr_conf = &addr6[0].conf; break; #endif default: /* AF_INET */ addr = port->addrs; hc->addr_conf = &addr[0].conf; break; } } /* the default server configuration for the address:port */ hc->conf_ctx = hc->addr_conf->default_server->ctx; //配置文件 ctx = ngx_palloc(c->pool, sizeof(ngx_http_log_ctx_t)); if (ctx NULL) { ngx_http_close_connection(c); return; } ctx->connection = c; ctx->request = NULL; ctx->current_request = NULL; c->log->connection = c->number; c->log->handler = ngx_http_log_error; c->log->data = ctx; c->log->action = "waiting for request"; c->log_error = NGX_ERROR_INFO; rev = c->read; rev->handler = ngx_http_wait_request_handler; //讀事件 c->write->handler = ngx_http_empty_handler; //寫事件 #if (NGX_HTTP_V2) if (hc->addr_conf->http2) { rev->handler = ngx_http_v2_init; } #endif #if (NGX_HTTP_SSL) { ngx_http_ssl_srv_conf_t *sscf; sscf = ngx_http_get_module_srv_conf(hc->conf_ctx, ngx_http_ssl_module); if (sscf->enable || hc->addr_conf->ssl) { c->log->action = "SSL handshaking"; if (hc->addr_conf->ssl && sscf->ssl.ctx NULL) { ngx_log_error(NGX_LOG_ERR, c->log, 0, "no \"ssl_certificate\" is defined " "in server listening on SSL port"); ngx_http_close_connection(c); return; } hc->ssl = 1; rev->handler = ngx_http_ssl_handshake; //SSL處理事件 } } #endif if (hc->addr_conf->proxy_protocol) { hc->proxy_protocol = 1; c->log->action = "reading PROXY protocol"; } if (rev->ready) { /* the deferred accept(), iocp */ if (ngx_use_accept_mutex) { ngx_post_event(rev, &ngx_posted_events); '' return; } rev->handler(rev); '直接執行read處理函數' return; } //設置結構的時間 ngx_add_timer(rev, c->listening->post_accept_timeout); ngx_reusable_connection(c, 1); '將這個鏈接套接字放到I/O事件引擎上' if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_http_close_connection(c); return; } }
/** http 鏈接的讀事件 1. 初始化ngx_http_connection_t 2. 設置buffer的大小 3. 測試鏈接是否可用 3. 若是有動態代理,則將其封裝成一個事件放到nginx的事件隊列 5. 從新設置鏈接事件爲:ngx_http_process_request_line 6. 解析讀到的內容 */ static void ngx_http_wait_request_handler(ngx_event_t *rev) { u_char *p; size_t size; ssize_t n; ngx_buf_t *b; ngx_connection_t *c; ngx_http_connection_t *hc; ngx_http_core_srv_conf_t *cscf; //初始化ngx_connection_t c = rev->data; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http wait request handler"); if (rev->timedout) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out"); ngx_http_close_connection(c); return; } if (c->close) { ngx_http_close_connection(c); return; } //初始化ngx_http_connection_t鏈接 hc = c->data; cscf = ngx_http_get_module_srv_conf(hc->conf_ctx, ngx_http_core_module); //初始化buffer的大小 size = cscf->client_header_buffer_size; b = c->buffer; if (b NULL) { b = ngx_create_temp_buf(c->pool, size); if (b NULL) { ngx_http_close_connection(c); return; } c->buffer = b; } else if (b->start NULL) { b->start = ngx_palloc(c->pool, size); if (b->start NULL) { ngx_http_close_connection(c); return; } b->pos = b->start; b->last = b->start; b->end = b->last + size; } //讀取數據:用於測試鏈接是否可用 n = c->recv(c, b->last, size); //若是出錯 if (n NGX_AGAIN) { if (!rev->timer_set) { ngx_add_timer(rev, c->listening->post_accept_timeout); ngx_reusable_connection(c, 1); } if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_http_close_connection(c); return; } /* * We are trying to not hold c->buffer's memory for an idle connection. */ if (ngx_pfree(c->pool, b->start) NGX_OK) { b->start = NULL; } return; } if (n NGX_ERROR) { ngx_http_close_connection(c); return; } if (n 0) { ngx_log_error(NGX_LOG_INFO, c->log, 0, "client closed connection"); ngx_http_close_connection(c); return; } b->last += n; //調用動態代理 if (hc->proxy_protocol) { hc->proxy_protocol = 0; //返回動態代理的地址 p = ngx_proxy_protocol_read(c, b->pos, b->last); if (p NULL) { ngx_http_close_connection(c); return; } b->pos = p; if (b->pos b->last) { c->log->action = "waiting for request"; b->pos = b->start; b->last = b->start; '將這個鏈接封裝成一個nginx事件,添加到事件隊列中' ngx_post_event(rev, &ngx_posted_events); return; } } c->log->action = "reading client request line"; ngx_reusable_connection(c, 0); c->data = ngx_http_create_request(c); if (c->data NULL) { ngx_http_close_connection(c); return; } '從新設置事件的處理函數' rev->handler = ngx_http_process_request_line; '解析讀到的內容' ngx_http_process_request_line(rev); }