epoll實現Reactor模式

轉自:http://blog.csdn.net/analogous_love/article/details/53319815react

 

最近一直在看遊雙的《高性能Linux服務器編程》一書,下載連接: http://download.csdn.net/detail/analogous_love/9673008linux

書上是這麼介紹Reactor模式的:ios

 

 

按照這個思路,我寫個簡單的練習:c++

 

[cpp] view plain copy編程

  在CODE上查看代碼片派生到個人代碼片

  1. /**  
  2.  *@desc:   用reactor模式練習服務器程序,main.cpp 
  3.  *@author: zhangyl 
  4.  *@date:   2016.11.23 
  5.  */  
  6.   
  7. #include <iostream>  
  8. #include <string.h>  
  9. #include <sys/types.h>  
  10. #include <sys/socket.h>  
  11. #include <netinet/in.h>  
  12. #include <arpa/inet.h>  //for htonl() and htons()  
  13. #include <unistd.h>  
  14. #include <fcntl.h>  
  15. #include <sys/epoll.h>  
  16. #include <signal.h>     //for signal()  
  17. #include <pthread.h>  
  18. #include <semaphore.h>  
  19. #include <list>  
  20. #include <errno.h>  
  21. #include <time.h>  
  22. #include <sstream>  
  23. #include <iomanip> //for std::setw()/setfill()  
  24. #include <stdlib.h>  
  25.   
  26.   
  27. #define WORKER_THREAD_NUM   5  
  28.   
  29. #define min(a, b) ((a <= b) ? (a) : (b))   
  30.   
  31. int g_epollfd = 0;  
  32. bool g_bStop = false;  
  33. int g_listenfd = 0;  
  34. pthread_t g_acceptthreadid = 0;  
  35. pthread_t g_threadid[WORKER_THREAD_NUM] = { 0 };  
  36. pthread_cond_t g_acceptcond;  
  37. pthread_mutex_t g_acceptmutex;  
  38.   
  39. pthread_cond_t g_cond /*= PTHREAD_COND_INITIALIZER*/;  
  40. pthread_mutex_t g_mutex /*= PTHREAD_MUTEX_INITIALIZER*/;  
  41.   
  42. pthread_mutex_t g_clientmutex;  
  43.   
  44. std::list<int> g_listClients;  
  45.   
  46. void prog_exit(int signo)  
  47. {  
  48.     ::signal(SIGINT, SIG_IGN);  
  49.     ::signal(SIGKILL, SIG_IGN);  
  50.     ::signal(SIGTERM, SIG_IGN);  
  51.   
  52.     std::cout << "program recv signal " << signo << " to exit." << std::endl;  
  53.   
  54.     g_bStop = true;  
  55.   
  56.     ::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, g_listenfd, NULL);  
  57.   
  58.     //TODO: 是否須要先調用shutdown()一下?  
  59.     ::shutdown(g_listenfd, SHUT_RDWR);  
  60.     ::close(g_listenfd);  
  61.     ::close(g_epollfd);  
  62.   
  63.     ::pthread_cond_destroy(&g_acceptcond);  
  64.     ::pthread_mutex_destroy(&g_acceptmutex);  
  65.       
  66.     ::pthread_cond_destroy(&g_cond);  
  67.     ::pthread_mutex_destroy(&g_mutex);  
  68.   
  69.     ::pthread_mutex_destroy(&g_clientmutex);  
  70. }  
  71.   
  72. bool create_server_listener(const char* ip, short port)  
  73. {  
  74.     g_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);  
  75.     if (g_listenfd == -1)  
  76.         return false;  
  77.   
  78.     int on = 1;  
  79.     ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));  
  80.     ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on));  
  81.   
  82.     struct sockaddr_in servaddr;  
  83.     memset(&servaddr, 0, sizeof(servaddr));   
  84.     servaddr.sin_family = AF_INET;  
  85.     servaddr.sin_addr.s_addr = inet_addr(ip);  
  86.     servaddr.sin_port = htons(port);  
  87.     if (::bind(g_listenfd, (sockaddr *)&servaddr, sizeof(servaddr)) == -1)  
  88.         return false;  
  89.   
  90.     if (::listen(g_listenfd, 50) == -1)  
  91.         return false;  
  92.   
  93.     g_epollfd = ::epoll_create(1);  
  94.     if (g_epollfd == -1)  
  95.         return false;  
  96.   
  97.     struct epoll_event e;  
  98.     memset(&e, 0, sizeof(e));  
  99.     e.events = EPOLLIN | EPOLLRDHUP;  
  100.     e.data.fd = g_listenfd;  
  101.     if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, g_listenfd, &e) == -1)  
  102.         return false;  
  103.   
  104.     return true;  
  105. }  
  106.   
  107. void release_client(int clientfd)  
  108. {  
  109.     if (::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)  
  110.         std::cout << "release client socket failed as call epoll_ctl failed" << std::endl;  
  111.   
  112.     ::close(clientfd);  
  113. }  
  114.   
  115. void* accept_thread_func(void* arg)  
  116. {     
  117.     while (!g_bStop)  
  118.     {  
  119.         ::pthread_mutex_lock(&g_acceptmutex);  
  120.         ::pthread_cond_wait(&g_acceptcond, &g_acceptmutex);  
  121.         //::pthread_mutex_lock(&g_acceptmutex);  
  122.   
  123.         //std::cout << "run loop in accept_thread_func" << std::endl;  
  124.   
  125.         struct sockaddr_in clientaddr;  
  126.         socklen_t addrlen;  
  127.         int newfd = ::accept(g_listenfd, (struct sockaddr *)&clientaddr, &addrlen);  
  128.         ::pthread_mutex_unlock(&g_acceptmutex);  
  129.         if (newfd == -1)  
  130.             continue;  
  131.   
  132.         std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" << ::ntohs(clientaddr.sin_port) << std::endl;  
  133.   
  134.         //將新socket設置爲non-blocking  
  135.         int oldflag = ::fcntl(newfd, F_GETFL, 0);  
  136.         int newflag = oldflag | O_NONBLOCK;  
  137.         if (::fcntl(newfd, F_SETFL, newflag) == -1)  
  138.         {  
  139.             std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl;  
  140.             continue;  
  141.         }  
  142.   
  143.         struct epoll_event e;  
  144.         memset(&e, 0, sizeof(e));  
  145.         e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;  
  146.         e.data.fd = newfd;  
  147.         if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)  
  148.         {  
  149.             std::cout << "epoll_ctl error, fd =" << newfd << std::endl;  
  150.         }  
  151.     }  
  152.   
  153.     return NULL;  
  154. }  
  155.   
  156.   
  157. void* worker_thread_func(void* arg)  
  158. {     
  159.     while (!g_bStop)  
  160.     {  
  161.         int clientfd;  
  162.         ::pthread_mutex_lock(&g_clientmutex);  
  163.         while (g_listClients.empty())  
  164.             ::pthread_cond_wait(&g_cond, &g_clientmutex);  
  165.         clientfd = g_listClients.front();  
  166.         g_listClients.pop_front();    
  167.         pthread_mutex_unlock(&g_clientmutex);  
  168.   
  169.         //gdb調試時不能實時刷新標準輸出,用這個函數刷新標準輸出,使信息在屏幕上實時顯示出來  
  170.         std::cout << std::endl;  
  171.   
  172.         std::string strclientmsg;  
  173.         char buff[256];  
  174.         bool bError = false;  
  175.         while (true)  
  176.         {  
  177.             memset(buff, 0, sizeof(buff));  
  178.             int nRecv = ::recv(clientfd, buff, 256, 0);  
  179.             if (nRecv == -1)  
  180.             {  
  181.                 if (errno == EWOULDBLOCK)  
  182.                     break;  
  183.                 else  
  184.                 {  
  185.                     std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl;  
  186.                     release_client(clientfd);  
  187.                     bError = true;  
  188.                     break;  
  189.                 }  
  190.                       
  191.             }  
  192.             //對端關閉了socket,這端也關閉。  
  193.             else if (nRecv == 0)  
  194.             {  
  195.                 std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl;  
  196.                 release_client(clientfd);  
  197.                 bError = true;  
  198.                 break;  
  199.             }  
  200.   
  201.             strclientmsg += buff;  
  202.         }  
  203.   
  204.         //出錯了,就不要再繼續往下執行了  
  205.         if (bError)  
  206.             continue;  
  207.           
  208.         std::cout << "client msg: " << strclientmsg;  
  209.   
  210.         //將消息加上時間標籤後發回  
  211.         time_t now = time(NULL);  
  212.         struct tm* nowstr = localtime(&now);  
  213.         std::ostringstream ostimestr;  
  214.         ostimestr << "[" << nowstr->tm_year + 1900 << "-"   
  215.                   << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-"   
  216.                   << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "  
  217.                   << std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":"   
  218.                   << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":"   
  219.                   << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";  
  220.   
  221.         strclientmsg.insert(0, ostimestr.str());  
  222.           
  223.         while (true)  
  224.         {  
  225.             int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0);  
  226.             if (nSent == -1)  
  227.             {  
  228.                 if (errno == EWOULDBLOCK)  
  229.                 {  
  230.                     ::sleep(10);  
  231.                     continue;  
  232.                 }  
  233.                 else  
  234.                 {  
  235.                     std::cout << "send error, fd = " << clientfd << std::endl;  
  236.                     release_client(clientfd);  
  237.                     break;  
  238.                 }  
  239.                      
  240.             }            
  241.   
  242.             std::cout << "send: " << strclientmsg;  
  243.             strclientmsg.erase(0, nSent);  
  244.   
  245.             if (strclientmsg.empty())  
  246.                 break;  
  247.         }  
  248.     }  
  249.   
  250.     return NULL;  
  251. }  
  252.   
  253. void daemon_run()  
  254. {  
  255.     int pid;  
  256.     signal(SIGCHLD, SIG_IGN);  
  257.     //1)在父進程中,fork返回新建立子進程的進程ID;  
  258.     //2)在子進程中,fork返回0;  
  259.     //3)若是出現錯誤,fork返回一個負值;  
  260.     pid = fork();  
  261.     if (pid < 0)  
  262.     {  
  263.         std:: cout << "fork error" << std::endl;  
  264.         exit(-1);  
  265.     }  
  266.     //父進程退出,子進程獨立運行  
  267.     else if (pid > 0) {  
  268.         exit(0);  
  269.     }  
  270.     //以前parent和child運行在同一個session裏,parent是會話(session)的領頭進程,  
  271.     //parent進程做爲會話的領頭進程,若是exit結束執行的話,那麼子進程會成爲孤兒進程,並被init收養。  
  272.     //執行setsid()以後,child將從新得到一個新的會話(session)id。  
  273.     //這時parent退出以後,將不會影響到child了。  
  274.     setsid();  
  275.     int fd;  
  276.     fd = open("/dev/null", O_RDWR, 0);  
  277.     if (fd != -1)  
  278.     {  
  279.         dup2(fd, STDIN_FILENO);  
  280.         dup2(fd, STDOUT_FILENO);  
  281.         dup2(fd, STDERR_FILENO);  
  282.     }  
  283.     if (fd > 2)  
  284.         close(fd);  
  285.    
  286. }  
  287.   
  288.   
  289. int main(int argc, char* argv[])  
  290. {    
  291.     short port = 0;  
  292.     int ch;  
  293.     bool bdaemon = false;  
  294.     while ((ch = getopt(argc, argv, "p:d")) != -1)  
  295.     {  
  296.         switch (ch)  
  297.         {  
  298.         case 'd':  
  299.             bdaemon = true;  
  300.             break;  
  301.         case 'p':  
  302.             port = atol(optarg);  
  303.             break;  
  304.         }  
  305.     }  
  306.   
  307.     if (bdaemon)  
  308.         daemon_run();  
  309.   
  310.   
  311.     if (port == 0)  
  312.         port = 12345;  
  313.        
  314.     if (!create_server_listener("0.0.0.0", port))  
  315.     {  
  316.         std::cout << "Unable to create listen server: ip=0.0.0.0, port=" << port << "." << std::endl;  
  317.         return -1;  
  318.     }  
  319.   
  320.       
  321.     //設置信號處理  
  322.     signal(SIGCHLD, SIG_DFL);  
  323.     signal(SIGPIPE, SIG_IGN);  
  324.     signal(SIGINT, prog_exit);  
  325.     signal(SIGKILL, prog_exit);  
  326.     signal(SIGTERM, prog_exit);  
  327.   
  328.     ::pthread_cond_init(&g_acceptcond, NULL);  
  329.     ::pthread_mutex_init(&g_acceptmutex, NULL);  
  330.   
  331.     ::pthread_cond_init(&g_cond, NULL);  
  332.     ::pthread_mutex_init(&g_mutex, NULL);  
  333.   
  334.     ::pthread_mutex_init(&g_clientmutex, NULL);  
  335.        
  336.     ::pthread_create(&g_acceptthreadid, NULL, accept_thread_func, NULL);  
  337.     //啓動工做線程  
  338.     for (int i = 0; i < WORKER_THREAD_NUM; ++i)  
  339.     {  
  340.         ::pthread_create(&g_threadid[i], NULL, worker_thread_func, NULL);  
  341.     }  
  342.   
  343.     while (!g_bStop)  
  344.     {         
  345.         struct epoll_event ev[1024];  
  346.         int n = ::epoll_wait(g_epollfd, ev, 1024, 10);  
  347.         if (n == 0)  
  348.             continue;  
  349.         else if (n < 0)  
  350.         {  
  351.             std::cout << "epoll_wait error" << std::endl;  
  352.             continue;  
  353.         }  
  354.   
  355.         int m = min(n, 1024);  
  356.         for (int i = 0; i < m; ++i)  
  357.         {  
  358.             //通知接收鏈接線程接收新鏈接  
  359.             if (ev[i].data.fd == g_listenfd)  
  360.                 pthread_cond_signal(&g_acceptcond);  
  361.             //通知普通工做線程接收數據  
  362.             else  
  363.             {                 
  364.                 pthread_mutex_lock(&g_clientmutex);                
  365.                 g_listClients.push_back(ev[i].data.fd);  
  366.                 pthread_mutex_unlock(&g_clientmutex);  
  367.                 pthread_cond_signal(&g_cond);  
  368.                 //std::cout << "signal" << std::endl;  
  369.             }  
  370.                   
  371.         }  
  372.   
  373.     }  
  374.       
  375.     return 0;  
  376. }  


 

 

