Nginx之進程間的通訊機制(Nginx頻道)

1. Nginx 頻道

ngx_channel_t 頻道是 Nginx master 進程與 worker 進程之間通訊的經常使用工具,它是使用本機套接字實現的,即 socketpair 方法,它用於建立父子進程間使用的套接字。數組

#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>

int socketpair(int domain, int type, int protocol, int sv[2]);

這個方法能夠建立一對關聯的套接字 sv[2]。dom

  • domain:表示域,在 Linux 下一般取值爲 AF_UNIX;
  • type:取值爲 SOCK_STREAM 或 SOCK_DGRAM,它表示在套接字上使用的是 TCP 仍是 UDP;
  • protocol:必須傳遞 0;
  • sv[2]:是一個含有兩個元素的整型數組,實際上就是兩個套接字。
  • 當 socketpair 返回 0 時,sv[2] 這兩個套接字建立成功,不然 sockpair 返回 -1 表示失敗.

當 socketpair 執行成功時,sv[2] 這兩個套接字具有下列關係:socket

  • 向 sv[0] 套接字寫入數據,將能夠從 sv[1] 套接字中讀取到剛寫入的數據;函數

  • 一樣,向 sv[1] 套接字寫入數據,也能夠從 sv[0] 中讀取到寫入的數據。工具

  • 一般,在父、子進程通訊前,會先調用 socketpair 方法建立這樣一組套接字,在調用 fork 方法建立出子進程後,將會在父進程中關閉 sv[1] 套接字,僅使用 sv[0] 套接字用於向子進程發送數據以及接收子進程發送來的數據;ui

  • 而在子進程中則關閉 sv[0] 套接字,僅使用 sv[1] 套接字既能夠接收父進程發送來的數據,也能夠向父進程發送數據。this

ngx_channel_t 結構體是 Nginx 定義的 master 父進程與 worker 子進程間的消息格式,以下:spa

typedef struct {
    // 傳遞的 TCP 消息中的命令
    ngx_uint_t  command;
    // 進程 ID,通常是發送命令方的進程 ID
    ngx_pid_t   pid;
    // 表示發送命令方在 ngx_processes 進程數組間的序號
    ngx_int_t   slot;
    // 通訊的套接字句柄
    ngx_fd_t    fd;
}ngx_channel_t;

Nginx 針對 command 成員定義了以下命令:code

// 打開頻道,使用頻道這種方式通訊前必須發送的命令
#define NGX_CMD_OPEN_CHANNEL 1

// 關閉已經打開的頻道,實際上也就是關閉套接字
#define NGX_CMD_CLOSE_CHANNEL 2

// 要求接收方正常地退出進程
#define NGX_CMD_QUIT 3

// 要求接收方強制地結束進程
#define NGX_CMD_TERMINATE 4

// 要求接收方從新打開進程已經打開過的文件
#define NGX_CMD_REOPEN 5

問:master 是如何啓動、中止 worker 子進程的?
答:正是經過 socketpair 產生的套接字發送命令的,即每次要派生一個子進程以前,都會先調用 socketpair 方法。進程

在 Nginx 派生子進程的 ngx_spawn_process 方法中,會首先派生基於 TCP 的套接字,以下:

ngx_pid_t 
ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, 
    char *name, ngx_int_t respawn)
{

    if (respawn != NGX_PROCESS_DETACHED) {
    
        /* Solaris 9 still has no AF_LOCAL */
        
        // ngx_processes[s].channel 數組正是將要用於父、子進程間通訊的套接字對
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1)
        {
            return NGX_INVALID_PID;
        }
        
        // 將 channel 套接字對都設置爲非阻塞模式
        if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }
        
        if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }
    ...
}

ngx_processes 數組定義了 Nginx 服務中全部的進程,包括 master 進程和 worker 進程,以下:

#define NGX_MAX_PROCESSES 1024

// 雖然定義了 NGX_MAX_PROCESSES 個成員,可是已經使用的元素僅與啓動的進程個數有關
ngx_processes_t ngx_processes[NGX_MAX_PROCESSES];

ngx_processes 數組的類型是 ngx_processes_t,對於頻道來講,這個結構體只關心它的 channel 成員:

typedef struct {
    ...
    // socketpair 建立的套接字對
    ngx_socket_t channel[2];
}ngx_processes_t;

1. ngx_write_channel:使用頻道發送 ngx_channel_t 消息

ngx_int_t 
ngx_write_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size,
    ngx_log_t *log)
{
    ssize_t         n;
    ngx_err_t       err;
    struct iovec    iov[1];
    struct msghdr   msg;
    
#if (NGX_HAVE_MSGHDR_MSG_CONTROL)

    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    }cmsg;
    
    if (ch->fd == -1) {
        msg.msg_control = NULL;
        msg.msg_controllen = 0;
        
    } else {
        // 輔助數據
        msg.msg_control = (caddr_t)&cmsg;
        msg.msg_controllen = sizeof(cmsg);
        
        ngx_memzero(&cmsg, sizeof(cmsg));
        
        cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
        cmsg.cm.cmsg_level = SOL_SOCKET;
        cmsg.cm.cmsg_type = SCM_RIGHTS;
        
        /*
         * We have to use ngx_memcpy() instead of simple
         *   *(int *) CMSG_DATA(&cmsg.cm) = ch->fd;
         * because some gcc 4.4 with -O2/3/s optimization issues the warning:
         *   dereferencing type-punned pointer will break strict-aliasing rules
         *
         * Fortunately, gcc with -O1 compiles this ngx_memcpy()
         * in the same simple assignment as in the code above
         */

        ngx_memcpy(CMSG_DATA(&cmsg.cm), &ch->fd, sizeof(int));
    }
    
    msg.msg_flags = 0;
    
#else

    if (ch->fd == -1) {
        msg.msg_accrights = NULL;
        msg.msg_accrightslen = 0;
        
    } else {
        msg.msg_accrights = (caddr_t) &ch->fd;
        msg.msg_accrightslen = sizeof(int);
    }
    
#endif

    // 指向要發送的 ch 起始地址
    iov[0].iov_base = (char *) ch;
    iov[0].iov_len = size;
    
    // msg_name 和 msg_namelen 僅用於未鏈接套接字(如UDP)
    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    
    // 將該 ngx_channel_t 消息發出去
    n = sendmsg(s, &msg, 0);
    
    if (n == -1) {
        err = ngx_errno;
        if (err == NGX_EAGAIN) {
            return NGX_AGAIN;
        }
        
        return NGX_ERROR;
    }
    
    return NGX_OK;
}

2. ngx_read_channel: 讀取消息

ngx_int_t
ngx_read_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log)
{
    ssize_t             n;
    ngx_err_t           err;
    struct iovec        iov[1];
    struct msghdr       msg;

#if (NGX_HAVE_MSGHDR_MSG_CONTROL)
    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    } cmsg;
#else
    int                 fd;
#endif
    
    iov[0].iov_base = (char *)ch;
    iov[0].iov_len = size;
    
    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    
#if (NGX_HAVE_MSGHDR_MSG_CONTROL)
    msg.msg_control = (caddr_t) &cmsg;
    msg.msg_controllen = sizeof(cmsg);
#else
    msg.msg_accrights = (caddr_t) &fd;
    msg.msg_accrightslen = sizeof(int);
#endif

    // 接收命令
    n = recvmsg(s, &msg, 0);
    
    if (n == -1) {
        err = ngx_errno;
        if (err == NGX_EAGAIN) {
            return NGX_AGAIN;
        }

        return NGX_ERROR;
    }
    
    if (n == 0) {
        return NGX_ERROR;
    }
    
    // 接收的數據不足
    if ((size_t) n < sizeof(ngx_channel_t)) {
        return NGX_ERROR;
    }
    
#if (NGX_HAVE_MSGHDR_MSG_CONTROL)
    
    // 若接收到的命令爲"打開頻道,使用頻道這種方式通訊前必須發送的命令"
    if (ch->command == NGX_CMD_OPEN_CHANNEL) {
        
        if (cmsg.cm.cmsg_len < (socklen_t) CMSG_LEN(sizeof(int))) {
            return NGX_ERROR;
        }
        
        if (cmsg.cm.cmsg_level != SOL_SOCKET || cmsg.cm.cmsg_type != SCM_RIGHTS)
        {
            return NGX_ERROR;
        }
        
        /* ch->fd = *(int *) CMSG_DATA(&cmsg.cm); */

        ngx_memcpy(&ch->fd, CMSG_DATA(&cmsg.cm), sizeof(int));
    }
    
    // 若接收到的消息是被截斷的
    if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC)) {
        ngx_log_error(NGX_LOG_ALERT, log, 0,
                      "recvmsg() truncated data");
    }
    
#else
    
    if (ch->command == NGX_CMD_OPEN_CHANNEL) {
        if (msg.msg_accrightslen != sizeof(int)) {
            return NGX_ERROR;
        }
        
        ch->fd = fd;
    }
#endif

    return n;
}

在 Nginx 中,目前僅存在 master 進程向 worker 進程發送消息的場景,這時對於 socketpair 方法建立的 channel[2] 套接字來講,master 進程會使用 channel[0] 套接字來發送消息,而 worker 進程會使用 channel[1] 套接字來接收消息。

3. ngx_add_channel_event: 把接收頻道消息的套接字添加到 epoll 中

worker 進程調度 ngx_read_channel 方法接收頻道消息是經過該 ngx_add_channel_event 函數將接收頻道消息的套接字(對於 worker 即爲channel[1])添加到 epoll 中,當接收到父進程消息時子進程會經過 epoll 的事件回調相應的 handler 方法來處理這個頻道消息,以下:

ngx_int_t 
ngx_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd, ngx_int_t event, 
    ngx_event_handler_pt handler)
{
    ngx_event_t         *ev, *rev, *wev;
    ngx_connection_t    *c;
    
    // 獲取一個空閒鏈接
    c = ngx_get_connection(fd, cycle->log);
    
    if (c == NULL) {
        return NGX_ERROR;
    }
    
    c->pool = cycle->pool;
    
    rev = c->read;
    wev = c->write;
    
    rev->log = cycle->log;
    wev->log = cycle->log;
    
    rev->channel = 1;
    wev->channel = 1;
    
    ev = (event == NGX_READ_EVENT) ? rev : wev;
    
    // 初始化監聽該 ev 事件時調用的回調函數
    ev->handler = handler;
    
    // 將該接收頻道消息的套接字添加到 epoll 中
    if (ngx_add_conn && (ngx_event_flags && NGX_USE_EPOLL_EVENT) == 0) {
        // 這裏是同時監聽該套接字的讀、寫事件
        if (ngx_add_conn(c) == NGX_ERROR) {
            ngx_free_connection(c);
            return NGX_ERROR;
        }
         
    } else {
        // 這裏是僅監聽 ev 事件
        if (ngx_add_event(ev, event, 0) == NGX_ERROR) {
            ngx_free_connection(c);
            return NGX_ERROR;
        }
    }
    
    return NGX_OK;
}

4. ngx_close_channel: 關閉這個頻道通訊方式

void
ngx_close_channel(ngx_fd_t *fd, ngx_log_t *log)
{
    if (close(fd[0]) == -1) {
        
    }

    if (close(fd[1]) == -1) {
        
    }
}
相關文章
相關標籤/搜索