nginx事件模塊 -- 第七篇 建立鏈接

微信公衆號:鄭爾多斯
關注可瞭解更多的Nginx知識。任何問題或建議,請公衆號留言;
關注公衆號,有趣有內涵的文章第一時間送達!node

事件驅動框架

咱們前面分析過,在ngx_event_process_init()中,會將每一個監聽端口的rev->handler設置爲ngx_event_accept()函數,而後把對應的讀事件加入到epoll驅動模塊中。這樣,在執行ngx_epoll_process_events()的時候,若是有新鏈接事件出現,那麼就會調用ngx_event_accept()方法進行創建鏈接。
Nginx爲了解決驚羣現象和負載均衡問題,使用了accept_mutex以及post事件處理機制,那麼什麼是post事件處理機制?
所謂的post事件處理機制就是容許事件延後執行。Nginx設計了兩個隊列,一個是由被觸發的監聽鏈接的讀事件構成的ngx_posted_accept_events隊列,另外一個是由普通讀/寫事件構成的ngx_posted_events隊列。nginx

epoll_wait()產生的事件,分別存放到這兩個隊列中。讓存放新鏈接事件的ngx_posted_accept_events隊列優先執行,存放普通事件的ngx_posted_events隊列最後執行,這樣就能夠保證創建鏈接的及時性,同時這也是解決驚羣和負載均衡的關鍵所在。windows

ngx_event_accept

 1void
2ngx_event_accept(ngx_event_t *ev)
3
{
4    socklen_t          socklen;
5    ngx_err_t          err;
6    ngx_log_t         *log;
7    ngx_uint_t         level;
8    ngx_socket_t       s;
9    ngx_event_t       *rev, *wev;
10    ngx_sockaddr_t     sa;
11    ngx_listening_t   *ls;
12    ngx_connection_t  *c, *lc;
13    ngx_event_conf_t  *ecf;
14#if (NGX_HAVE_ACCEPT4)
15    static ngx_uint_t  use_accept4 = 1;
16#endif
17
18    if (ev->timedout) {
19        if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {
20            return;
21        }
22
23        ev->timedout = 0;
24    }
25
26    ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);
27
28    if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
29        ev->available = ecf->multi_accept;
30    }
31
32   // 事件關聯的data,也即當前事件對應的connection
33    lc = ev->data; 
34   // connection對應的監聽端口結構
35    ls = lc->listening;
36 // ready=0,表示當前事件沒有就緒,由於咱們已經消費了讀事件
37    ev->ready = 0;
38
39
40
41    do { 
42        socklen = sizeof(ngx_sockaddr_t);
43// 使用accept建立一個鏈接
44        s = accept(lc->fd, &sa.sockaddr, &socklen);
45        if (s == (ngx_socket_t-1) {
46           /*accept()失敗的時候處理邏輯,這裏能夠忽略*/
47            return;
48        }
49
50#if (NGX_STAT_STUB)
51     /*這裏是用於統計每一個worker進程的統計數據,好比accept的數量等*/
52        (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1);
53#endif
54
55        ngx_accept_disabled = ngx_cycle->connection_n / 8
56                              - ngx_cycle->free_connection_n;
57
58        c = ngx_get_connection(s, ev->log);
59
60        if (c == NULL) {
61           /*獲取connection失敗*/
62            return;
63        }
64
65        c->type = SOCK_STREAM;
66
67        c->pool = ngx_create_pool(ls->pool_size, ev->log);
68        if (c->pool == NULL) {
69            ngx_close_accepted_connection(c);
70            return;
71        }
72
73        if (socklen > (socklen_tsizeof(ngx_sockaddr_t)) {
74            socklen = sizeof(ngx_sockaddr_t);
75        }
76
77        c->sockaddr = ngx_palloc(c->pool, socklen);
78        if (c->sockaddr == NULL) {
79            ngx_close_accepted_connection(c);
80            return;
81        }
82
83        ngx_memcpy(c->sockaddr, &sa, socklen);
84
85        log = ngx_palloc(c->pool, sizeof(ngx_log_t));
86        if (log == NULL) {
87            ngx_close_accepted_connection(c);
88            return;
89        }
90
91        /* set a blocking mode for iocp and non-blocking mode for others */
92
93        if (ngx_inherited_nonblocking) {
94            if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
95                /*這裏是windows的處理邏輯*/
96            }
97
98        } else {
99            if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
100                if (ngx_nonblocking(s) == -1) {
101
102                    ngx_close_accepted_connection(c);
103                    return;
104                }
105            }
106        }
107
108        *log = ls->log;
109
110        c->recv = ngx_recv;
111        c->send = ngx_send;
112        c->recv_chain = ngx_recv_chain;
113        c->send_chain = ngx_send_chain;
114
115        c->log = log;
116        c->pool->log = log;
117
118        c->socklen = socklen;
119        c->listening = ls;
120        c->local_sockaddr = ls->sockaddr;
121        c->local_socklen = ls->socklen;
122
123#if (NGX_HAVE_UNIX_DOMAIN)
124        if (c->sockaddr->sa_family == AF_UNIX) {
125            c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
126            c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;
127#if (NGX_SOLARIS)
128            /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
129            c->sendfile = 0;
130#endif
131        }
132#endif
133
134        rev = c->read;
135        wev = c->write;
136
137        wev->ready = 1;
138
139        if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
140            rev->ready = 1;
141        }
142
143        if (ev->deferred_accept) {
144            rev->ready = 1;
145#if (NGX_HAVE_KQUEUE || NGX_HAVE_EPOLLRDHUP)
146            rev->available = 1;
147#endif
148        }
149
150        rev->log = log;
151        wev->log = log;
152
153        c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
154
155        if (ls->addr_ntop) {
156            c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
157            if (c->addr_text.data == NULL) {
158                ngx_close_accepted_connection(c);
159                return;
160            }
161
162            c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
163                                             c->addr_text.data,
164                                             ls->addr_text_max_len, 0);
165            if (c->addr_text.len == 0) {
166                ngx_close_accepted_connection(c);
167                return;
168            }
169        }
170
171
172        if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
173            if (ngx_add_conn(c) == NGX_ERROR) {
174                ngx_close_accepted_connection(c);
175                return;
176            }
177        }
178
179        log->data = NULL;
180        log->handler = NULL;
181
182        ls->handler(c);
183
184    } while (ev->available);
185}
複製代碼