程序的功能一個簡單的echo服務:客戶端鏈接上服務器以後,給服務器發送信息,服務器加上時間戳等信息後返回給客戶端。使用到的知識點有:windows

1. 條件變量服務器

2.epoll的邊緣觸發模式session

 

程序的大體框架是:框架

1. 主線程只負責監聽偵聽socket上是否有新鏈接,若是有新鏈接到來,交給一個叫accept的工做線程去接收新鏈接,並將新鏈接socket綁定到主線程使用epollfd上去。socket

2. 主線程若是偵聽到客戶端的socket上有可讀事件,則通知另外五個工做線程去接收處理客戶端發來的數據,並將數據加上時間戳後發回給客戶端。

3. 能夠經過傳遞-p port來設置程序的監聽端口號;能夠經過傳遞-d來使程序以daemon模式運行在後臺。這也是標準linux daemon模式的書寫方法。

 

程序難點和須要注意的地方是:

1. 條件變量爲了防止虛假喚醒,必定要在一個循環裏面調用pthread_cond_wait()函數,我在worker_thread_func()中使用了:

 

[cpp] view plain copy

  在CODE上查看代碼片派生到個人代碼片

  1. while (g_listClients.empty())  
  2.             ::pthread_cond_wait(&g_cond, &g_clientmutex);  


 

 

