傳送文件描述符是高併發網絡服務編程的一種常見實現方式。Nebula 高性能通用網絡框架即採用了UNIX域套接字傳遞文件描述符設計和實現。本文詳細說明一下傳送文件描述符的應用。html
開發一個服務器程序,有較多的的程序設計範式可供選擇,不一樣範式有其自身的特色和實用範圍,明瞭不一樣範式的特性有助於咱們服務器程序的開發。常見的TCP服務器程序設計範式有如下幾種:linux
當系統負載較輕時,傳統的併發服務器程序模型就夠了。相對於傳統的每一個客戶一次fork設計,預先建立一個進程池或線程池能夠減小進程控制CPU時間,大約可減小10倍以上。nginx
某些實現容許多個子進程或線程阻塞在accept上,然而在另外一些實現中,咱們必須使用文件鎖、線程互斥鎖或其餘類型的鎖來確保每次只有一個子進程或線程在accept。c++
通常來說,全部子進程或線程都調用accept要比父進程或主線程調用accept後將描述字傳遞個子進程或線程來得快且簡單。git
Nebula框架是預先建立多進程,由Manager主進程accept後傳遞文件描述符到Worker子進程的服務模型(Nebula進程模型)。爲何不採用像nginx那樣多線程由子線程使用互斥鎖上鎖保護accept的服務模型?並且這種服務模型的實現比傳遞文件描述符來得還簡單一些。github
Nebula框架採用無鎖設計,進程以前徹底不共享數據,不存在須要互斥訪問的地方。沒錯,會存在數據多副本問題,但這些多副本每每只是些配置數據,佔用不了太大內存,與加鎖解鎖帶來的代碼複雜度及鎖開銷相比這點內存代價更划算也更簡單。編程
同一個Nebula服務的工做進程間不相互通訊,採用進程和線程並沒有太大差別,之因此採用進程而不是線程的最重要考慮是Nebula是出於穩定性和容錯性考慮。Nebula是通用框架,徹底業務無關,業務都是經過動態加載的方式或經過將Nebula連接進業務Server的方式來實現。Nebula框架沒法預知業務代碼的質量,但能夠保證在服務因業務代碼致使coredump或其餘狀況時,框架能夠實時監控到並馬上拉起服務進程,最大程度保障服務可用性。數組
決定Nebula採用傳遞文件描述符方式的最重要一點是:Nebula定位是高性能分佈式服務集羣解決方案的基礎通訊框架,其設計更多要爲構建分佈式服務集羣而考慮。集羣不一樣服務節點之間經過TCP通訊,而全部邏輯都是Worker進程負責,這意味着節點之間通訊須要指定到Worker進程,而若是採用子進程競爭accept的方式沒法保證指定的子進程得到資源,那麼第一個通訊數據包將會路由錯誤。採用傳遞文件描述符方式能夠很完美地解決這個問題,並且傳遞文件描述符也很是高效。服務器
文件描述符傳遞經過調用sendmsg()函數發送,調用recvmsg()函數接收:網絡
#include <sys/types.h> #include <sys/socket.h> ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags); ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
這兩個函數與sendto和recvfrom函數類似,只不過能夠傳輸更復雜的數據結構,不只能夠傳輸通常數據,還能夠傳輸額外的數據,即文件描述符。下面來看結構體msghdr及其相關結構體 :
struct msghdr { void *msg_name; /* optional address */ socklen_t msg_namelen; /* size of address */ struct iovec *msg_iov; /* scatter/gather array */ size_t msg_iovlen; /* # elements in msg_iov */ void *msg_control; /* ancillary data, see below */ size_t msg_controllen; /* ancillary data buffer len */ int msg_flags; /* flags on received message */ }; /* iovec結構體 */ struct iovec { void *iov_base; /* Starting address */ size_t iov_len; /* Number of bytes to transfer */ }; /* cmsghdr結構體 */ struct cmsghdr { socklen_t cmsg_len; /* data byte count, including header */ int cmsg_level; /* originating protocol */ int cmsg_type; /* protocol-specific type */ /* followed by unsigned char cmsg_data[]; */ };
msghdr結構成員說明:
爲了對齊,可能存在一些填充字節,跟不一樣系統的實現有關控制信息的數據部分,是直接存儲在cmsghdr結構體的cmsg_type以後的。但中間可能有一些因爲對齊產生的填充字節,因爲這些填充數據的存在,對於這些控制數據的訪問,必須使用Linux提供的一些專用宏來完成:
#include <sys/socket.h> /* 返回msgh所指向的msghdr類型的緩衝區中的第一個cmsghdr結構體的指針。*/ struct cmsghdr *CMSG_FIRSTHDR(struct msghdr *msgh); /* 返回傳入的cmsghdr類型的指針的下一個cmsghdr結構體的指針。 */ struct cmsghdr *CMSG_NXTHDR(struct msghdr *msgh, struct cmsghdr *cmsg); /* 根據傳入的length大小,返回一個包含了添加對齊做用的填充數據後的大小。 */ size_t CMSG_ALIGN(size_t length); /* 傳入的參數length指的是一個控制信息元素(即一個cmsghdr結構體)後面數據部分的字節數,返回的是這個控制信息的總的字節數,即包含了頭部(即cmsghdr各成員)、數據部分和填充數據的總和。*/ size_t CMSG_SPACE(size_t length); /* 根據傳入的cmsghdr指針參數,返回其後面數據部分的指針。*/ size_t CMSG_LEN(size_t length); /* 傳入的參數是一個控制信息中的數據部分的大小,返回的是這個根據這個數據部分大小,須要配置的cmsghdr結構體中cmsg_len成員的值。這個大小將爲對齊添加的填充數據也包含在內。*/ unsigned char *CMSG_DATA(struct cmsghdr *cmsg);
sendmsg提供了能夠傳遞控制信息的功能,要實現的傳遞描述符這一功能必需要用到這個控制信息。在msghdr變量的cmsghdr成員中,由控制頭cmsg_level和cmsg_type來設置傳遞文件描述符這一屬性,並將要傳遞的文件描述符做爲數據部分,保存在cmsghdr變量的後面。這樣就能夠實現傳遞文件描述符這一功能,這種狀況是不須要使用msg_iov來傳遞數據的。
具體地說,爲msghdr的成員msg_control分配一個cmsghdr的空間,將該cmsghdr結構的cmsg_level設置爲SOL_SOCKET,cmsg_type設置爲SCM_RIGHTS,並將要傳遞的文件描述符做爲數據部分,調用sendmsg便可。其中SCM表示socket-level control message,SCM_RIGHTS表示咱們要傳遞訪問權限。
跟發送部分同樣,爲控制信息配置好屬性,並在其後分配一個文件描述符的數據部分後,在成功調用recvmsg後,控制信息的數據部分就是在接收進程中的新的文件描述符了,接收進程可直接對該文件描述符進行操做。
文件描述符傳遞並非將文件描述符數字傳遞,而是文件描述符對應數據結構。在主進程accept的到的文件描述符7傳遞到子進程後文件描述符有多是7,更有多是7之外的其餘數值,但不管是什麼數值並不重要,重要的是傳遞以後的鏈接跟傳遞以前的鏈接是同一個鏈接。
一般在完成文件描述符傳遞後,接收進程接管文件描述符,發送進程則應調用close關閉已傳遞的文件描述符。發送進程關閉描述符並不形成關閉該文件或設備,由於該描述符對應的文件仍被視爲由接收者進程打開(即便接收進程還沒有接收到該描述符)。
文件描述符傳遞可經由基於STREAMS的管道,也可經由UNIX域套接字。兩種方式在《UNIX網絡編程》中均有描述,Nebula採用的UNIX域套接字傳遞文件描述符。
建立用於傳遞文件描述符的UNIX域套接字用到socketpair函數:
#include <sys/types.h> #include <sys/socket.h> int socketpair(int d, int type, int protocol, int sv[2]);
傳入的參數sv爲一個整型數組,有兩個元素。當調用成功後,這個數組的兩個元素即爲2個文件描述符。一對鏈接起來的Unix匿名域套接字就創建起來了,它們就像一個全雙工的管道,每一端都既可讀也可寫。
Nebula框架的文件描述符屬於SocketChannel的基本屬性,文件描述符傳遞方法是SocketChannel的靜態方法。
文件描述符傳遞方法聲明:
static int SendChannelFd(int iSocketFd, int iSendFd, int iCodecType, std::shared_ptr<NetLogger> pLogger); static int RecvChannelFd(int iSocketFd, int& iRecvFd, int& iCodecType, std::shared_ptr<NetLogger> pLogger);
文件描述符發送方法實現:
/** * @brief 發送文件描述符 * @param iSocketFd 由socketpair()建立的UNIX域套接字,用於傳遞文件描述符 * @param iSendFd 待發送的文件描述符 * @param iCodecType 通訊通道編解碼類型 * @param pLogger 日誌類指針 * @return errno 錯誤碼 */ int SocketChannel::SendChannelFd(int iSocketFd, int iSendFd, int iCodecType, std::shared_ptr<NetLogger> pLogger) { ssize_t n; struct iovec iov[1]; struct msghdr msg; tagChannelCtx stCh; int iError = 0; stCh.iFd = iSendFd; stCh.iCodecType = iCodecType; union { struct cmsghdr cm; char space[CMSG_SPACE(sizeof(int))]; } cmsg; if (stCh.iFd == -1) { msg.msg_control = NULL; msg.msg_controllen = 0; } else { msg.msg_control = (caddr_t) &cmsg; msg.msg_controllen = sizeof(cmsg); memset(&cmsg, 0, sizeof(cmsg)); cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); cmsg.cm.cmsg_level = SOL_SOCKET; cmsg.cm.cmsg_type = SCM_RIGHTS; *(int *) CMSG_DATA(&cmsg.cm) = stCh.iFd; } msg.msg_flags = 0; iov[0].iov_base = (char*)&stCh; iov[0].iov_len = sizeof(tagChannelCtx); msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; n = sendmsg(iSocketFd, &msg, 0); if (n == -1) { pLogger->WriteLog(neb::Logger::ERROR, __FILE__, __LINE__, __FUNCTION__, "sendmsg() failed, errno %d", errno); iError = (errno == 0) ? ERR_TRANSFER_FD : errno; return(iError); } return(ERR_OK); }
文件描述符接收方法實現:
/** * @brief 接收文件描述符 * @param iSocketFd 由socketpair()建立的UNIX域套接字,用於傳遞文件描述符 * @param iRecvFd 接收到的文件描述符 * @param iCodecType 接收到的通訊通道編解碼類型 * @param pLogger 日誌類指針 * @return errno 錯誤碼 */ int SocketChannel::RecvChannelFd(int iSocketFd, int& iRecvFd, int& iCodecType, std::shared_ptr<NetLogger> pLogger) { ssize_t n; struct iovec iov[1]; struct msghdr msg; tagChannelCtx stCh; int iError = 0; union { struct cmsghdr cm; char space[CMSG_SPACE(sizeof(int))]; } cmsg; iov[0].iov_base = (char*)&stCh; iov[0].iov_len = sizeof(tagChannelCtx); msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; msg.msg_control = (caddr_t) &cmsg; msg.msg_controllen = sizeof(cmsg); n = recvmsg(iSocketFd, &msg, 0); if (n == -1) { pLogger->WriteLog(neb::Logger::ERROR, __FILE__, __LINE__, __FUNCTION__, "recvmsg() failed, errno %d", errno); iError = (errno == 0) ? ERR_TRANSFER_FD : errno; return(iError); } if (n == 0) { pLogger->WriteLog(neb::Logger::ERROR, __FILE__, __LINE__, __FUNCTION__, "recvmsg() return zero, errno %d", errno); iError = (errno == 0) ? ERR_TRANSFER_FD : errno; return(ERR_CHANNEL_EOF); } if ((size_t) n < sizeof(tagChannelCtx)) { pLogger->WriteLog(neb::Logger::ERROR, __FILE__, __LINE__, __FUNCTION__, "rrecvmsg() returned not enough data: %z, errno %d", n, errno); iError = (errno == 0) ? ERR_TRANSFER_FD : errno; return(iError); } if (cmsg.cm.cmsg_len < (socklen_t) CMSG_LEN(sizeof(int))) { pLogger->WriteLog(neb::Logger::ERROR, __FILE__, __LINE__, __FUNCTION__, "recvmsg() returned too small ancillary data"); iError = (errno == 0) ? ERR_TRANSFER_FD : errno; return(iError); } if (cmsg.cm.cmsg_level != SOL_SOCKET || cmsg.cm.cmsg_type != SCM_RIGHTS) { pLogger->WriteLog(neb::Logger::ERROR, __FILE__, __LINE__, __FUNCTION__, "recvmsg() returned invalid ancillary data level %d or type %d", cmsg.cm.cmsg_level, cmsg.cm.cmsg_type); iError = (errno == 0) ? ERR_TRANSFER_FD : errno; return(iError); } stCh.iFd = *(int *) CMSG_DATA(&cmsg.cm); if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC)) { pLogger->WriteLog(neb::Logger::ERROR, __FILE__, __LINE__, __FUNCTION__, "recvmsg() truncated data"); iError = (errno == 0) ? ERR_TRANSFER_FD : errno; return(iError); } iRecvFd = stCh.iFd; iCodecType = stCh.iCodecType; return(ERR_OK); }
Manager進程的void Manager::CreateWorker()方法建立用於傳遞文件描述符的UNIX域套接字:
int iControlFds[2]; int iDataFds[2]; if (socketpair(PF_UNIX, SOCK_STREAM, 0, iControlFds) < 0) { LOG4_ERROR("error %d: %s", errno, strerror_r(errno, m_szErrBuff, 1024)); } if (socketpair(PF_UNIX, SOCK_STREAM, 0, iDataFds) < 0) { LOG4_ERROR("error %d: %s", errno, strerror_r(errno, m_szErrBuff, 1024)); }
Manager進程發送文件描述符:
int iCodec = m_stManagerInfo.eCodec; // 將編解碼方式和文件描述符一同發送給Worker進程 int iErrno = SocketChannel::SendChannelFd(worker_pid_fd.second, iAcceptFd, iCodec, m_pLogger); if (iErrno == 0) { AddWorkerLoad(worker_pid_fd.first); } else { LOG4_ERROR("error %d: %s", iErrno, strerror_r(iErrno, m_szErrBuff, 1024)); } close(iAcceptFd); // 發送完畢,關閉文件描述符
Worker進程接收文件描述符:
int iAcceptFd = -1; int iCodec = 0; // 這裏的編解碼方式在RecvChannelFd方法中得到 int iErrno = SocketChannel::RecvChannelFd(m_stWorkerInfo.iManagerDataFd, iAcceptFd, iCodec, m_pLogger);
至此,Nebula框架的文件描述符傳遞分享完畢,下面再看看nginx中的文件描述符傳遞實現。
Nginx的文件描述符傳遞代碼在os/unix/ngx_channel.c文件中。
nginx中發送文件描述符代碼:
ngx_int_t ngx_write_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log) { ssize_t n; ngx_err_t err; struct iovec iov[1]; struct msghdr msg; #if (NGX_HAVE_MSGHDR_MSG_CONTROL) union { struct cmsghdr cm; char space[CMSG_SPACE(sizeof(int))]; } cmsg; if (ch->fd == -1) { msg.msg_control = NULL; msg.msg_controllen = 0; } else { msg.msg_control = (caddr_t) &cmsg; msg.msg_controllen = sizeof(cmsg); ngx_memzero(&cmsg, sizeof(cmsg)); cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); cmsg.cm.cmsg_level = SOL_SOCKET; cmsg.cm.cmsg_type = SCM_RIGHTS; /* * We have to use ngx_memcpy() instead of simple * *(int *) CMSG_DATA(&cmsg.cm) = ch->fd; * because some gcc 4.4 with -O2/3/s optimization issues the warning: * dereferencing type-punned pointer will break strict-aliasing rules * * Fortunately, gcc with -O1 compiles this ngx_memcpy() * in the same simple assignment as in the code above */ ngx_memcpy(CMSG_DATA(&cmsg.cm), &ch->fd, sizeof(int)); } msg.msg_flags = 0; #else if (ch->fd == -1) { msg.msg_accrights = NULL; msg.msg_accrightslen = 0; } else { msg.msg_accrights = (caddr_t) &ch->fd; msg.msg_accrightslen = sizeof(int); } #endif iov[0].iov_base = (char *) ch; iov[0].iov_len = size; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; n = sendmsg(s, &msg, 0); if (n == -1) { err = ngx_errno; if (err == NGX_EAGAIN) { return NGX_AGAIN; } ngx_log_error(NGX_LOG_ALERT, log, err, "sendmsg() failed"); return NGX_ERROR; } return NGX_OK; }
nginx中接收文件描述符代碼:
ngx_int_t ngx_read_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log) { ssize_t n; ngx_err_t err; struct iovec iov[1]; struct msghdr msg; #if (NGX_HAVE_MSGHDR_MSG_CONTROL) union { struct cmsghdr cm; char space[CMSG_SPACE(sizeof(int))]; } cmsg; #else int fd; #endif iov[0].iov_base = (char *) ch; iov[0].iov_len = size; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; #if (NGX_HAVE_MSGHDR_MSG_CONTROL) msg.msg_control = (caddr_t) &cmsg; msg.msg_controllen = sizeof(cmsg); #else msg.msg_accrights = (caddr_t) &fd; msg.msg_accrightslen = sizeof(int); #endif n = recvmsg(s, &msg, 0); if (n == -1) { err = ngx_errno; if (err == NGX_EAGAIN) { return NGX_AGAIN; } ngx_log_error(NGX_LOG_ALERT, log, err, "recvmsg() failed"); return NGX_ERROR; } if (n == 0) { ngx_log_debug0(NGX_LOG_DEBUG_CORE, log, 0, "recvmsg() returned zero"); return NGX_ERROR; } if ((size_t) n < sizeof(ngx_channel_t)) { ngx_log_error(NGX_LOG_ALERT, log, 0, "recvmsg() returned not enough data: %z", n); return NGX_ERROR; } #if (NGX_HAVE_MSGHDR_MSG_CONTROL) if (ch->command == NGX_CMD_OPEN_CHANNEL) { if (cmsg.cm.cmsg_len < (socklen_t) CMSG_LEN(sizeof(int))) { ngx_log_error(NGX_LOG_ALERT, log, 0, "recvmsg() returned too small ancillary data"); return NGX_ERROR; } if (cmsg.cm.cmsg_level != SOL_SOCKET || cmsg.cm.cmsg_type != SCM_RIGHTS) { ngx_log_error(NGX_LOG_ALERT, log, 0, "recvmsg() returned invalid ancillary data " "level %d or type %d", cmsg.cm.cmsg_level, cmsg.cm.cmsg_type); return NGX_ERROR; } /* ch->fd = *(int *) CMSG_DATA(&cmsg.cm); */ ngx_memcpy(&ch->fd, CMSG_DATA(&cmsg.cm), sizeof(int)); } if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC)) { ngx_log_error(NGX_LOG_ALERT, log, 0, "recvmsg() truncated data"); } #else if (ch->command == NGX_CMD_OPEN_CHANNEL) { if (msg.msg_accrightslen != sizeof(int)) { ngx_log_error(NGX_LOG_ALERT, log, 0, "recvmsg() returned no ancillary data"); return NGX_ERROR; } ch->fd = fd; } #endif return n; }
Nebula框架系列技術分享 之 《經過UNIX域套接字傳遞文件描述符》。 若是以爲這篇文章對你有用,若是以爲Nebula框架還能夠,幫忙到Nebula的Github或碼雲給個star,謝謝。Nebula不只是一個框架,還提供了一系列基於這個框架的應用,目標是打造一個高性能分佈式服務集羣解決方案。
參考資料: