《Linux高性能服務器編程》學習總結(十三)——多進程編程

  在多進程編程中,咱們用fork系統調用建立子進程,值得注意的是,fork函數複製當前進程並在內核進程表中建立一個新的表項,其堆、棧指針,標誌寄存器的值都和父進程相同,可是其ppid被設置成父進程pid,信號位圖被清除。而子進程代碼和父進程徹底相同,其數據也會複製自父進程,可是其複製過程是寫時複製,即父子任意進程對數據執行寫操做時纔會複製,首先是缺頁中斷,而後操做系統給子進程分配空間並複製數據。此外,建立子進程後父進程中打開的文件描述符在子進程中也是默認打開的,其文件描述符引用計數加一,父進程的用戶根目錄,當前工做目錄等變量的引用計數均會加一。編程

  殭屍進程是多進程編程中須要瞭解和避免的狀況,殭屍進程的產生是因爲子進程先於父進程退出,而此時父進程沒有正常接收子進程退出狀態,致使子進程脫離父進程存在於操做系統中,還佔據了原有的進程描述符和資源,形成了資源的浪費。避免殭屍進程的產生共有三種方法:第一種是處理SIGCHLD信號,當子進程退出時會向父進程發送SIGCHLD信號,咱們能夠指明信號處理函數來進行處理;第二種是拖管法,由父進程建立一箇中間進程,再由這個中間進程建立子進程後直接退出,這樣子進程變成了孤兒進程,會由init進程託管,由init進程負責回收其資源,可是這樣就破壞了父子進程之間的血緣關係;第三種就是阻塞法,調用wait函數等待子進程退出,缺點就是會使父進程進入阻塞狀態。對於wait函數,其進入阻塞狀態顯然是咱們所不容許的,因此waitpid函數就解決了這個問題。服務器

  在多進程編程中,一個最爲重要的須要解決的問題就是進程間通訊IPC,咱們經常使用的IPC方法有:管道、信號量、共享內存和消息隊列。其系統調用較爲簡單,咱們只來簡單說明一下這四種方式的區別和異同。首先管道是咱們在前文就提到過的,並且講過了再網絡編程中常常使用socketpair函數建立一個雙向管道,管道分爲無名管道和有名管道,無名管道只能用於有親緣關係的進程之間進行通訊,並且只能用低級文件編程庫中的讀寫函數,而有名管道就是建立一個管道文件,經過文件進行信息交流,因此沒有親緣關係的限制。信號量是用來實現多進程之間的互斥與同步的,其本質是一個整形的數,咱們用pv操做來對其進行操做,若是是二進制的信號量,則能夠用信號量保證對於臨界資源來說同一時刻只有一段代碼能對其進行訪問。共享內存是在內存空間建立或獲取一段空間來讓各個進程進行共享,經過這段空間進行信息交流,而這種方式就比較靈活,能夠自定其數據結構,完成各式各樣的功能。消息隊列就是一個能存放消息的隊列,各個進程將消息發送到消息隊列中,並指明要發送的對象,其餘程序能夠直接監聽這個隊列,收取發給本身的消息。幾種進程間通訊的方式都很廣泛,咱們來看一個用共享內存實現的聊天室服務器程序:網絡

  

 1 /*************************************************************************  2  > File Name: 13-4.cpp  3  > Author: Torrance_ZHANG  4  > Mail: 597156711@qq.com  5  > Created Time: Wed 14 Feb 2018 04:18:04 PM PST  6  ************************************************************************/
 7 
 8 #include"head.h"
 9 using namespace std;  10 
 11 #define USER_LIMIT 5
 12 #define BUFFER_SIZE 1024
 13 #define FD_LIMIT 65535
 14 #define MAX_EVENT_NUMBER 1024
 15 #define PROCESS_LIMIT 65536
 16 
 17 struct client_data {  18  sockaddr_in address;  19     int connfd;  20  pid_t pid;  21     int pipefd[2];  22 };  23 
 24 static const char* shm_name = "/my_shm";  25 int sig_pipefd[2];  26 int epollfd;  27 int listenfd;  28 int shmfd;  29 char* share_mem = 0;  30 client_data* users = 0;  31 int* sub_process = 0;  32 int user_count = 0;  33 bool stop_child = false;  34 
 35 int setnonblocking(int fd) {  36     int old_option = fcntl(fd, F_GETFL);  37     int new_option = old_option | O_NONBLOCK;  38  fcntl(fd, F_SETFL, new_option);  39     return old_option;  40 }  41 
 42 void addfd(int epollfd, int fd) {  43     epoll_event event;  44     event.data.fd = fd;  45     event.events = EPOLLIN | EPOLLET;  46     epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);  47  setnonblocking(fd);  48 }  49 
 50 void sig_handler(int sig) {  51     int save_errno = errno;  52     int msg = sig;  53     send(sig_pipefd[1], (char*)&msg, 1, 0);  54     errno = save_errno;  55 }  56 
 57 void addsig(int sig, void(*handler)(int), bool restart = true) {  58     struct sigaction sa;  59     memset(&sa, 0, sizeof(sa));  60     sa.sa_handler = handler;  61     if(restart) sa.sa_flags |= SA_RESTART;  62     sigfillset(&sa.sa_mask);  63     assert(sigaction(sig, &sa, NULL) != -1);  64 }  65 
 66 void del_resource() {  67     close(sig_pipefd[0]);  68     close(sig_pipefd[1]);  69  close(listenfd);  70  close(epollfd);  71  shm_unlink(shm_name);  72     delete [] users;  73     delete [] sub_process;  74 }  75 
 76 void child_term_handler(int sig) {  77     stop_child = true;  78 }  79 
 80 int run_child(int idx, client_data* users, char* share_mem) {  81  epoll_event events[MAX_EVENT_NUMBER];  82     int child_epollfd = epoll_create(5);  83     assert(child_epollfd != -1);  84     int connfd = users[idx].connfd;  85  addfd(child_epollfd, connfd);  86     int pipefd = users[idx].pipefd[1];  87  addfd(child_epollfd, pipefd);  88     int ret;  89     addsig(SIGTERM, child_term_handler, false);  90 
 91     while(!stop_child) {  92         int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1);  93         if((number < 0) && (errno != EINTR)) {  94             printf("epoll failure\n");  95             break;  96  }  97         for(int i = 0; i < number; i ++) {  98             int sockfd = events[i].data.fd;  99             if((sockfd == connfd) && (events[i].events & EPOLLIN)) { 100                 memset(share_mem + idx * BUFFER_SIZE, 0, BUFFER_SIZE); 101                 ret = recv(connfd, share_mem + idx * BUFFER_SIZE, BUFFER_SIZE - 1, 0); 102                 if(ret < 0) { 103                     if(errno != EAGAIN) { 104                         stop_child = true; 105  } 106  } 107                 else if(ret == 0) { 108                     stop_child = true; 109  } 110                 else { 111                     send(pipefd, (char*)&idx, sizeof(idx), 0); 112  } 113  } 114             else if((sockfd == pipefd) && (events[i].events & EPOLLIN)) { 115                 int client = 0; 116                 ret = recv(sockfd, (char*)&client, sizeof(client), 0); 117                 if(ret < 0) { 118                     if(errno != EAGAIN) { 119                         stop_child = true; 120  } 121  } 122                 else if(ret == 0) { 123                     stop_child = true; 124  } 125                 else { 126                     send(connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0); 127  } 128  } 129             else continue; 130  } 131  } 132  close(connfd); 133  close(pipefd); 134  close(child_epollfd); 135     return 0; 136 } 137 