在accept_thread_func()函數裏面我沒有使用循環,這樣會有問題嗎?

2. 使用條件變量pthread_cond_wait()函數的時候必定要先得到與該條件變量相關的mutex,即像下面這樣的結構:

 

[cpp] view plain copy

  在CODE上查看代碼片派生到個人代碼片

  1. mutex_lock(...);  
  2.   
  3. while (condition is true)  
  4.     ::pthread_cond_wait(...);  
  5.   
  6. //這裏能夠有其餘代碼...  
  7. mutex_unlock(...);  
  8.   
  9. //這裏能夠有其餘代碼...  


由於pthread_cond_wait()若是阻塞的話,它解鎖相關mutex和阻塞當前線程這兩個動做加在一塊兒是原子的。

 

 

3. 做爲服務器端程序最好對偵聽socket調用setsocketopt()設置SO_REUSEADDR和SO_REUSEPORT兩個標誌,由於服務程序有時候會須要重啓(好比調試的時候就會不斷重啓),若是不設置這兩個標誌的話,綁定端口時就會調用失敗。由於一個端口使用後,即便再也不使用,由於四次揮手該端口處於TIME_WAIT狀態,有大約2min的MSL(Maximum Segment Lifetime,最大存活期)。這2min內,該端口是不能被重複使用的。你的服務器程序上次使用了這個端口號,接着重啓,由於這個緣故,你再次綁定這個端口就會失敗(bind函數調用失敗)。要不你就每次重啓時須要等待2min後再試(這在頻繁重啓程序調試是難以接收的),或者設置這種SO_REUSEADDR和SO_REUSEPORT當即回收端口使用。