上面的代碼我作了刪減,把一些錯誤判斷和非Linux平臺代碼刪除,這個並不影響代碼的功能和閱讀,如今咱們簡單的分析一下這個源碼:微信

  • 首先這個是一個do…while循環,循環的條件就是ev->available,該條件控制是否一次創建多個鏈接。
  • 循環體首先使用accept()函數建立一個鏈接
  • 設置ngx_accept_disabled變量,而後調用ngx_get_connection()獲取一個鏈接。
  • 下面就是設置咱們上一步獲取的鏈接的各個屬性。

最重要的一步是最後的代碼:
ls->handler(c);咱們前面知道,ls->handler其實就是ngx_http_init_connection,從這個函數開始,咱們就進入了http初始化的過程了。app

multi_accept的做用

Syntax: multi_accept on | off;
Default: multi_accept off;
Context: events負載均衡

If multi_accept is disabled, a worker process will accept one new connection at a time. Otherwise, a worker process will accept all new connections at a time.
The directive is ignored if kqueue connection processing method is used, because it reports the number of new connections waiting to be accepted.
中文翻譯:
若是multi_accept被禁止了,nginx一個工做進程只能同時接受一個新的鏈接。不然,一個工做進程能夠同時接受全部的新鏈接。
若是nginx使用kqueue鏈接方法,那麼這條指令會被忽略,由於這個方法會報告在等待被接受的新鏈接的數量。框架

若是啓用了這個參數,那麼當一個worker進程獲取到accept_mutex以後,會一次儘量的多的創建鏈接。socket


喜歡本文的朋友們,歡迎長按下圖關注訂閱號鄭爾多斯,更多精彩內容第一時間送達
tcp

鄭爾多斯
鄭爾多斯
相關文章
相關標籤/搜索