138 int main(int argc, char** argv) { 139     if(argc <= 2) { 140         printf("usage: %s ip_address port_number\n", basename(argv[0])); 141         return 1; 142  } 143     const char* ip = argv[1]; 144     int port = atoi(argv[2]); 145 
146     int ret = 0; 147     struct sockaddr_in address; 148     bzero(&address, sizeof(address)); 149     address.sin_family = AF_INET; 150     inet_pton(AF_INET, ip, &address.sin_addr); 151     address.sin_port = htons(port); 152 
153     listenfd = socket(AF_INET, SOCK_STREAM, 0); 154     assert(listenfd >= 0); 155 
156     ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); 157     assert(ret != -1); 158 
159     ret = listen(listenfd, 5); 160     assert(ret != -1); 161 
162     user_count = 0; 163     users = new client_data[USER_LIMIT + 1]; 164     sub_process = new int[PROCESS_LIMIT]; 165     for(int i = 0; i < PROCESS_LIMIT; i ++) { 166         sub_process[i] = -1; 167  } 168  epoll_event events[MAX_EVENT_NUMBER]; 169     epollfd = epoll_create(5); 170     assert(epollfd != -1); 171  addfd(epollfd, listenfd); 172     
173     ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sig_pipefd); 174     assert(ret != -1); 175     setnonblocking(sig_pipefd[1]); 176     addfd(epollfd, sig_pipefd[0]); 177 
178  addsig(SIGCHLD, sig_handler); 179  addsig(SIGTERM, sig_handler); 180  addsig(SIGINT, sig_handler); 181  addsig(SIGPIPE, SIG_IGN); 182     bool stop_server = false; 183     bool terminate = false; 184 
185     shmfd = shm_open(shm_name, O_CREAT | O_RDWR, 0666); 186     assert(shmfd != -1); 187     ret = ftruncate(shmfd, USER_LIMIT * BUFFER_SIZE); 188     assert(ret != -1); 189     
190     share_mem = (char*)mmap(NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0); 191     assert(share_mem != MAP_FAILED); 192  close(shmfd); 193 
194     while(!stop_server) { 195         int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); 196         if((number < 0) && (errno != EINTR)) { 197             printf("epoll failure\n"); 198             break; 199  } 200         for(int i = 0; i < number; i ++) { 201             int sockfd = events[i].data.fd; 202             if(sockfd == listenfd) { 203                 struct sockaddr_in client_address; 204                 socklen_t client_addrlength = sizeof(client_address); 205                 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); 206                 if(connfd < 0) { 207                     printf("errno is: %d\n", errno); 208                     continue; 209  } 210                 if(user_count >= USER_LIMIT) { 211                     const char* info = "too many users\n"; 212                     printf("%s", info); 213                     send(connfd, info, strlen(info), 0); 214  close(connfd); 215                     continue; 216  } 217                 users[user_count].address = client_address; 218                 users[user_count].connfd = connfd; 219                 ret = socketpair(AF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd); 220                 assert(ret != -1); 221                 pid_t pid = fork(); 222                 if(pid < 0) { 223  close(connfd); 224                     continue; 225  } 226                 else if(pid == 0) { 227  close(epollfd); 228  close(listenfd); 229                     close(users[user_count].pipefd[0]); 230                     close(sig_pipefd[0]); 231                     close(sig_pipefd[1]); 232  run_child(user_count, users, share_mem); 233                     munmap((void*)share_mem, USER_LIMIT* BUFFER_SIZE); 234                     exit(0); 235  } 236                 else { 237  close(connfd); 238                     close(users[user_count].pipefd[1]); 239                     addfd(epollfd, users[user_count].pipefd[0]); 240                     users[user_count].pid = pid; 241                     sub_process[pid] = user_count; 242                     user_count ++; 243  } 244  } 245             else if((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN)) { 246                 int sig; 247                 char signals[1024]; 248                 ret = recv(sig_pipefd[0], signals, sizeof(signals), 0); 249                 if(ret == -1) continue; 250                 else if(ret == 0) continue; 251                 else { 252                     for(int i = 0; i < ret; i ++) { 253                         switch(signals[i]) { 254                             case SIGCHLD: { 255  pid_t pid; 256                                 int stat; 257                                 while((pid = waitpid(-1, &stat, WNOHANG)) > 0) { 258                                     int del_user = sub_process[pid]; 259                                     sub_process[pid] = -1; 260                                     if((del_user < 0) || (del_user > USER_LIMIT)) { 261                                         continue; 262  } 263                                     epoll_ctl(epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0); 264                                     close(users[del_user].pipefd[0]); 265                                     users[del_user] = users[-- user_count]; 266                                     sub_process[users[del_user].pid] = del_user; 267  } 268                                 if(terminate && user_count == 0) { 269                                     stop_server = true; 270  } 271                                 break; 272  } 273                             case SIGTERM: 274                             case SIGINT: { 275                                 printf("kill all the child now\n"); 276                                 if(user_count == 0) { 277                                     stop_server = true; 278                                     break; 279  } 280                                 for(int i = 0; i < user_count; i ++) { 281                                     int pid = users[i].pid; 282  kill(pid, SIGTERM); 283  } 284                                 terminate = true; 285                                 break; 286  } 287                             default : break; 288  } 289  } 290  } 291  } 292             else if(events[i].events & EPOLLIN) { 293                 int child = 0; 294                 ret = recv(sockfd, (char*)&child, sizeof(child), 0); 295                 printf("read data from child accross pipe\n"); 296                 if(ret == -1) continue; 297                 else if(ret == 0) continue; 298                 else { 299                     for(int j = 0; j < user_count; j ++) { 300                         if(users[j].pipefd[0] != sockfd) { 301                             printf("send data to child accross pipe\n"); 302                             send(users[j].pipefd[0], (char*)&child, sizeof(child), 0); 303  } 304  } 305  } 306  } 307  } 308  } 309  del_resource(); 310     return 0; 311 }

  服務器接收到客戶端數據時,經過管道告知專門爲其餘服務器服務的子進程向對應客戶端發送數據。數據結構

  咱們知道,當父進程建立子進程以後,父進程中打開的文件描述符仍然保持打開,因此文件描述符能夠很方便地從父進程傳遞到子進程。須要特別注意的是進程描述符的傳遞並非單純地傳送一個值,而是要在接收進程中建立一個新的文件描述符,而且該文件描述符和發送進程中被傳遞的文件描述符指向內核中的同一個文件表項。那麼問題就來了,咱們如何從子進程將文件描述符傳送給父進程呢?推廣來講,如何在不相關的兩個進程之間傳送文件描述符呢?這時咱們就須要利用UNIX域socket在進程間傳遞特殊的輔助數據,咱們來看一個例子:socket

 1 /*************************************************************************  2  > File Name: 13-5.cpp  3  > Author: Torrance_ZHANG  4  > Mail: 597156711@qq.com  5  > Created Time: Tue 27 Feb 2018 03:35:13 AM PST  6  ************************************************************************/
 7 
 8 #include"head.h"
 9 using namespace std; 10 