其實,SO_REUSEADDR在windows上和Unix平臺上還有些細微的區別,我在libevent源碼中看到這樣的描述:

 

[cpp] view plain copy

  在CODE上查看代碼片派生到個人代碼片

  1. int evutil_make_listen_socket_reuseable(evutil_socket_t sock)  
  2. {  
  3. #ifndef WIN32  
  4.     int one = 1;  
  5.     /* REUSEADDR on Unix means, "don't hang on to this address after the 
  6.      * listener is closed."  On Windows, though, it means "don't keep other 
  7.      * processes from binding to this address while we're using it. */  
  8.     return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void*) &one,  
  9.         (ev_socklen_t)sizeof(one));  
  10. #else  
  11.     return 0;  
  12. #endif  
  13. }  

注意註釋部分,在Unix平臺上設置這個選項意味着,任意進程能夠複用該地址;而在windows,不要阻止其餘進程複用該地址。也就是在在Unix平臺上,若是不設置這個選項,任意進程在必定時間內,不能bind該地址;在windows平臺上,在必定時間內,其餘進程不能bind該地址,而本進程卻能夠再次bind該地址。
 

 

 

4. epoll_wait對新鏈接socket使用的是邊緣觸發模式EPOLLET(edge trigger),而不是默認的水平觸發模式(level trigger)。由於若是採起水平觸發模式的話,主線程檢測到某個客戶端socket數據可讀時,通知工做線程去收取該socket上的數據,這個時候主線程繼續循環,只要在工做線程沒有將該socket上數據所有收完,或者在工做線程收取數據的過程當中,客戶端有新數據到來,主線程會繼續發通知(經過pthread_cond_signal())函數,再次通知工做線程收取數據。這樣會可能致使多個工做線程同時調用recv函數收取該客戶端socket上的數據,這樣產生的結果將會致使數據錯亂。

      相反,採起邊緣觸發模式,只有等某個工做線程將那個客戶端socket上數據所有收取完畢,主線程的epoll_wait纔可能會再次觸發來通知工做線程繼續收取那個客戶端socket新來的數據。

 

5. 代碼中有這樣一行:

 

[cpp] view plain copy

  在CODE上查看代碼片派生到個人代碼片

  1. //gdb調試時不能實時刷新標準輸出,用這個函數刷新標準輸出,使信息在屏幕上實時顯示出來  
  2.         std::cout << std::endl;  


