1. Nginx 頻道
ngx_channel_t 頻道是 Nginx master 進程與 worker 進程之間通訊的經常使用工具,它是使用本機套接字實現的,即 socketpair 方法,它用於建立父子進程間使用的套接字。數組
#include <sys/types.h> /* See NOTES */ #include <sys/socket.h> int socketpair(int domain, int type, int protocol, int sv[2]);
這個方法能夠建立一對關聯的套接字 sv[2]。dom
- domain:表示域,在 Linux 下一般取值爲 AF_UNIX;
- type:取值爲 SOCK_STREAM 或 SOCK_DGRAM,它表示在套接字上使用的是 TCP 仍是 UDP;
- protocol:必須傳遞 0;
- sv[2]:是一個含有兩個元素的整型數組,實際上就是兩個套接字。
- 當 socketpair 返回 0 時,sv[2] 這兩個套接字建立成功,不然 sockpair 返回 -1 表示失敗.
當 socketpair 執行成功時,sv[2] 這兩個套接字具有下列關係:socket
-
向 sv[0] 套接字寫入數據,將能夠從 sv[1] 套接字中讀取到剛寫入的數據;函數
-
一樣,向 sv[1] 套接字寫入數據,也能夠從 sv[0] 中讀取到寫入的數據。工具
-
一般,在父、子進程通訊前,會先調用 socketpair 方法建立這樣一組套接字,在調用 fork 方法建立出子進程後,將會在父進程中關閉 sv[1] 套接字,僅使用 sv[0] 套接字用於向子進程發送數據以及接收子進程發送來的數據;ui
-
而在子進程中則關閉 sv[0] 套接字,僅使用 sv[1] 套接字既能夠接收父進程發送來的數據,也能夠向父進程發送數據。this
ngx_channel_t 結構體是 Nginx 定義的 master 父進程與 worker 子進程間的消息格式,以下:spa
typedef struct { // 傳遞的 TCP 消息中的命令 ngx_uint_t command; // 進程 ID,通常是發送命令方的進程 ID ngx_pid_t pid; // 表示發送命令方在 ngx_processes 進程數組間的序號 ngx_int_t slot; // 通訊的套接字句柄 ngx_fd_t fd; }ngx_channel_t;
Nginx 針對 command 成員定義了以下命令:code
// 打開頻道,使用頻道這種方式通訊前必須發送的命令 #define NGX_CMD_OPEN_CHANNEL 1 // 關閉已經打開的頻道,實際上也就是關閉套接字 #define NGX_CMD_CLOSE_CHANNEL 2 // 要求接收方正常地退出進程 #define NGX_CMD_QUIT 3 // 要求接收方強制地結束進程 #define NGX_CMD_TERMINATE 4 // 要求接收方從新打開進程已經打開過的文件 #define NGX_CMD_REOPEN 5
問:master 是如何啓動、中止 worker 子進程的?
答:正是經過 socketpair 產生的套接字發送命令的,即每次要派生一個子進程以前,都會先調用 socketpair 方法。進程
在 Nginx 派生子進程的 ngx_spawn_process 方法中,會首先派生基於 TCP 的套接字,以下:
ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn) { if (respawn != NGX_PROCESS_DETACHED) { /* Solaris 9 still has no AF_LOCAL */ // ngx_processes[s].channel 數組正是將要用於父、子進程間通訊的套接字對 if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) { return NGX_INVALID_PID; } // 將 channel 套接字對都設置爲非阻塞模式 if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) { ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) { ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } ... }
ngx_processes 數組定義了 Nginx 服務中全部的進程,包括 master 進程和 worker 進程,以下:
#define NGX_MAX_PROCESSES 1024 // 雖然定義了 NGX_MAX_PROCESSES 個成員,可是已經使用的元素僅與啓動的進程個數有關 ngx_processes_t ngx_processes[NGX_MAX_PROCESSES];
ngx_processes 數組的類型是 ngx_processes_t,對於頻道來講,這個結構體只關心它的 channel 成員:
typedef struct { ... // socketpair 建立的套接字對 ngx_socket_t channel[2]; }ngx_processes_t;
1. ngx_write_channel:使用頻道發送 ngx_channel_t 消息
ngx_int_t ngx_write_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log) { ssize_t n; ngx_err_t err; struct iovec iov[1]; struct msghdr msg; #if (NGX_HAVE_MSGHDR_MSG_CONTROL) union { struct cmsghdr cm; char space[CMSG_SPACE(sizeof(int))]; }cmsg; if (ch->fd == -1) { msg.msg_control = NULL; msg.msg_controllen = 0; } else { // 輔助數據 msg.msg_control = (caddr_t)&cmsg; msg.msg_controllen = sizeof(cmsg); ngx_memzero(&cmsg, sizeof(cmsg)); cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); cmsg.cm.cmsg_level = SOL_SOCKET; cmsg.cm.cmsg_type = SCM_RIGHTS; /* * We have to use ngx_memcpy() instead of simple * *(int *) CMSG_DATA(&cmsg.cm) = ch->fd; * because some gcc 4.4 with -O2/3/s optimization issues the warning: * dereferencing type-punned pointer will break strict-aliasing rules * * Fortunately, gcc with -O1 compiles this ngx_memcpy() * in the same simple assignment as in the code above */ ngx_memcpy(CMSG_DATA(&cmsg.cm), &ch->fd, sizeof(int)); } msg.msg_flags = 0; #else if (ch->fd == -1) { msg.msg_accrights = NULL; msg.msg_accrightslen = 0; } else { msg.msg_accrights = (caddr_t) &ch->fd; msg.msg_accrightslen = sizeof(int); } #endif // 指向要發送的 ch 起始地址 iov[0].iov_base = (char *) ch; iov[0].iov_len = size; // msg_name 和 msg_namelen 僅用於未鏈接套接字(如UDP) msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; // 將該 ngx_channel_t 消息發出去 n = sendmsg(s, &msg, 0); if (n == -1) { err = ngx_errno; if (err == NGX_EAGAIN) { return NGX_AGAIN; } return NGX_ERROR; } return NGX_OK; }
2. ngx_read_channel: 讀取消息
ngx_int_t ngx_read_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log) { ssize_t n; ngx_err_t err; struct iovec iov[1]; struct msghdr msg; #if (NGX_HAVE_MSGHDR_MSG_CONTROL) union { struct cmsghdr cm; char space[CMSG_SPACE(sizeof(int))]; } cmsg; #else int fd; #endif iov[0].iov_base = (char *)ch; iov[0].iov_len = size; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; #if (NGX_HAVE_MSGHDR_MSG_CONTROL) msg.msg_control = (caddr_t) &cmsg; msg.msg_controllen = sizeof(cmsg); #else msg.msg_accrights = (caddr_t) &fd; msg.msg_accrightslen = sizeof(int); #endif // 接收命令 n = recvmsg(s, &msg, 0); if (n == -1) { err = ngx_errno; if (err == NGX_EAGAIN) { return NGX_AGAIN; } return NGX_ERROR; } if (n == 0) { return NGX_ERROR; } // 接收的數據不足 if ((size_t) n < sizeof(ngx_channel_t)) { return NGX_ERROR; } #if (NGX_HAVE_MSGHDR_MSG_CONTROL) // 若接收到的命令爲"打開頻道,使用頻道這種方式通訊前必須發送的命令" if (ch->command == NGX_CMD_OPEN_CHANNEL) { if (cmsg.cm.cmsg_len < (socklen_t) CMSG_LEN(sizeof(int))) { return NGX_ERROR; } if (cmsg.cm.cmsg_level != SOL_SOCKET || cmsg.cm.cmsg_type != SCM_RIGHTS) { return NGX_ERROR; } /* ch->fd = *(int *) CMSG_DATA(&cmsg.cm); */ ngx_memcpy(&ch->fd, CMSG_DATA(&cmsg.cm), sizeof(int)); } // 若接收到的消息是被截斷的 if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC)) { ngx_log_error(NGX_LOG_ALERT, log, 0, "recvmsg() truncated data"); } #else if (ch->command == NGX_CMD_OPEN_CHANNEL) { if (msg.msg_accrightslen != sizeof(int)) { return NGX_ERROR; } ch->fd = fd; } #endif return n; }
在 Nginx 中,目前僅存在 master 進程向 worker 進程發送消息的場景,這時對於 socketpair 方法建立的 channel[2] 套接字來講,master 進程會使用 channel[0] 套接字來發送消息,而 worker 進程會使用 channel[1] 套接字來接收消息。
3. ngx_add_channel_event: 把接收頻道消息的套接字添加到 epoll 中
worker 進程調度 ngx_read_channel 方法接收頻道消息是經過該 ngx_add_channel_event 函數將接收頻道消息的套接字(對於 worker 即爲channel[1])添加到 epoll 中,當接收到父進程消息時子進程會經過 epoll 的事件回調相應的 handler 方法來處理這個頻道消息,以下:
ngx_int_t ngx_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd, ngx_int_t event, ngx_event_handler_pt handler) { ngx_event_t *ev, *rev, *wev; ngx_connection_t *c; // 獲取一個空閒鏈接 c = ngx_get_connection(fd, cycle->log); if (c == NULL) { return NGX_ERROR; } c->pool = cycle->pool; rev = c->read; wev = c->write; rev->log = cycle->log; wev->log = cycle->log; rev->channel = 1; wev->channel = 1; ev = (event == NGX_READ_EVENT) ? rev : wev; // 初始化監聽該 ev 事件時調用的回調函數 ev->handler = handler; // 將該接收頻道消息的套接字添加到 epoll 中 if (ngx_add_conn && (ngx_event_flags && NGX_USE_EPOLL_EVENT) == 0) { // 這裏是同時監聽該套接字的讀、寫事件 if (ngx_add_conn(c) == NGX_ERROR) { ngx_free_connection(c); return NGX_ERROR; } } else { // 這裏是僅監聽 ev 事件 if (ngx_add_event(ev, event, 0) == NGX_ERROR) { ngx_free_connection(c); return NGX_ERROR; } } return NGX_OK; }
4. ngx_close_channel: 關閉這個頻道通訊方式
void ngx_close_channel(ngx_fd_t *fd, ngx_log_t *log) { if (close(fd[0]) == -1) { } if (close(fd[1]) == -1) { } }