多進程支持高併發

多進程結構是實現那些服務內容不相關的服務器的os層面的天然抽象。 html

1. 容錯:若系統在提供服務時,服務單元之間界限清晰沒有或不多交互(例如瀏覽兩個不相關的網頁),而服務單元有很大可能出錯,但願出錯時不影響其餘單元。 linux

2. 高性能:服務器若支持多CPU或超線程,多線程沒法徹底利用機器性能,多進程則可讓服務器滿載。 ios

最近使用多進程開發流媒體服務器,性能很不錯,預計能支持1000個客戶端併發播放1000kbps,能跑到千兆網的極限。參考這篇文章:http://blog.csdn.net/winlinvip/article/details/7840285 。 nginx

支持高併發的多進程服務器,應遵循如下設計要點: chrome

  1.  並不是每一個connection新建一個進程,這樣支持的併發並不高。
  2. 每一個進程處理多個connection,使用event-base來管理這些鏈接。
  3. master由信號驅動,worker由epoll驅動(固然信號會讓epoll_wait返回)。
  4. 多進程用來利用多CPU硬件,因此按照業務邊界來劃分進程,或者就按CPU個數配置。
  5. 如有特殊業務邏輯,譬如chrome的站點抽象,多進程有提升系統容錯性的優勢。
  6. 每一個connection的fd(socket)使用非阻塞方式(即異步方式,參考 http://blog.csdn.net/winlinvip/article/details/7843000 )。
  7. 每一個進程是單線程的:event-based, non-blocking, async-io 這些足以免多線程。
  8. 關鍵點就是儘可能使用異步。 

關於chrome的多進程結構的背景: shell

http://blog.chromium.org/2008/09/multi-process-architecture.html 編程

http://www.charlesreis.com/research/publications/eurosys-2009-talk.pdf?attredirects=0 瀏覽器

總結以下: bash

1. 應該對系統分析,先搞清楚系統的目標,根據目標劃分邊界,再設計系統結構。
例如瀏覽器模型中,多進程模型分爲:核心進程(存儲,網絡,UI)、插件進程、展示進程(DOM,JS引擎,HTML展現)。
這樣設計的緣由在於對於瀏覽器系統的分析————系統的邊界能夠定義在相關的頁面,即將可相互操做的頁面分爲Site組。
區分系統邊界的緣由在於瀏覽器的實際問題「網頁和腳本極易出錯致使崩潰「、」加載速度隨網頁增多變慢「。
因此,先應分析系統的主要設計目標(要解決的問題),而後分析邊界(可分解的子系統),最後才能設計系統的結構(進程的模型之類的)。
插件進程做爲單獨進程的緣由,也是由系統的目標決定的:插件在邊界上很明顯,獨立於HTML展示,並且也有崩潰的可能性。
核心進程一旦崩潰,整個系統也就崩潰了,因此核心進程的邊界是也避開那些容易崩潰的系統(譬如把插件獨立出去)。
2. 系統的目標和內涵:穩定性,性能。
穩定性:包括容錯性,內存管理,可問責性。
    容錯性:即某個子系統崩潰時是否能不影響其餘系統。譬如某個頁面崩潰是否其餘頁面還能不受影響。
    可問責性:即當系統出問題時,是否能快速定位是某個子系統,並採起措施解決問題。譬如,當瀏覽器佔用過多資源時(內存泄漏啦之類),是否能快速找到出問題的頁面,關閉它解決問題。
    內存管理:是否能彙報各個子系統的內存使用狀況,內存泄漏時是否能重啓或關閉子系統解決問題。
性能:包括實時性,吞吐速度,多CPU加速。
    實時性:即系統對於用戶的響應是否實時。譬如,A頁面如今很慢,但用戶在B頁面中操做是否能實時響應。
    吞吐速度:當任務增多時系統的總體吞吐能力。譬如,開不少個頁面,系統是否還能性能良好。
    多CPU加速:系統是否能利用多CPU的服務器加速。譬如,是否能利用24CPU的機器能力,仍是說24CPU能力和4CPU能力同樣?
3. 多進程架構會佔用更多內存。
4. Chrome其實支持單進程和多進程結構。 服務器


nginx也是多進程,雖然單個進程也能夠支持高併發,但在實際服務器上,通常仍是會使用多進程——這也是爲何用多進程的緣由之一。參考nginx的系統結構一文:http://www.aosabook.org/en/nginx.html (個人朋友雷總分享給個人好東西,感謝他)。

參考《Unix環境高級編程》中對fork的總結,即多進程的使用場景:

fork有兩種用法:
(1) 一個父進程但願複製本身,使父、子進程同時執行不一樣的代碼段。父進程偵聽請求,當請求到達時,父進程fork子進程處理此請求,父進程繼續偵聽下一個請求。
(2) 一個進程要執行一個不一樣的程序。這對shell是常見的狀況。在這種狀況下,子進程在從fork返回後當即調用exec。

其實也提到了服務器的應用。可見網絡服務器使用多進程是有很長曆史了。


兩個多進程服務器原型:http://blog.csdn.net/winlinvip/article/details/7764526

linux多進程服務器真的很給力,贊一個!


寫了一個原型,讓服務器保持10200個穩定鏈接的原型。

開啓了15個worker進程(伺服進程):
ps -axf
 1540 ?        Ss     0:00 /usr/sbin/sshd
 1959 ?        Ss     0:00  \_ sshd: winlin [priv]
 1961 ?        S      0:00      \_ sshd: winlin@pts/0
 1962 pts/0    Ss     0:00          \_ -bash
30127 pts/0    S+     0:00              \_ ./multiple_process multi
30128 pts/0    S+     0:00                  \_ ./multiple_process multi
30129 pts/0    S+     0:00                  \_ ./multiple_process multi
30130 pts/0    S+     0:00                  \_ ./multiple_process multi
30131 pts/0    S+     0:00                  \_ ./multiple_process multi
30132 pts/0    S+     0:00                  \_ ./multiple_process multi
30133 pts/0    S+     0:00                  \_ ./multiple_process multi
30134 pts/0    S+     0:00                  \_ ./multiple_process multi
30135 pts/0    S+     0:00                  \_ ./multiple_process multi
30136 pts/0    S+     0:00                  \_ ./multiple_process multi
30137 pts/0    S+     0:00                  \_ ./multiple_process multi
30138 pts/0    S+     0:00                  \_ ./multiple_process multi
30139 pts/0    S+     0:00                  \_ ./multiple_process multi
30140 pts/0    S+     0:00                  \_ ./multiple_process multi
30141 pts/0    S+     0:00                  \_ ./multiple_process multi
30142 pts/0    S+     0:00                  \_ ./multiple_process multi


process #30128 serving 387 clients
process #30129 serving 732 clients
process #30130 serving 1020 clients
process #30131 serving 443 clients
process #30132 serving 437 clients
process #30133 serving 434 clients
process #30134 serving 883 clients
process #30135 serving 678 clients
process #30136 serving 958 clients
process #30137 serving 329 clients
process #30138 serving 758 clients
process #30139 serving 1020 clients
process #30140 serving 1020 clients
process #30141 serving 411 clients
process #30142 serving 690 clients
共有10200個client。


開啓了10個程序,每一個創建1020個鏈接:
[1]   Running                 ./test_client 127.0.0.1 1990 1020 > /dev/null 2>&1 &
[2]   Running                 ./test_client 127.0.0.1 1990 1020 > /dev/null 2>&1 &
[3]   Running                 ./test_client 127.0.0.1 1990 1020 > /dev/null 2>&1 &
[4]   Running                 ./test_client 127.0.0.1 1990 1020 > /dev/null 2>&1 &
[5]   Running                 ./test_client 127.0.0.1 1990 1020 > /dev/null 2>&1 &
[6]   Running                 ./test_client 127.0.0.1 1990 1020 > /dev/null 2>&1 &
[7]   Running                 ./test_client 127.0.0.1 1990 1020 > /dev/null 2>&1 &
[8]   Running                 ./test_client 127.0.0.1 1990 1020 > /dev/null 2>&1 &
[9]-  Running                 ./test_client 127.0.0.1 1990 1020 > /dev/null 2>&1 &
[10]+  Running                 ./test_client 127.0.0.1 1990 1020 > /dev/null 2>&1 &

該模型能保持穩定的10200個鏈接,因爲每一個進程只支持1024個fd(可設置),因此每一個進程限定最多鏈接爲1020個fd。

[cpp]  view plain copy
  1. //multiple_process.cpp  
  2.   
  3. #include <stdio.h>  
  4. #include <stdlib.h>  
  5. #include <string.h>  
  6. #include <iostream>  
  7. #include <vector>  
  8. using namespace std;  
  9.   
  10. // for socket  
  11. #include <sys/types.h>  
  12. #include <sys/socket.h>  
  13. #include <netinet/in.h>  
  14. // for select  
  15. #include <sys/time.h>  
  16. // for process control  
  17. #include <unistd.h>  
  18. #include <sys/wait.h>  
  19.   
  20. int prepare_server_socket(int server_port){  
  21.     int server_fd = socket(AF_INET, SOCK_STREAM, 0);  
  22.     if(server_fd == -1){  
  23.         cout << "init socket error" << endl;  
  24.         exit(1);  
  25.     }  
  26.     cout << "socket init success" << endl;  
  27.       
  28.     int reuse_socket = 1;  
  29.     if(setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1){  
  30.         cout << "reuse socket error" << endl;  
  31.         exit(1);  
  32.     }  
  33.     cout << "socket set to reuse success" << endl;  
  34.       
  35.     struct sockaddr_in addr;  
  36.     memset(&addr, 0, sizeof(sockaddr_in));  
  37.     addr.sin_family = AF_INET;  
  38.     addr.sin_port = htons(server_port);  
  39.     addr.sin_addr.s_addr = INADDR_ANY;  
  40.     if(bind(server_fd, (const struct sockaddr*)&addr, sizeof(addr)) == -1){  
  41.         cout << "bind socket error" << endl;  
  42.         exit(1);  
  43.     }  
  44.     cout << "socket bind success" << endl;  
  45.       
  46.     if(listen(server_fd, 10) == -1){  
  47.         cout << "listen socket error" << endl;  
  48.         exit(1);  
  49.     }  
  50.     cout << "socket listen success" << endl;  
  51.       
  52.     return server_fd;  
  53. }  
  54.   
  55. bool can_socket_read(int socket_fd, int timeout_ms){  
  56.     fd_set set;  
  57.     FD_ZERO(&set);  
  58.     FD_SET(socket_fd, &set);  
  59.       
  60.     timeval timeout;  
  61.     timeout.tv_sec = 0;  
  62.     timeout.tv_usec = timeout_ms * 1000;  
  63.       
  64.     if(select(socket_fd + 1, &set, NULL, NULL, &timeout) == -1){  
  65.         cout << "select socket error " << endl;  
  66.         exit(1);  
  67.     }  
  68.       
  69.     if(!FD_ISSET(socket_fd, &set)){  
  70.         return false;  
  71.     }  
  72.       
  73.     return true;  
  74. }  
  75.   
  76. vector<int> clients;  
  77. void process_cycle(int server_fd){  
  78.     cout << "process #" << getpid() << " accept and process client from socket " << server_fd << endl;  
  79.       
  80.     while(true){  
  81.         // accept  
  82.         while(clients.size() < 1020){  
  83.             if(!can_socket_read(server_fd, 500)){  
  84.                 break;  
  85.             }  
  86.               
  87.             //cout << "get a client, accept it" << endl;  
  88.             int client_fd = accept(server_fd, NULL, 0);  
  89.             if(client_fd == -1){  
  90.                 cout << "failed to accept client" << endl;  
  91.                 exit(1);  
  92.             }  
  93.             //cout << "accept client success: #" << client_fd << endl;  
  94.             clients.push_back(client_fd);  
  95.         }  
  96.         // process: PING.  
  97.         if(true){  
  98.             vector<int>::iterator ite;  
  99.             for(ite = clients.begin(); ite != clients.end(); ite++){  
  100.                 int client_fd = *ite;  
  101.                   
  102.                 if(!can_socket_read(client_fd, 1)){  
  103.                     continue;  
  104.                 }  
  105.               
  106.                 char buf[1024];  
  107.                 if(recv(client_fd, buf, sizeof(buf), 0) <= 0){  
  108.                     cout << "read client error, exit" << endl;  
  109.                     exit(1);  
  110.                 }  
  111.                 //cout << "process #" << getpid() << " (" << clients.size() << " clients) recv from client: " << buf << endl;  
  112.                 if(send(client_fd, buf, strlen(buf), 0) <= 0){  
  113.                     cout << "send data error, exit" << endl;  
  114.                     exit(1);  
  115.                 }  
  116.             }  
  117.         }  
  118.         cout << "process #" << getpid() << " serving " << clients.size() << " clients" << endl;  
  119.     }  
  120. }  
  121.   
  122. void start_worker_process(int server_fd, int process_num){  
  123.     for(int i = 0; i < process_num; i++){  
  124.         pid_t pid = fork();  
  125.         // child process  
  126.         if(pid == 0){  
  127.             process_cycle(server_fd);  
  128.             exit(0);  
  129.         }  
  130.         // parent process  
  131.         else if(pid == -1){  
  132.             cout << "fork child process error" << endl;  
  133.             exit(1);  
  134.         }  
  135.         else{  
  136.             cout << "fork child process success: #" << pid << endl;  
  137.         }  
  138.     }  
  139. }  
  140.   
  141. int main(int argc, char** argv){  
  142.     if(argc <= 3){  
  143.         cout << "usage: " << argv[0] << " <port> <single_process_mode> <num_of_processes> " << endl  
  144.             << "port: the port to listen at" << endl  
  145.             << "single_process_mode: whether use the single process mode" << endl  
  146.             << "num_of_processes: if multiple processes mode, how many process we will create" << endl  
  147.             << "for example: " << argv[0] << " 1990 false 15" << endl;  
  148.         exit(1);  
  149.     }  
  150.     int server_port = atoi(argv[1]);  
  151.     char* single_process_mode = argv[2];  
  152.     int num_of_processes = atoi(argv[3]);  
  153.     bool is_single_process = !strcmp(single_process_mode, "true");  
  154.     cout << "[config] " << (is_single_process? "single process" : "multiple processes") << " mode, "   
  155.         << "listening at " << server_port  
  156.         << ", create " << num_of_processes << " worker processes" << endl;  
  157.       
  158.     char* server_ip = argv[1];  
  159.     cout << "[remark] multiple processes prototype. " << endl  
  160.         << "[remark] master process listen to get a socket fd(file descriptor), worker process accept and serve client." << endl  
  161.         << "[remark] this prototype also show the architecture swithing between multiple and single process." << endl;  
  162.       
  163.     // master process: bind and listen.  
  164.     int server_fd = prepare_server_socket(server_port);  
  165.       
  166.     // worker process: accept and process.  
  167.     if(is_single_process){  
  168.         process_cycle(server_fd);  
  169.     }  
  170.     else{  
  171.         start_worker_process(server_fd, num_of_processes);  
  172.     }  
  173.       
  174.     for(int i = 0; i < num_of_processes; i++){  
  175.         int status = 0;  
  176.         pid_t pid = waitpid(-1, &status, 0);  
  177.         cout << "child process #" << pid << " terminated" << endl;  
  178.     }  
  179.       
  180.     cout << "server terminated" << endl;  
  181.     close(server_fd);  
  182.       
  183.     return 0;  
  184. }  

[cpp]  view plain copy
  1. // test_client.cpp  
  2.   
  3. #include <stdio.h>  
  4. #include <stdlib.h>  
  5. #include <string.h>  
  6. #include <iostream>  
  7. #include <vector>  
  8. using namespace std;  
  9.   
  10. #include <sys/types.h>  
  11. #include <sys/socket.h>  
  12. #include <netinet/ip.h>  
  13. #include <arpa/inet.h>  
  14.   
  15. int main(int argc, char** argv)  
  16. {  
  17.     if(argc <= 3){  
  18.         cout << "usage: " << argv[0] << " <server_ip> <server_port> <num_of_clients> " << endl  
  19.             << "server_ip: the ip of server to connect" << endl  
  20.             << "server_port: the port of server to connect" << endl  
  21.             << "num_of_clients: how many client we will start" << endl  
  22.             << "for example: " << argv[0] << " 127.0.0.1 1990 1020" << endl;  
  23.         exit(1);  
  24.     }  
  25.     char* server_ip = argv[1];  
  26.     int server_port = atoi(argv[2]);  
  27.     int num_of_clients = atoi(argv[3]);  
  28.       
  29.     vector<int> clients;  
  30.     for(int i = 0; i < num_of_clients; i++){  
  31.         int client = socket(AF_INET, SOCK_STREAM, 0);  
  32.         if(client == -1){  
  33.             cout << "create socket error" << endl;  
  34.             exit(1);  
  35.         }  
  36.         cout << "create socket success" << endl;  
  37.           
  38.         sockaddr_in addr;  
  39.         memset(&addr, 0, sizeof(addr));  
  40.         addr.sin_family = AF_INET;  
  41.         addr.sin_port = htons(server_port);  
  42.         addr.sin_addr.s_addr = inet_addr(server_ip);  
  43.           
  44.         if(connect(client, (const struct sockaddr*)&addr, sizeof(addr)) == -1){  
  45.             cout << "connect to server error" << endl;  
  46.             exit(1);  
  47.         }  
  48.         cout << "connect server success" << endl;  
  49.           
  50.         clients.push_back(client);  
  51.     }  
  52.       
  53.     const char msg[] = "hello, this is client";  
  54.     char buf[1024];  
  55.     memset(buf, 0, sizeof(buf));  
  56.     memcpy(buf, msg, sizeof(msg));  
  57.       
  58.     while(true){  
  59.         for(vector<int>::iterator ite = clients.begin(); ite != clients.end(); ite++){  
  60.             int client = *ite;  
  61.               
  62.             if(send(client, buf, sizeof(msg), 0) < 0){  
  63.                 cout << "send error" << endl;  
  64.                 exit(1);  
  65.             }  
  66.             if(recv(client, buf, sizeof(buf), 0) < 0){  
  67.                 cout << "recv error" << endl;  
  68.                 exit(1);  
  69.             }  
  70.             cout << "recv from server: " << buf << endl;  
  71.               
  72.             sleep(3);  
  73.         }  
  74.           
  75.         sleep(120);  
  76.     }  
  77.       
  78.     return 0;  
  79. }  
相關文章
相關標籤/搜索