若是不加上這一行,正常運行服務器程序,程序中要打印到控制檯的信息都會打印出來,可是若是用gdb調試狀態下,程序的全部輸出就不顯示了。我不知道這是否是gdb的一個bug,因此這裏加上std::endl來輸出一個換行符並flush標準輸出,讓輸出顯示出來。(std::endl不只是輸出一個換行符並且是同時刷新輸出,至關於fflush()函數)。

 

 

程序我部署起來了,你可使用linux的nc命令或本身寫程序鏈接服務器來查看程序效果,固然也可使用telnet命令,方法:

linux:

nc 120.55.94.78 12345

telnet 120.55.94.78 12345

而後就能夠給服務器自由發送數據了,服務器會給你發送的信息加上時間戳返回給你。效果如圖:

 

另外我將這個代碼改寫了成純C++11版本,使用CMake編譯,爲了支持編譯必須加上這-std=c++11:

CMakeLists.txt代碼以下:

 

[cpp] view plain copy

  在CODE上查看代碼片派生到個人代碼片

  1. cmake_minimum_required(VERSION 2.8)  
  2.   
  3. PROJECT(myreactorserver)  
  4.   
  5. AUX_SOURCE_DIRECTORY(./ SRC_LIST)  
  6. SET(EXECUTABLE_OUTPUT_PATH ./)  
  7.   
  8. ADD_DEFINITIONS(-g -W -Wall -Wno-deprecated -DLINUX -D_REENTRANT -D_FILE_OFFSET_BITS=64 -DAC_HAS_INFO -DAC_HAS_WARNING -DAC_HAS_ERROR -DAC_HAS_CRITICAL -DTIXML_USE_STL -DHAVE_CXX_STDHEADERS ${CMAKE_CXX_FLAGS} -std=c++11)  
  9.   
  10. INCLUDE_DIRECTORIES(  
  11. ./  
  12. )  
  13. LINK_DIRECTORIES(  
  14. ./  
  15. )  
  16.   
  17. set(  
  18. main.cpp  
  19. myreator.cpp  
  20. )  
  21.   
  22. ADD_EXECUTABLE(myreactorserver ${SRC_LIST})  
  23.   
  24. TARGET_LINK_LIBRARIES(myreactorserver pthread)  


myreactor.h文件內容:

 

 

[cpp] view plain copy

  在CODE上查看代碼片派生到個人代碼片

  1. /** 
  2. *@desc: myreactor頭文件, myreactor.h 
  3. *@author: zhangyl 
  4. *@date: 2016.12.03 
  5. */  
  6. #ifndef __MYREACTOR_H__  
  7. #define __MYREACTOR_H__  
  8.   
  9. #include <list>  
  10. #include <memory>  
  11. #include <thread>  
  12. #include <mutex>  
  13. #include <condition_variable>  
  14.   
  15. #define WORKER_THREAD_NUM   5  
  16.   
  17. class CMyReactor  
  18. {  
  19. public:  
  20.     CMyReactor();  
  21.     ~CMyReactor();  
  22.   
  23.     bool init(const char* ip, short nport);  
  24.     bool uninit();  
  25.   
  26.     bool close_client(int clientfd);  
  27.   
  28.     static void* main_loop(void* p);  
  29.   
  30. private:  
  31.     //no copyable  
  32.     CMyReactor(const CMyReactor& rhs);  
  33.     CMyReactor& operator = (const CMyReactor& rhs);  
  34.   
  35.     bool create_server_listener(const char* ip, short port);  
  36.       
  37.     static void accept_thread_proc(CMyReactor* pReatcor);  
  38.     static void worker_thread_proc(CMyReactor* pReatcor);  
  39.   
  40. private:  
  41.     //C11語法能夠在這裏初始化  
  42.     int                          m_listenfd = 0;  
  43.     int                          m_epollfd  = 0;  
  44.     bool                         m_bStop    = false;  
  45.       
  46.     std::shared_ptr<std::thread> m_acceptthread;  
  47.     std::shared_ptr<std::thread> m_workerthreads[WORKER_THREAD_NUM];  
  48.       
  49.     std::condition_variable      m_acceptcond;  
  50.     std::mutex                   m_acceptmutex;  
  51.   
  52.     std::condition_variable      m_workercond ;  
  53.     std::mutex                   m_workermutex;  
  54.   
  55.     std::list<int>                 m_listClients;  
  56. };  
  57.   
  58. #endif //!__MYREACTOR_H__  


myreactor.cpp文件內容:

 

 

[cpp] view plain copy

  在CODE上查看代碼片派生到個人代碼片

  1. /**  
  2.  *@desc: myreactor實現文件, myreactor.cpp 
  3.  *@author: zhangyl 
  4.  *@date: 2016.12.03 
  5.  */  
  6. #include "myreactor.h"  
  7. #include <iostream>  
  8. #include <string.h>  
  9. #include <sys/types.h>  
  10. #include <sys/socket.h>  
  11. #include <netinet/in.h>  
  12. #include <arpa/inet.h>  //for htonl() and htons()  
  13. #include <fcntl.h>  
  14. #include <sys/epoll.h>  
  15. #include <list>  
  16. #include <errno.h>  
  17. #include <time.h>  
  18. #include <sstream>  
  19. #include <iomanip> //for std::setw()/setfill()  
  20. #include <unistd.h>  
  21.   
  22. #define min(a, b) ((a <= b) ? (a) : (b))  
  23.   
  24. CMyReactor::CMyReactor()  
  25. {  
  26.     //m_listenfd = 0;  
  27.     //m_epollfd = 0;  
  28.     //m_bStop = false;  
  29. }  
  30.   
  31. CMyReactor::~CMyReactor()  
  32. {  
  33.   
  34. }  
  35.   
  36. bool CMyReactor::init(const char* ip, short nport)  
  37. {  
  38.     if (!create_server_listener(ip, nport))  
  39.     {  
  40.         std::cout << "Unable to bind: " << ip << ":" << nport << "." << std::endl;  
  41.         return false;  
  42.     }  
  43.   
  44.   
  45.     std::cout << "main thread id = " << std::this_thread::get_id() << std::endl;  
  46.   
  47.     //啓動接收新鏈接的線程  
  48.     m_acceptthread.reset(new std::thread(CMyReactor::accept_thread_proc, this));  
  49.       
  50.     //啓動工做線程  
  51.     for (auto& t : m_workerthreads)  
  52.     {  
  53.         t.reset(new std::thread(CMyReactor::worker_thread_proc, this));  
  54.     }  
  55.   
  56.   
  57.     return true;  
  58. }  
  59.   
  60. bool CMyReactor::uninit()  
  61. {  
  62.     m_bStop = true;  
  63.     m_acceptcond.notify_one();  
  64.     m_workercond.notify_all();  
  65.   
  66.     m_acceptthread->join();  
  67.     for (auto& t : m_workerthreads)  
  68.     {  
  69.         t->join();  
  70.     }  
  71.   
  72.     ::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, m_listenfd, NULL);  
  73.   
  74.     //TODO: 是否須要先調用shutdown()一下?  
  75.     ::shutdown(m_listenfd, SHUT_RDWR);  
  76.     ::close(m_listenfd);  
  77.     ::close(m_epollfd);  
  78.   
  79.     return true;  
  80. }  
  81.   
  82. bool CMyReactor::close_client(int clientfd)  
  83. {  
  84.     if (::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)  
  85.     {  
  86.         std::cout << "close client socket failed as call epoll_ctl failed" << std::endl;  
  87.         //return false;  
  88.     }  
  89.           
  90.   
  91.     ::close(clientfd);  
  92.   
  93.     return true;  
  94. }  
  95.   
  96.   
  97. void* CMyReactor::main_loop(void* p)  
  98. {  
  99.     std::cout << "main thread id = " << std::this_thread::get_id() << std::endl;  
  100.       
  101.     CMyReactor* pReatcor = static_cast<CMyReactor*>(p);  
  102.       
  103.     while (!pReatcor->m_bStop)  
  104.     {  
  105.         struct epoll_event ev[1024];  
  106.         int n = ::epoll_wait(pReatcor->m_epollfd, ev, 1024, 10);  
  107.         if (n == 0)  
  108.             continue;  
  109.         else if (n < 0)  
  110.         {  
  111.             std::cout << "epoll_wait error" << std::endl;  
  112.             continue;  
  113.         }  
  114.   
  115.         int m = min(n, 1024);  
  116.         for (int i = 0; i < m; ++i)  
  117.         {  
  118.             //通知接收鏈接線程接收新鏈接  
  119.             if (ev[i].data.fd == pReatcor->m_listenfd)  
  120.                 pReatcor->m_acceptcond.notify_one();  
  121.             //通知普通工做線程接收數據  
  122.             else  
  123.             {  
  124.                 {  
  125.                     std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);  
  126.                     pReatcor->m_listClients.push_back(ev[i].data.fd);  
  127.                 }  
  128.                                   
  129.                 pReatcor->m_workercond.notify_one();  
  130.                 //std::cout << "signal" << std::endl;  
  131.             }// end if  
  132.   
  133.         }// end for-loop  
  134.     }// end while  
  135.   
  136.     std::cout << "main loop exit ..." << std::endl;  
  137.   
  138.     return NULL;  
  139. }  
  140.   
  141. void CMyReactor::accept_thread_proc(CMyReactor* pReatcor)  
  142. {  
  143.     std::cout << "accept thread, thread id = " << std::this_thread::get_id() << std::endl;  
  144.   
  145.     while (true)  
  146.     {  
  147.         int newfd;  
  148.         struct sockaddr_in clientaddr;  
  149.         socklen_t addrlen;  
  150.         {  
  151.             std::unique_lock<std::mutex> guard(pReatcor->m_acceptmutex);  
  152.             pReatcor->m_acceptcond.wait(guard);  
  153.             if (pReatcor->m_bStop)  
  154.                 break;  
  155.   
  156.             //std::cout << "run loop in accept_thread_proc" << std::endl;  
  157.               
  158.             newfd = ::accept(pReatcor->m_listenfd, (struct sockaddr *)&clientaddr, &addrlen);  
  159.         }  
  160.         if (newfd == -1)  
  161.             continue;  
  162.   
  163.         std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" << ::ntohs(clientaddr.sin_port) << std::endl;  
  164.   
  165.         //將新socket設置爲non-blocking  
  166.         int oldflag = ::fcntl(newfd, F_GETFL, 0);  
  167.         int newflag = oldflag | O_NONBLOCK;  
  168.         if (::fcntl(newfd, F_SETFL, newflag) == -1)  
  169.         {  
  170.             std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl;  
  171.             continue;  
  172.         }  
  173.   
  174.         struct epoll_event e;  
  175.         memset(&e, 0, sizeof(e));  
  176.         e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;  
  177.         e.data.fd = newfd;  
  178.         if (::epoll_ctl(pReatcor->m_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)  
  179.         {  
  180.             std::cout << "epoll_ctl error, fd =" << newfd << std::endl;  
  181.         }  
  182.     }  
  183.   
  184.     std::cout << "accept thread exit ..." << std::endl;  
  185. }  
  186.   
  187. void CMyReactor::worker_thread_proc(CMyReactor* pReatcor)  
  188. {  
  189.     std::cout << "new worker thread, thread id = " << std::this_thread::get_id() << std::endl;  
  190.   
  191.     while (true)  
  192.     {  
  193.         int clientfd;  
  194.         {  
  195.             std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);  
  196.             while (pReatcor->m_listClients.empty())  
  197.             {  
  198.                 if (pReatcor->m_bStop)  
  199.                 {  
  200.                     std::cout << "worker thread exit ..." << std::endl;  
  201.                     return;  
  202.                 }  
  203.                       
  204.                 pReatcor->m_workercond.wait(guard);  
  205.             }  
  206.                   
  207.             clientfd = pReatcor->m_listClients.front();  
  208.             pReatcor->m_listClients.pop_front();  
  209.         }  
  210.   
  211.         //gdb調試時不能實時刷新標準輸出,用這個函數刷新標準輸出,使信息在屏幕上實時顯示出來  
  212.         std::cout << std::endl;  
  213.   
  214.         std::string strclientmsg;  
  215.         char buff[256];  
  216.         bool bError = false;  
  217.         while (true)  
  218.         {  
  219.             memset(buff, 0, sizeof(buff));  
  220.             int nRecv = ::recv(clientfd, buff, 256, 0);  
  221.             if (nRecv == -1)  
  222.             {  
  223.                 if (errno == EWOULDBLOCK)  
  224.                     break;  
  225.                 else  
  226.                 {  
  227.                     std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl;  
  228.                     pReatcor->close_client(clientfd);  
  229.                     bError = true;  
  230.                     break;  
  231.                 }  
  232.   
  233.             }  
  234.             //對端關閉了socket,這端也關閉。  
  235.             else if (nRecv == 0)  
  236.             {  
  237.                 std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl;  
  238.                 pReatcor->close_client(clientfd);  
  239.                 bError = true;  
  240.                 break;  
  241.             }  
  242.   
  243.             strclientmsg += buff;  
  244.         }  
  245.   
  246.         //出錯了,就不要再繼續往下執行了  
  247.         if (bError)  
  248.             continue;  
  249.   
  250.         std::cout << "client msg: " << strclientmsg;  
  251.   
  252.         //將消息加上時間標籤後發回  
  253.         time_t now = time(NULL);  
  254.         struct tm* nowstr = localtime(&now);  
  255.         std::ostringstream ostimestr;  
  256.         ostimestr << "[" << nowstr->tm_year + 1900 << "-"  
  257.             << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-"  
  258.             << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "  
  259.             << std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":"  
  260.             << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":"  
  261.             << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";  
  262.   
  263.         strclientmsg.insert(0, ostimestr.str());  
  264.   
  265.         while (true)  
  266.         {  
  267.             int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0);  
  268.             if (nSent == -1)  
  269.             {  
  270.                 if (errno == EWOULDBLOCK)  
  271.                 {  
  272.                     std::this_thread::sleep_for(std::chrono::milliseconds(10));  
  273.                     continue;  
  274.                 }  
  275.                 else  
  276.                 {  
  277.                     std::cout << "send error, fd = " << clientfd << std::endl;  
  278.                     pReatcor->close_client(clientfd);  
  279.                     break;  
  280.                 }  
  281.   
  282.             }  
  283.   
  284.             std::cout << "send: " << strclientmsg;  
  285.             strclientmsg.erase(0, nSent);  
  286.   
  287.             if (strclientmsg.empty())  
  288.                 break;  
  289.         }  
  290.     }  
  291. }  
  292.   
  293. bool CMyReactor::create_server_listener(const char* ip, short port)  
  294. {  
  295.     m_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);  
  296.     if (m_listenfd == -1)  
  297.         return false;  
  298.   
  299.     int on = 1;  
  300.     ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));  
  301.     ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on));  
  302.   
  303.     struct sockaddr_in servaddr;  
  304.     memset(&servaddr, 0, sizeof(servaddr));  
  305.     servaddr.sin_family = AF_INET;  
  306.     servaddr.sin_addr.s_addr = inet_addr(ip);  
  307.     servaddr.sin_port = htons(port);  
  308.     if (::bind(m_listenfd, (sockaddr *)&servaddr, sizeof(servaddr)) == -1)  
  309.         return false;  
  310.   
  311.     if (::listen(m_listenfd, 50) == -1)  
  312.         return false;  
  313.   
  314.     m_epollfd = ::epoll_create(1);  
  315.     if (m_epollfd == -1)  
  316.         return false;  
  317.   
  318.     struct epoll_event e;  
  319.     memset(&e, 0, sizeof(e));  
  320.     e.events = EPOLLIN | EPOLLRDHUP;  
  321.     e.data.fd = m_listenfd;  
  322.     if (::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listenfd, &e) == -1)  
  323.         return false;  
  324.   
  325.     return true;  
  326. }  


main.cpp文件內容:

 

 

[cpp] view plain copy

  在CODE上查看代碼片派生到個人代碼片

  1. /**  
  2.  *@desc:   用reactor模式練習服務器程序 
  3.  *@author: zhangyl 
  4.  *@date:   2016.12.03 
  5.  */  
  6.   
  7. #include <iostream>  
  8. #include <signal.h>     //for signal()  
  9. #include<unistd.h>  
  10. #include <stdlib.h>       //for exit()  
  11. #include <sys/types.h>  
  12. #include <sys/stat.h>  
  13. #include <fcntl.h>  
  14. #include "myreactor.h"  
  15.   
  16. CMyReactor g_reator;  
  17.   
  18. void prog_exit(int signo)  
  19. {  
  20.     std::cout << "program recv signal " << signo << " to exit." << std::endl;   
  21.   
  22.     g_reator.uninit();  
  23. }  
  24.   
  25. void daemon_run()  
  26. {  
  27.     int pid;  
  28.     signal(SIGCHLD, SIG_IGN);  
  29.     //1)在父進程中,fork返回新建立子進程的進程ID;  
  30.     //2)在子進程中,fork返回0;  
  31.     //3)若是出現錯誤,fork返回一個負值;  
  32.     pid = fork();  
  33.     if (pid < 0)  
  34.     {  
  35.         std:: cout << "fork error" << std::endl;  
  36.         exit(-1);  
  37.     }  
  38.     //父進程退出,子進程獨立運行  
  39.     else if (pid > 0) {  
  40.         exit(0);  
  41.     }  
  42.     //以前parent和child運行在同一個session裏,parent是會話(session)的領頭進程,  
  43.     //parent進程做爲會話的領頭進程,若是exit結束執行的話,那麼子進程會成爲孤兒進程,並被init收養。  
  44.     //執行setsid()以後,child將從新得到一個新的會話(session)id。  
  45.     //這時parent退出以後,將不會影響到child了。  
  46.     setsid();  
  47.     int fd;  
  48.     fd = open("/dev/null", O_RDWR, 0);  
  49.     if (fd != -1)  
  50.     {  
  51.         dup2(fd, STDIN_FILENO);  
  52.         dup2(fd, STDOUT_FILENO);  
  53.         dup2(fd, STDERR_FILENO);  
  54.     }  
  55.     if (fd > 2)  
  56.         close(fd);  
  57. }  
  58.   
  59.   
  60. int main(int argc, char* argv[])  
  61. {    
  62.     //設置信號處理  
  63.     signal(SIGCHLD, SIG_DFL);  
  64.     signal(SIGPIPE, SIG_IGN);  
  65.     signal(SIGINT, prog_exit);  
  66.     signal(SIGKILL, prog_exit);  
  67.     signal(SIGTERM, prog_exit);  
  68.       
  69.     short port = 0;  
  70.     int ch;  
  71.     bool bdaemon = false;  
  72.     while ((ch = getopt(argc, argv, "p:d")) != -1)  
  73.     {  
  74.         switch (ch)  
  75.         {  
  76.         case 'd':  
  77.             bdaemon = true;  
  78.             break;  
  79.         case 'p':  
  80.             port = atol(optarg);  
  81.             break;  
  82.         }  
  83.     }  
  84.   
  85.     if (bdaemon)  
  86.         daemon_run();  
  87.   
  88.   
  89.     if (port == 0)  
  90.         port = 12345;  
  91.   
  92.       
  93.     if (!g_reator.init("0.0.0.0", 12345))  
  94.         return -1;  
  95.       
  96.     g_reator.main_loop(&g_reator);  
  97.   
  98.     return 0;  
  99. }  


完整實例代碼下載地址:

 

普通版本:https://pan.baidu.com/s/1o82Mkno

C++11版本:https://pan.baidu.com/s/1dEJdrih

相關文章
相關標籤/搜索