11 static const int CONTROL_LEN = CMSG_LEN(sizeof(int)); 12 
13 void send_fd(int fd, int fd_to_send) { 14     struct iovec iov[1]; 15     struct msghdr msg; 16     char buf[0]; 17 
18     iov[0].iov_base = buf; 19     iov[0].iov_len = 1; 20     msg.msg_name = NULL; 21     msg.msg_namelen = 0; 22     msg.msg_iov = iov; 23     msg.msg_iovlen = 1; 24 
25  cmsghdr cm; 26     cm.cmsg_len = CONTROL_LEN; 27     cm.cmsg_level = SOL_SOCKET; 28     cm.cmsg_type = SCM_RIGHTS; 29     *(int *)CMSG_DATA(&cm) = fd_to_send; 30     msg.msg_control = &cm; 31     msg.msg_controllen = CONTROL_LEN; 32 
33     sendmsg(fd, &msg, 0); 34 } 35 
36 int recv_fd(int fd) { 37     struct iovec iov[1]; 38     struct msghdr msg; 39     char buf[0]; 40 
41     iov[0].iov_base = buf; 42     iov[0].iov_len = 1; 43     msg.msg_name = NULL; 44     msg.msg_namelen = 0; 45     msg.msg_iov = iov; 46     msg.msg_iovlen = 1; 47     
48  cmsghdr cm; 49     msg.msg_control = &cm; 50     msg.msg_controllen = CONTROL_LEN; 51     recvmsg(fd, &msg, 0); 52 
53     int fd_to_read = *(int*)CMSG_DATA(&cm); 54     return fd_to_read; 55 } 56 
57 int main() { 58     int pipefd[2]; 59     int fd_to_pass = 0; 60     int ret = socketpair(AF_UNIX, SOCK_DGRAM, 0, pipefd); 61     assert(ret != -1); 62 
63     pid_t pid = fork(); 64     assert(pid >= 0); 65 
66     if(pid == 0) { 67         close(pipefd[0]); 68         fd_to_pass = open("test.txt", O_RDWR, 0666); 69         send_fd(pipefd[1], (fd_to_pass > 0) ? fd_to_pass : 0); 70  close(fd_to_pass); 71         exit(0); 72  } 73 
74     close(pipefd[1]); 75     fd_to_pass = recv_fd(pipefd[0]); 76     char buf[1024]; 77     memset(buf, 0, sizeof(buf)); 78     read(fd_to_pass, buf, 1024); 79     printf("I got fd %d and data %s\n", fd_to_pass, buf); 80  close(fd_to_pass); 81 }

相關文章
相關標籤/搜索