Nginx的stream模塊提供了TCP負載均衡的功能,最初的stream模塊比較簡單,在nginx-1.11.4後也開始採用相似HTTP模塊中分階段處理請求的方式。node
在ngx_stream.h中定義了stream模塊的7個階段。以下面所示nginx
typedef enum { NGX_STREAM_POST_ACCEPT_PHASE = 0, NGX_STREAM_PREACCESS_PHASE, NGX_STREAM_ACCESS_PHASE, NGX_STREAM_SSL_PHASE, NGX_STREAM_PREREAD_PHASE, NGX_STREAM_CONTENT_PHASE, NGX_STREAM_LOG_PHASE } ngx_stream_phases;
與HTTP模塊相同,每一個階段有相應的checker檢查方法和handler回調方法,每一個階段都有零個或多個ngx_stream_phase_handler_t結構體。服務器
typedef struct ngx_stream_phase_handler_s ngx_stream_phase_handler_t; typedef ngx_int_t (*ngx_stream_phase_handler_pt)(ngx_stream_session_t *s, ngx_stream_phase_handler_t *ph); typedef ngx_int_t (*ngx_stream_handler_pt)(ngx_stream_session_t *s); typedef void (*ngx_stream_content_handler_pt)(ngx_stream_session_t *s); struct ngx_stream_phase_handler_s { ngx_stream_phase_handler_pt checker; ngx_stream_handler_pt handler; ngx_uint_t next; };
stream模塊接到請求後,初始化鏈接後調用ngx_stream_core_run_phases依次執行各個階段的處理函數。session
void ngx_stream_core_run_phases(ngx_stream_session_t *s) { ngx_int_t rc; ngx_stream_phase_handler_t *ph; ngx_stream_core_main_conf_t *cmcf; cmcf = ngx_stream_get_module_main_conf(s, ngx_stream_core_module); ph = cmcf->phase_engine.handlers; while (ph[s->phase_handler].checker) { rc = ph[s->phase_handler].checker(s, &ph[s->phase_handler]); if (rc == NGX_OK) { return; } } }
POST_ACCEPT、PREACCESS、ACCESS階段的checker檢查方法都是ngx_stream_core_generic_phase,這三個階段主要進行訪問控制的一些工做。由於stream模塊處理TCP請求,階段之間關係比較簡單,將模塊掛載在哪一個階段只會影響執行的順序。負載均衡
下面是ngx_stream_core_generic_phase函數tcp
ngx_int_t ngx_stream_core_generic_phase(ngx_stream_session_t *s, ngx_stream_phase_handler_t *ph) { ngx_int_t rc; /* * generic phase checker, * used by all phases, except for preread and content */ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, "generic phase: %ui", s->phase_handler); rc = ph->handler(s); if (rc == NGX_OK) { s->phase_handler = ph->next; return NGX_AGAIN; } if (rc == NGX_DECLINED) { s->phase_handler++; return NGX_AGAIN; } if (rc == NGX_AGAIN || rc == NGX_DONE) { return NGX_OK; } if (rc == NGX_ERROR) { rc = NGX_STREAM_INTERNAL_SERVER_ERROR; } ngx_stream_finalize_session(s, rc); return NGX_OK; }
與HTTP模塊相似,根據rc = ph->handler(s)
結果進行處理。函數
注意,TCP請求中沒有HTTP請求那樣的狀態碼,這裏的狀態碼只是表示鏈接處理的信息,如源超時狀態碼爲502。ui
SSL階段的checker檢查方法也是ngx_stream_core_generic_phase,這裏掛載了ngx_stream_ssl_module,開啓SSL時這裏進行SSL握手的處理。debug
這個階段的特色是會讀取下游的請求包體。讀取後調用handler回調函數處理。讀取的數據會保存在c->buffer中。如CONTENT階段中ngx_stream_proxy_module在處理時會將c->buffer中的內容發送給上游源服務器。若是你在CONTENT階段以前讀取了下游的數據又想將這些數據經過proxy發送給上游,就能夠加數據放到c->buffer中。代理
這裏官方掛載了ngx_stream_ssl_preread_module模塊,當TCP鏈接採用SSL通訊時用這個模塊能夠解析client hello握手包,從extensions字段中獲得server_name賦值給變量$ssl_preread_server_name中。這個模塊只能在監聽地址沒有開啓SSL,可是上下游倒是經過SSL進行通訊的狀況下使用。
PREREAD階段checker檢查方法是ngx_stream_core_preread_phase。以下所示
ngx_int_t ngx_stream_core_preread_phase(ngx_stream_session_t *s, ngx_stream_phase_handler_t *ph) { size_t size; ssize_t n; ngx_int_t rc; ngx_connection_t *c; ngx_stream_core_srv_conf_t *cscf; c = s->connection; c->log->action = "prereading client data"; cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module); if (c->read->timedout) { rc = NGX_STREAM_OK; } else if (c->read->timer_set) { rc = NGX_AGAIN; } else { rc = ph->handler(s); } while (rc == NGX_AGAIN) { if (c->buffer == NULL) { c->buffer = ngx_create_temp_buf(c->pool, cscf->preread_buffer_size); if (c->buffer == NULL) { rc = NGX_ERROR; break; } } size = c->buffer->end - c->buffer->last; if (size == 0) { ngx_log_error(NGX_LOG_ERR, c->log, 0, "preread buffer full"); rc = NGX_STREAM_BAD_REQUEST; break; } if (c->read->eof) { rc = NGX_STREAM_OK; break; } if (!c->read->ready) { if (ngx_handle_read_event(c->read, 0) != NGX_OK) { rc = NGX_ERROR; break; } if (!c->read->timer_set) { ngx_add_timer(c->read, cscf->preread_timeout); } c->read->handler = ngx_stream_session_handler; return NGX_OK; } n = c->recv(c, c->buffer->last, size); if (n == NGX_ERROR) { rc = NGX_STREAM_OK; break; } if (n > 0) { c->buffer->last += n; } rc = ph->handler(s); } if (c->read->timer_set) { ngx_del_timer(c->read); } if (rc == NGX_OK) { s->phase_handler = ph->next; return NGX_AGAIN; } if (rc == NGX_DECLINED) { s->phase_handler++; return NGX_AGAIN; } if (rc == NGX_DONE) { return NGX_OK; } if (rc == NGX_ERROR) { rc = NGX_STREAM_INTERNAL_SERVER_ERROR; } ngx_stream_finalize_session(s, rc); return NGX_OK; }
CONTENT階段的檢查方法是ngx_stream_core_content_phase
ngx_int_t ngx_stream_core_content_phase(ngx_stream_session_t *s, ngx_stream_phase_handler_t *ph) { ngx_connection_t *c; ngx_stream_core_srv_conf_t *cscf; c = s->connection; c->log->action = NULL; cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module); if (c->type == SOCK_STREAM && cscf->tcp_nodelay && ngx_tcp_nodelay(c) != NGX_OK) { ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR); return NGX_OK; } cscf->handler(s); return NGX_OK; }
很顯然這一階段只能有一個處理方法,如進行TCP代理時須要ngx_stream_proxy_module,這裏掛載的就是proxy模塊的handler函數
LOG階段雖然是一個獨立的階段,倒是在鏈接結束時調用的。在CONTENT階段結束時就會調用ngx_stream_finalize_session結束請求。這部分在ngx_stream_handler.c中
void ngx_stream_finalize_session(ngx_stream_session_t *s, ngx_uint_t rc) { ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, "finalize stream session: %i", rc); s->status = rc; ngx_stream_log_session(s); ngx_stream_close_connection(s->connection); } static void ngx_stream_log_session(ngx_stream_session_t *s) { ngx_uint_t i, n; ngx_stream_handler_pt *log_handler; ngx_stream_core_main_conf_t *cmcf; cmcf = ngx_stream_get_module_main_conf(s, ngx_stream_core_module); log_handler = cmcf->phases[NGX_STREAM_LOG_PHASE].handlers.elts; n = cmcf->phases[NGX_STREAM_LOG_PHASE].handlers.nelts; for (i = 0; i < n; i++) { log_handler[i](s); } }