epoll
是Linux內核中的一種可擴展IO事件處理機制,最先在 Linux 2.5.44內核中引入,可被用於代替POSIX select 和 poll 系統調用,而且在具備大量應用程序請求時可以得到較好的性能( 此時被監視的文件描述符數目很是大,與舊的 select 和 poll 系統調用完成操做所需 O(n) 不一樣, epoll能在O(1)時間內完成操做,因此性能至關高),epoll 與 FreeBSD的kqueue相似,都向用戶空間提供了本身的文件描述符來進行操做。html
IO模型實現reactor 模式的工做流程:react
(1)主線程向epoll內核事件表內註冊socket上的可讀就緒事件。編程
(2)主線程調用epoll_wait()等待socket上有數據可讀數組
(3)當socket上有數據可讀,epoll_wait 通知主線程。主線程從socket可讀事件放入請求隊列。緩存
(4)睡眠在請求隊列上的某個可讀工做線程被喚醒,從socket上讀取數據,處理客戶的請求。安全
而後向 epoll內核事件表裏註冊寫的就緒事件數據結構
(5)主線程調用epoll_wait()等待數據可寫 。多線程
1 /*! 2 * Email: scictor@gmail.com 3 * Auth: scictor 4 * Date: 2019-9-9 5 * File: epoll_reactor_threadpoll.h 6 * Class: epoll_reactor_threadpoll (if applicable) 7 * Brief: 8 * Note: 9 */ 10 #ifndef THREAD_POOL_H 11 #define THREAD_POOL_H 12 13 #include <pthread.h> 14 #include <semaphore.h> 15 16 typedef void *(*func_call)(void*); 17 18 /*Individual job*/ 19 typedef struct thpool_job_t { 20 // void (*function)(void* arg); //函數指針 21 void *(*function)(void* arg); //函數指針 22 void *arg ; //函數的參數 23 /*struct tpool_job_t *next ; //指向下一個任務 24 struct tpool_job_t *prev ; //指向前一個任務*/ 25 struct thpool_job_t *next ; //指向下一個任務 26 struct thpool_job_t *prev ; //指向前一個任務 27 }thpool_job_t ; 28 29 /*job queue as doubly linked list雙向鏈表*/ 30 typedef struct thpool_jobqueue { 31 thpool_job_t *head ; //隊列的頭指針(頭部添加job任務) 32 thpool_job_t *tail; //對列的尾指針(尾部作任務,並移除) 33 int jobsN; //隊列中工做的個數 34 sem_t *queueSem; //原子信號量 35 }thpool_jobqueue; 36 37 /*thread pool*/ 38 39 typedef struct thpool_t { 40 pthread_t *threads ; //線程的ID 41 int threadsN ; //線程的數量 42 thpool_jobqueue *jobqueue; //工做隊列的指針 43 44 45 }thpool_t; 46 47 48 /*線程池中的線程都須要互斥鎖和指向線程池的一個指針*/ 49 typedef struct thread_data{ 50 pthread_mutex_t *mutex_p ; 51 thpool_t *tp_p ; 52 }thread_data; 53 54 55 56 /* 57 * 初始化線程池 58 * 爲線程池, 工做隊列, 申請內存空間,信號等申請內存空間 59 * @param :將被使用的線程ID 60 * @return :成功返回的線程池結構體,錯誤返回null 61 */ 62 63 thpool_t *thpool_init (int threadsN); 64 65 /* 66 * 每一個線程要作的事情 67 * 這是一個無止境循環,當撤銷這線程池的時候,這個循環纔會被中斷 68 *@param: 線程池 69 *@return:不作任何的事情 70 */ 71 72 void thpool_thread_do (thpool_t *tp_p); 73 74 /* 75 *向工做隊列裏面添加任何 76 *採用來了一個行爲和他的參數,添加到線程池的工做對列中去, 77 * 若是你想添加工做函數,須要更多的參數,經過傳遞一個指向結構體的指針,就能夠實現一個接口 78 * ATTENTION:爲了避免引發警告,你不得不將函數和參數都帶上 79 * 80 * @param: 添加工做的線程線程池 81 * @param: 這個工做的處理函數 82 * @param:函數的參數 83 * @return : int 84 */ 85 86 int thpool_add_work (thpool_t *tp_p ,void* (*function_p) (void *), void* arg_p ); 87 88 89 /* 90 *摧毀線程池 91 * 92 *這將撤銷這個線程池和釋放所申請的內存空間,當你在調用這個函數的時候,存在有的線程還在運行中,那麼 93 *中止他們如今所作的工做,而後他們被撤銷掉 94 * @param:你想要撤銷的線程池的指針 95 */ 96 97 98 void thpool_destory (thpool_t *tp_p); 99 100 /*-----------------------Queue specific---------------------------------*/ 101 102 103 104 /* 105 * 初始化隊列 106 * @param: 指向線程池的指針 107 * @return :成功的時候返回是 0 ,分配內存失敗的時候,返回是-1 108 */ 109 int thpool_jobqueue_init (thpool_t *tp_p); 110 111 112 /* 113 *添加任務到隊列 114 *一個新的工做任務將被添加到隊列,在使用這個函數或者其餘向別的相似這樣 115 *函數 thpool_jobqueue_empty ()以前,這個新的任務要被申請內存空間 116 * 117 * @param: 指向線程池的指針 118 * @param:指向一個已經申請內存空間的任務 119 * @return nothing 120 */ 121 void thpool_jobqueue_add (thpool_t * tp_p , thpool_job_t *newjob_p); 122 123 /* 124 * 移除對列的最後一個任務 125 *這個函數將不會被釋放申請的內存空間,因此要保證 126 * 127 *@param :指向線程池的指針 128 *@return : 成功返回0 ,若是對列是空的,就返回-1 129 */ 130 int thpool_jobqueue_removelast (thpool_t *tp_p); 131 132 133 /* 134 *對列的最後一個任務 135 *在隊列裏面獲得最後一個任務,即便隊列是空的,這個函數依舊可使用 136 * 137 *參數:指向線程池結構體的指針 138 *返回值:獲得隊列中最後一個任務的指針,或者在對列是空的狀況下,返回是空 139 */ 140 thpool_job_t * thpool_jobqueue_peek (thpool_t *tp_p); 141 142 /* 143 *移除和撤銷這個隊列中的全部任務 144 *這個函數將刪除這個隊列中的全部任務,將任務對列恢復到初始化狀態,所以隊列的頭和對列的尾都設置爲NULL ,此時隊列中任務= 0 145 * 146 *參數:指向線程池結構體的指針 147 * 148 */ 149 void thpool_jobqueue_empty (thpool_t *tp_p); 150 151 #endif
1 /*! 2 * Email: scictor@gmail.com 3 * Auth: scictor 4 * Date: 2019-9-9 5 * File: epoll_reactor_threadpoll.cpp 6 * Class: epoll_reactor_threadpoll (if applicable) 7 * Brief: 8 * Note: 9 */ 10 #include <unistd.h> 11 #include <assert.h> 12 #include <stdio.h> 13 #include <stdlib.h> 14 #include <errno.h> 15 #include <pthread.h> 16 #include <semaphore.h> 17 18 19 #include "threadPool.h" 20 21 22 static int thpool_keepalive = 1 ; //線程池保持存活 23 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER ; //靜態賦值法初始化互斥鎖 24 25 26 thpool_t * thpool_init (int threadsN){ 27 thpool_t *tp_p ; 28 29 if (!threadsN || threadsN < 1){ 30 threadsN = 1 ; 31 32 } 33 34 tp_p = (thpool_t *)malloc (sizeof (thpool_t)) ; 35 if (tp_p == NULL){ 36 fprintf (stderr ,"thpool_init (): could not allocate memory for thread pool\n"); 37 return NULL ; 38 } 39 tp_p->threads = (pthread_t *)malloc (threadsN * sizeof (pthread_t)); 40 if (tp_p->threads == NULL){ 41 fprintf( stderr , "could not allocation memory for thread id\n"); 42 return NULL; 43 } 44 tp_p->threadsN = threadsN ; 45 46 47 if (thpool_jobqueue_init (tp_p) == -1){ 48 fprintf (stderr ,"could not allocate memory for job queue\n"); 49 return NULL; 50 } 51 52 /*初始化信號*/ 53 tp_p->jobqueue->queueSem = (sem_t *)malloc (sizeof (sem_t)); 54 55 /*定位一個匿名信號量,第二個參數是0表示。這個信號量將在進程內的線程是共享的,爲1表示進程間共享,第三個參數是信號量的初始值*/ 56 sem_init (tp_p->jobqueue->queueSem, 0 , 0 ); 57 58 int t ; 59 60 61 62 for (t = 0 ; t < threadsN ; t++){ 63 printf ("Create thread %d in pool\n", t); 64 65 //第四個參數是傳遞給函數指針的一個參數,這個函數指針就是咱們所說的線程指針 66 if (pthread_create (&(tp_p->threads[t]) , NULL , (void *) thpool_thread_do , (void *)tp_p)){ 67 free (tp_p->threads); 68 69 free (tp_p->jobqueue->queueSem); 70 free (tp_p->jobqueue); 71 free (tp_p); 72 } 73 } 74 return tp_p ; 75 } 76 77 78 79 /* 80 * 初始化完線程應該處理的事情 81 * 這裏存在兩個信號量, 82 */ 83 84 void thpool_thread_do (thpool_t *tp_p){ 85 while (thpool_keepalive) 86 { 87 if (sem_wait (tp_p->jobqueue->queueSem)) //若是工做隊列中沒有工做,那麼全部的線程都將在這裏阻塞,當他調用成功的時候,信號量-1 88 { 89 fprintf(stderr , "Waiting for semaphore\n"); 90 exit (1); 91 } 92 93 if (thpool_keepalive) 94 { 95 void *(*func_buff) (void *arg); 96 void *arg_buff; 97 thpool_job_t *job_p; 98 99 pthread_mutex_lock (&mutex); 100 job_p = thpool_jobqueue_peek (tp_p); 101 func_buff = job_p->function ; 102 arg_buff= job_p->arg ; 103 thpool_jobqueue_removelast (tp_p); 104 pthread_mutex_unlock (&mutex); 105 106 func_buff (arg_buff); 107 108 free (job_p); 109 } 110 else 111 { 112 return ; 113 } 114 } 115 return ; 116 117 118 119 120 } 121 122 123 int thpool_add_work (thpool_t *tp_p ,void * (*function_p )(void *), void *arg_p){ 124 125 thpool_job_t *newjob ; 126 127 newjob = (thpool_job_t *)malloc (sizeof (thpool_job_t)); 128 if (newjob == NULL) 129 { 130 fprintf (stderr,"couldnot allocate memory for new job\n"); 131 exit (1); 132 } 133 newjob->function = function_p ; 134 newjob->arg = arg_p ; 135 136 pthread_mutex_lock (&mutex); 137 thpool_jobqueue_add (tp_p ,newjob); 138 pthread_mutex_unlock (&mutex); 139 return 0 ; 140 } 141 142 143 void thpool_destory (thpool_t *tp_p){ 144 int t ; 145 146 thpool_keepalive = 0 ; //讓全部的線程運行的線程都退出循環 147 148 for (t = 0 ; t < (tp_p->threadsN) ; t++ ){ 149 150 //sem_post 會使在這個線程上阻塞的線程,再也不阻塞 151 if (sem_post (tp_p->jobqueue->queueSem) ){ 152 fprintf (stderr,"thpool_destory () : could not bypass sem_wait ()\n"); 153 } 154 155 } 156 if (sem_destroy (tp_p->jobqueue->queueSem)!= 0){ 157 fprintf (stderr, "thpool_destory () : could not destroy semaphore\n"); 158 } 159 160 for (t = 0 ; t< (tp_p->threadsN) ; t++) 161 { 162 pthread_join (tp_p->threads[t], NULL); 163 } 164 thpool_jobqueue_empty (tp_p); 165 free (tp_p->threads); 166 free (tp_p->jobqueue->queueSem); 167 free (tp_p->jobqueue); 168 free (tp_p); 169 170 171 172 } 173 174 175 int thpool_jobqueue_init (thpool_t *tp_p) 176 { 177 tp_p->jobqueue = (thpool_jobqueue *)malloc (sizeof (thpool_jobqueue)); 178 if (tp_p->jobqueue == NULL) 179 { 180 fprintf (stderr ,"thpool_jobqueue malloc is error\n"); 181 return -1 ; 182 } 183 tp_p->jobqueue->tail = NULL ; 184 tp_p->jobqueue->head = NULL ; 185 tp_p->jobqueue->jobsN = 0 ; 186 return 0 ; 187 188 } 189 190 void thpool_jobqueue_add (thpool_t *tp_p , thpool_job_t *newjob_p){ 191 newjob_p->next = NULL ; 192 newjob_p->prev = NULL ; 193 194 thpool_job_t *oldfirstjob ; 195 oldfirstjob = tp_p->jobqueue->head; 196 197 198 switch (tp_p->jobqueue->jobsN) 199 { 200 case 0 : 201 tp_p->jobqueue->tail = newjob_p; 202 tp_p->jobqueue->head = newjob_p; 203 break; 204 default : 205 oldfirstjob->prev= newjob_p ; 206 newjob_p->next = oldfirstjob ; 207 tp_p->jobqueue->head= newjob_p; 208 break; 209 } 210 211 (tp_p->jobqueue->jobsN)++ ; 212 sem_post (tp_p->jobqueue->queueSem); //原子操做,信號量增長1 ,保證線程安全 213 214 int sval ; 215 sem_getvalue (tp_p->jobqueue->queueSem , &sval); //sval表示當前正在阻塞的線程數量 216 217 } 218 219 int thpool_jobqueue_removelast (thpool_t *tp_p){ 220 thpool_job_t *oldlastjob , *tmp; 221 oldlastjob = tp_p->jobqueue->tail ; 222 223 224 switch (tp_p->jobqueue->jobsN) 225 { 226 case 0 : 227 return -1 ; 228 break; 229 case 1 : 230 tp_p->jobqueue->head = NULL ; 231 tp_p->jobqueue->tail = NULL ; 232 break; 233 default : 234 tmp = oldlastjob->prev; 235 tmp->next = NULL ; 236 tp_p->jobqueue->tail = oldlastjob->prev; 237 238 } 239 (tp_p->jobqueue->jobsN) -- ; 240 int sval ; 241 sem_getvalue (tp_p->jobqueue->queueSem, &sval); 242 printf("sval:%d\n", sval); 243 return 0 ; 244 } 245 thpool_job_t * thpool_jobqueue_peek (thpool_t *tp_p){ 246 return tp_p->jobqueue->tail ; 247 } 248 249 250 void thpool_jobqueue_empty (thpool_t *tp_p) 251 { 252 thpool_job_t *curjob; 253 curjob = tp_p->jobqueue->tail ; 254 while (tp_p->jobqueue->jobsN){ 255 tp_p->jobqueue->tail = curjob->prev ; 256 free (curjob); 257 curjob = tp_p->jobqueue->tail ; 258 tp_p->jobqueue->jobsN -- ; 259 } 260 tp_p->jobqueue->tail = NULL ; 261 tp_p->jobqueue->head = NULL ; 262 }
1 /*! 2 * Email: scictor@gmail.com 3 * Auth: scictor 4 * Date: 2019-9-9 5 * File: epoll_reactor_main.cpp 6 * Class: %{Cpp:License:ClassName} (if applicable) 7 * Brief: 8 * Note: 9 */ 10 #include <arpa/inet.h> 11 #include <unistd.h> 12 #include <assert.h> 13 #include <stdio.h> 14 #include <stdlib.h> 15 #include <string.h> 16 #include <sys/socket.h> 17 #include <sys/epoll.h> 18 #include <sys/types.h> 19 #include <pthread.h> 20 #include <fcntl.h> 21 #include <assert.h> 22 #include <errno.h> 23 #include <netinet/in.h> 24 #include "threadPool.h" 25 26 #define MAX_EVENT_NUMBER 1000 27 #define SIZE 1024 28 #define MAX 10 29 30 //從主線程向工做線程數據結構 31 struct fd 32 { 33 int epollfd; 34 int sockfd ; 35 }; 36 37 //用戶說明 38 struct user 39 { 40 int sockfd ; //文件描述符 41 char client_buf [SIZE]; //數據的緩衝區 42 }; 43 struct user user_client[MAX]; //定義一個全局的客戶數據表 44 45 /* 46 EPOLL事件有兩種模型 Level Triggered (LT) 和 Edge Triggered (ET): 47 LT(level triggered,水平觸發模式)是缺省的工做方式,而且同時支持 block 和 non-block socket。在這種作法中,內核告訴你一個文件描述符是否就緒了,而後你能夠對這個就緒的fd進行IO操做。若是你不做任何操做,內核仍是會繼續通知你的,因此,這種模式編程出錯誤可能性要小一點。 48 ET(edge-triggered,邊緣觸發模式)是高速工做方式,只支持no-block socket。在這種模式下,當描述符從未就緒變爲就緒時,內核經過epoll告訴你。而後它會假設你知道文件描述符已經就緒,而且不會再爲那個文件描述符發送更多的就緒通知,等到下次有新的數據進來的時候纔會再次觸發就緒事件。 49 */ 50 //因爲epoll設置的EPOLLONESHOT模式,當出現errno =EAGAIN,就須要從新設置文件描述符(可讀) 51 void reset_oneshot (int epollfd , int fd) 52 { 53 struct epoll_event event ; 54 event.data.fd = fd ; 55 /* 56 EPOLLONESHOT: 57 epoll有兩種觸發的方式即LT(水平觸發)和ET(邊緣觸發)兩種,在前者,只要存在着事件就會不斷的觸發,直處處理完成,然後者只觸發一次相同事件或者說只在從非觸發到觸發兩個狀態轉換的時候兒才觸發。 58 這會出現下面一種狀況,若是是多線程在處理,一個SOCKET事件到來,數據開始解析,這時候這個SOCKET又來了一樣一個這樣的事件,而你的數據解析還沒有完成,那麼程序會自動調度另一個線程或者進程來處理新的事件,這形成一個很嚴重的問題,不一樣的線程或者進程在處理同一個SOCKET的事件,這會使程序的健壯性大下降而編程的複雜度大大增長!!即便在ET模式下也有可能出現這種狀況!! 59 解決這種現象有兩種方法,一種是在單獨的線程或進程裏解析數據,也就是說,接收數據的線程接收到數據後馬上將數據轉移至另外的線程。 60 第二種方法就是本文要提到的EPOLLONESHOT這種方法,能夠在epoll上註冊這個事件,註冊這個事件後,若是在處理寫成當前的SOCKET後再也不從新註冊相關事件,那麼這個事件就再也不響應了或者說觸發了。要想從新註冊事件則須要調用epoll_ctl重置文件描述符上的事件,這樣前面的socket就不會出現競態這樣就能夠經過手動的方式來保證同一SOCKET只能被一個線程處理,不會跨越多個線程。 61 62 events 能夠是如下幾個宏的集合: 63 EPOLLIN //表示對應的文件描述符能夠讀(包括對端SOCKET正常關閉); 64 EPOLLOUT //表示對應的文件描述符能夠寫; 65 EPOLLPRI //表示對應的文件描述符有緊急的數據可讀(這裏應該表示有帶外數據到來); 66 EPOLLERR //表示對應的文件描述符發生錯誤; 67 EPOLLHUP //表示對應的文件描述符被掛斷; 68 EPOLLET //將EPOLL設爲邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來講的。 69 EPOLLONESHOT//只監聽一次事件,當監聽完此次事件以後,若是還須要繼續監聽這個socket的話,須要再次把這個socket加入到EPOLL隊列裏。 70 71 EPOLLIN事件: 72 當對方關閉鏈接(FIN), EPOLLERR,均可以認爲是一種EPOLLIN事件,在read的時候分別有0,-1兩個返回值。 73 */ 74 event.events = EPOLLIN|EPOLLET|EPOLLONESHOT ; 75 epoll_ctl (epollfd , EPOLL_CTL_MOD, fd , &event); 76 77 } 78 //向epoll內核事件表裏面添加可寫的事件 79 int addreadfd (int epollfd , int fd , int oneshot) 80 { 81 struct epoll_event event; 82 event.data.fd = fd ; 83 event.events |= ~ EPOLLIN ; 84 event.events |= EPOLLOUT ; 85 event.events |= EPOLLET; 86 if (oneshot) 87 { 88 event.events |= EPOLLONESHOT ; //設置EPOLLONESHOT 89 90 } 91 /* 92 EPOLL_CTL_ADD //註冊新的fd到epfd中; 93 EPOLL_CTL_MOD //修改已經註冊的fd的監聽事件; 94 EPOLL_CTL_DEL //從epfd中刪除一個fd; 95 */ 96 epoll_ctl (epollfd , EPOLL_CTL_MOD ,fd , &event); 97 98 } 99 //羣聊函數,至關於放入緩存 100 int groupchat (int epollfd , int sockfd , char *buf) 101 { 102 int i = 0 ; 103 for ( i = 0 ; i < MAX ; i++) 104 { 105 if (user_client[i].sockfd == sockfd) 106 { 107 continue ; 108 } 109 strncpy (user_client[i].client_buf ,buf , strlen (buf)); 110 addreadfd (epollfd , user_client[i].sockfd , 1); 111 } 112 } 113 //接受數據的函數,也就是線程的回調函數 114 void *funcation (void *args) 115 { 116 int sockfd = ((struct fd*)args)->sockfd; 117 int epollfd =((struct fd*)args)->epollfd; 118 char buf[SIZE]; 119 memset (buf , '\0', SIZE); 120 121 printf ("start new thread to receive data on fd :%d\n", sockfd); 122 123 //因爲我將epoll的工做模式設置爲ET模式,因此就要用一個循環來讀取數據,防止數據沒有讀完,而丟失。 124 while (1) 125 { 126 int ret = recv (sockfd ,buf , SIZE-1 , 0); 127 if (ret == 0) 128 { 129 close (sockfd); 130 break; 131 } 132 else if (ret < 0) 133 { 134 if (errno == EAGAIN) 135 { 136 reset_oneshot (epollfd, sockfd); //從新設置(上面已經解釋了) 137 break; 138 } 139 } 140 else 141 { 142 printf (" read data is %s\n", buf); 143 // sleep (5); 144 groupchat (epollfd , sockfd, buf ); 145 } 146 147 148 } 149 printf ("end thread receive data on fd : %d\n", sockfd); 150 return NULL; 151 } 152 //這是從新註冊,將文件描述符從可寫變成可讀 153 int addagainfd (int epollfd , int fd) 154 { 155 struct epoll_event event; 156 event.data.fd = fd; 157 event.events |= ~EPOLLOUT ; 158 event.events = EPOLLIN|EPOLLET|EPOLLONESHOT; 159 epoll_ctl (epollfd , EPOLL_CTL_MOD , fd , &event); 160 } 161 //與前面的解釋同樣 162 int reset_read_oneshot (int epollfd , int sockfd) 163 { 164 struct epoll_event event; 165 event.data.fd = sockfd ; 166 event.events = EPOLLOUT |EPOLLET |EPOLLONESHOT; 167 epoll_ctl (epollfd, EPOLL_CTL_MOD , sockfd , &event); 168 return 0 ; 169 170 } 171 172 //發送讀的數據到全部客戶端 173 //int readfun (void *args) 174 void *readfun (void *args) 175 { 176 int sockfd = ((struct fd *)args)->sockfd ; 177 int epollfd= ((struct fd*)args)->epollfd ; 178 179 int ret = send (sockfd, user_client[sockfd].client_buf , strlen (user_client[sockfd].client_buf), 0); //發送數據 180 if (ret == 0 ) 181 { 182 close (sockfd); 183 printf ("發送數據失敗\n"); 184 return (void *)-1 ; 185 } 186 else if (ret == EAGAIN) 187 { 188 reset_read_oneshot (epollfd , sockfd); 189 printf("send later\n"); 190 return (void *)-1; 191 } 192 memset (&user_client[sockfd].client_buf , '\0', sizeof (user_client[sockfd].client_buf)); 193 addagainfd (epollfd , sockfd);//從新設置文件描述符 194 return NULL; 195 } 196 //套接字設置爲非阻塞 197 int setnoblocking (int fd) 198 { 199 int old_option = fcntl (fd, F_GETFL); 200 int new_option = old_option|O_NONBLOCK; 201 fcntl (fd , F_SETFL , new_option); 202 return old_option ; 203 } 204 205 int addfd (int epollfd , int fd , int oneshot) 206 { 207 struct epoll_event event; 208 event.data.fd = fd ; 209 event.events = EPOLLIN|EPOLLET ; 210 if (oneshot) 211 { 212 event.events |= EPOLLONESHOT ; 213 214 } 215 epoll_ctl (epollfd , EPOLL_CTL_ADD ,fd , &event); 216 setnoblocking (fd); 217 return 0 ; 218 } 219 220 221 222 int main(int argc, char *argv[]) 223 { 224 struct sockaddr_in address ; 225 const char *ip = "127.0.0.1"; 226 int port = 8086 ; 227 228 memset (&address , 0 , sizeof (address)); 229 address.sin_family = AF_INET ; 230 inet_pton (AF_INET ,ip , &address.sin_addr); 231 address.sin_port =htons( port) ; 232 233 234 int listenfd = socket (AF_INET, SOCK_STREAM, 0); 235 assert (listenfd >=0); 236 int reuse = 1; 237 setsockopt (listenfd , SOL_SOCKET , SO_REUSEADDR , &reuse , sizeof (reuse)); //端口重用,由於出現過端口沒法綁定的錯誤 238 int ret = bind (listenfd, (struct sockaddr*)&address , sizeof (address)); 239 assert (ret >=0 ); 240 241 ret = listen (listenfd , 5); 242 assert (ret >=0); 243 244 245 struct epoll_event events[MAX_EVENT_NUMBER]; 246 247 int epollfd = epoll_create (5); //建立內核事件描述符表 248 assert (epollfd != -1); 249 addfd (epollfd , listenfd, 0); 250 251 thpool_t *thpool ; //線程池 252 thpool = thpool_init (5) ; //線程池的一個初始化 253 254 while (1) 255 { 256 int ret = epoll_wait (epollfd, events, MAX_EVENT_NUMBER , -1);//等待就緒的文件描述符,這個函數會將就緒的複製到events的結構體數組中。 257 if (ret < 0) 258 { 259 printf ("poll failure\n"); 260 break; 261 } 262 int i =0; 263 for ( i = 0; i < ret; i++ ) 264 { 265 int sockfd = events[i].data.fd ; 266 267 if (sockfd == listenfd) 268 { 269 struct sockaddr_in client_address ; 270 socklen_t client_length = sizeof (client_address); 271 int connfd = accept (listenfd , (struct sockaddr*)&client_address,&client_length); 272 user_client[connfd].sockfd = connfd ; 273 memset (&user_client[connfd].client_buf , '\0', sizeof (user_client[connfd].client_buf)); 274 addfd (epollfd , connfd , 1);//將新的套接字加入到內核事件表裏面。 275 } 276 else if (events[i].events & EPOLLIN) 277 { 278 struct fd fds_for_new_worker ; 279 fds_for_new_worker.epollfd = epollfd ; 280 fds_for_new_worker.sockfd = sockfd ; 281 282 thpool_add_work (thpool, funcation ,&fds_for_new_worker);//將任務添加到工做隊列中 283 }else if (events[i].events & EPOLLOUT) 284 { 285 286 struct fd fds_for_new_worker ; 287 fds_for_new_worker.epollfd = epollfd ; 288 fds_for_new_worker.sockfd = sockfd ; 289 thpool_add_work (thpool, readfun , &fds_for_new_worker );//將任務添加到工做隊列中 290 } 291 292 } 293 294 } 295 296 thpool_destory (thpool); 297 close (listenfd); 298 return EXIT_SUCCESS; 299 }
這個例子只是做爲學習參考,裏面有一些問題,能夠試着修改以此提升你對epoll的認識.app
可使用telnet做爲測試客戶端:$ telnet 127.0.0.1 8086socket