一塊兒學習epoll

 epoll 是Linux內核中的一種可擴展IO事件處理機制,最先在 Linux 2.5.44內核中引入,可被用於代替POSIX select 和 poll 系統調用,而且在具備大量應用程序請求時可以得到較好的性能( 此時被監視的文件描述符數目很是大,與舊的 select 和 poll 系統調用完成操做所需 O(n) 不一樣, epoll能在O(1)時間內完成操做,因此性能至關高),epoll 與 FreeBSD的kqueue相似,都向用戶空間提供了本身的文件描述符來進行操做。html

 

IO模型實現reactor 模式的工做流程:react

(1)主線程向epoll內核事件表內註冊socket上的可讀就緒事件。編程

(2)主線程調用epoll_wait()等待socket上有數據可讀數組

(3)當socket上有數據可讀,epoll_wait 通知主線程。主線程從socket可讀事件放入請求隊列。緩存

(4)睡眠在請求隊列上的某個可讀工做線程被喚醒,從socket上讀取數據,處理客戶的請求。安全

而後向 epoll內核事件表裏註冊寫的就緒事件數據結構

(5)主線程調用epoll_wait()等待數據可寫 。多線程

  1 /*!
  2 * Email: scictor@gmail.com
  3 * Auth: scictor
  4 * Date: 2019-9-9
  5 * File: epoll_reactor_threadpoll.h
  6 * Class: epoll_reactor_threadpoll (if applicable)
  7 * Brief:
  8 * Note:
  9  */
 10 #ifndef THREAD_POOL_H
 11 #define THREAD_POOL_H
 12 
 13 #include <pthread.h>
 14 #include <semaphore.h>
 15 
 16 typedef void *(*func_call)(void*);
 17 
 18 /*Individual job*/
 19 typedef struct thpool_job_t {
 20 //    void   (*function)(void* arg);    //函數指針
 21     void *(*function)(void* arg);    //函數指針
 22     void                    *arg ;    //函數的參數
 23     /*struct tpool_job_t     *next ;    //指向下一個任務
 24     struct tpool_job_t     *prev ;    //指向前一個任務*/
 25     struct thpool_job_t     *next ;    //指向下一個任務
 26     struct thpool_job_t     *prev ;    //指向前一個任務
 27 }thpool_job_t ;
 28 
 29 /*job queue as doubly linked list雙向鏈表*/
 30 typedef struct thpool_jobqueue {
 31     thpool_job_t    *head ;            //隊列的頭指針(頭部添加job任務)
 32     thpool_job_t    *tail;            //對列的尾指針(尾部作任務,並移除)
 33     int             jobsN;           //隊列中工做的個數
 34     sem_t           *queueSem;        //原子信號量
 35 }thpool_jobqueue;
 36 
 37 /*thread pool*/
 38 
 39 typedef struct thpool_t {
 40     pthread_t          *threads ;        //線程的ID
 41     int                 threadsN ;        //線程的數量
 42     thpool_jobqueue    *jobqueue;        //工做隊列的指針
 43 
 44 
 45 }thpool_t;
 46 
 47 
 48 /*線程池中的線程都須要互斥鎖和指向線程池的一個指針*/
 49 typedef struct thread_data{
 50     pthread_mutex_t     *mutex_p ;
 51     thpool_t            *tp_p ;
 52 }thread_data;
 53 
 54 
 55 
 56 /*
 57  *    初始化線程池
 58  *    爲線程池, 工做隊列, 申請內存空間,信號等申請內存空間
 59  *    @param  :將被使用的線程ID
 60  *    @return :成功返回的線程池結構體,錯誤返回null
 61  */
 62 
 63 thpool_t   *thpool_init (int threadsN);
 64 
 65 /*
 66  * 每一個線程要作的事情
 67  * 這是一個無止境循環,當撤銷這線程池的時候,這個循環纔會被中斷
 68  *@param: 線程池
 69  *@return:不作任何的事情
 70  */
 71 
 72 void  thpool_thread_do (thpool_t *tp_p);
 73 
 74 /*
 75  *向工做隊列裏面添加任何
 76  *採用來了一個行爲和他的參數,添加到線程池的工做對列中去,
 77  * 若是你想添加工做函數,須要更多的參數,經過傳遞一個指向結構體的指針,就能夠實現一個接口
 78  * ATTENTION:爲了避免引發警告,你不得不將函數和參數都帶上
 79  *
 80  * @param: 添加工做的線程線程池
 81  * @param: 這個工做的處理函數
 82  * @param:函數的參數
 83  * @return : int
 84  */
 85 
 86 int thpool_add_work (thpool_t *tp_p ,void* (*function_p) (void *), void* arg_p );
 87 
 88 
 89 /*
 90  *摧毀線程池
 91  *
 92  *這將撤銷這個線程池和釋放所申請的內存空間,當你在調用這個函數的時候,存在有的線程還在運行中,那麼
 93  *中止他們如今所作的工做,而後他們被撤銷掉
 94  * @param:你想要撤銷的線程池的指針
 95  */
 96 
 97 
 98 void thpool_destory (thpool_t  *tp_p);
 99 
100 /*-----------------------Queue specific---------------------------------*/
101 
102 
103 
104 /*
105  * 初始化隊列
106  * @param: 指向線程池的指針
107  * @return :成功的時候返回是 0 ,分配內存失敗的時候,返回是-1
108  */
109 int thpool_jobqueue_init (thpool_t *tp_p);
110 
111 
112 /*
113  *添加任務到隊列
114  *一個新的工做任務將被添加到隊列,在使用這個函數或者其餘向別的相似這樣
115  *函數 thpool_jobqueue_empty ()以前,這個新的任務要被申請內存空間
116  *
117  * @param: 指向線程池的指針
118  * @param:指向一個已經申請內存空間的任務
119  * @return   nothing
120  */
121 void thpool_jobqueue_add (thpool_t * tp_p , thpool_job_t *newjob_p);
122 
123 /*
124  * 移除對列的最後一個任務
125  *這個函數將不會被釋放申請的內存空間,因此要保證
126  *
127  *@param :指向線程池的指針
128  *@return : 成功返回0 ,若是對列是空的,就返回-1
129  */
130 int thpool_jobqueue_removelast (thpool_t *tp_p);
131 
132 
133 /*
134  *對列的最後一個任務
135  *在隊列裏面獲得最後一個任務,即便隊列是空的,這個函數依舊可使用
136  *
137  *參數:指向線程池結構體的指針
138  *返回值:獲得隊列中最後一個任務的指針,或者在對列是空的狀況下,返回是空
139  */
140 thpool_job_t * thpool_jobqueue_peek (thpool_t *tp_p);
141 
142 /*
143  *移除和撤銷這個隊列中的全部任務
144  *這個函數將刪除這個隊列中的全部任務,將任務對列恢復到初始化狀態,所以隊列的頭和對列的尾都設置爲NULL ,此時隊列中任務= 0
145  *
146  *參數:指向線程池結構體的指針
147  *
148  */
149 void thpool_jobqueue_empty (thpool_t *tp_p);
150 
151 #endif

 

  1 /*!
  2 * Email: scictor@gmail.com
  3 * Auth: scictor
  4 * Date: 2019-9-9
  5 * File: epoll_reactor_threadpoll.cpp
  6 * Class: epoll_reactor_threadpoll (if applicable)
  7 * Brief:
  8 * Note:
  9  */
 10 #include <unistd.h>
 11 #include <assert.h>
 12 #include <stdio.h>
 13 #include <stdlib.h>
 14 #include <errno.h>
 15 #include <pthread.h>
 16 #include <semaphore.h>
 17 
 18 
 19 #include "threadPool.h"
 20 
 21 
 22 static int thpool_keepalive = 1 ;   //線程池保持存活
 23 pthread_mutex_t  mutex  = PTHREAD_MUTEX_INITIALIZER ;  //靜態賦值法初始化互斥鎖
 24 
 25 
 26 thpool_t * thpool_init (int threadsN){
 27     thpool_t  *tp_p ;
 28 
 29     if (!threadsN || threadsN < 1){
 30         threadsN = 1 ;
 31 
 32     }
 33 
 34     tp_p =  (thpool_t *)malloc (sizeof (thpool_t)) ;
 35     if (tp_p == NULL){
 36         fprintf (stderr ,"thpool_init (): could not allocate memory for thread pool\n");
 37         return NULL ;
 38     }
 39     tp_p->threads = (pthread_t *)malloc (threadsN * sizeof (pthread_t));
 40     if (tp_p->threads == NULL){
 41         fprintf( stderr , "could not allocation memory for thread id\n");
 42         return NULL;
 43     }
 44     tp_p->threadsN = threadsN ;
 45 
 46 
 47     if (thpool_jobqueue_init (tp_p) == -1){
 48         fprintf (stderr ,"could not allocate memory for job queue\n");
 49         return NULL;
 50     }
 51 
 52     /*初始化信號*/
 53     tp_p->jobqueue->queueSem = (sem_t *)malloc (sizeof (sem_t));
 54 
 55     /*定位一個匿名信號量,第二個參數是0表示。這個信號量將在進程內的線程是共享的,爲1表示進程間共享,第三個參數是信號量的初始值*/
 56     sem_init (tp_p->jobqueue->queueSem, 0 , 0 );
 57 
 58     int  t ;
 59 
 60 
 61 
 62     for (t = 0 ; t < threadsN ; t++){
 63         printf ("Create thread %d in pool\n", t);
 64 
 65         //第四個參數是傳遞給函數指針的一個參數,這個函數指針就是咱們所說的線程指針
 66         if (pthread_create (&(tp_p->threads[t]) , NULL , (void *) thpool_thread_do , (void *)tp_p)){
 67             free (tp_p->threads);
 68 
 69             free (tp_p->jobqueue->queueSem);
 70             free (tp_p->jobqueue);
 71             free (tp_p);
 72         }
 73     }
 74     return  tp_p ;
 75 }
 76 
 77 
 78 
 79 /*
 80  * 初始化完線程應該處理的事情
 81  * 這裏存在兩個信號量,
 82  */
 83 
 84 void thpool_thread_do (thpool_t *tp_p){
 85     while (thpool_keepalive)
 86     {
 87         if (sem_wait (tp_p->jobqueue->queueSem))  //若是工做隊列中沒有工做,那麼全部的線程都將在這裏阻塞,當他調用成功的時候,信號量-1
 88         {
 89             fprintf(stderr , "Waiting for semaphore\n");
 90             exit (1);
 91         }
 92 
 93         if (thpool_keepalive)
 94         {
 95             void *(*func_buff) (void *arg);
 96             void *arg_buff;
 97             thpool_job_t *job_p;
 98 
 99             pthread_mutex_lock (&mutex);
100             job_p = thpool_jobqueue_peek (tp_p);
101             func_buff = job_p->function ;
102             arg_buff= job_p->arg ;
103             thpool_jobqueue_removelast (tp_p);
104             pthread_mutex_unlock (&mutex);
105 
106             func_buff (arg_buff);
107 
108             free (job_p);
109         }
110         else
111         {
112             return ;
113         }
114     }
115     return ;
116 
117 
118 
119 
120 }
121 
122 
123 int thpool_add_work (thpool_t *tp_p ,void * (*function_p )(void *), void *arg_p){
124 
125     thpool_job_t   *newjob ;
126 
127     newjob = (thpool_job_t *)malloc (sizeof (thpool_job_t));
128     if (newjob == NULL)
129     {
130         fprintf (stderr,"couldnot allocate memory for new job\n");
131         exit (1);
132     }
133     newjob->function = function_p ;
134     newjob->arg = arg_p ;
135 
136     pthread_mutex_lock (&mutex);
137     thpool_jobqueue_add (tp_p ,newjob);
138     pthread_mutex_unlock (&mutex);
139     return 0 ;
140 }
141 
142 
143 void thpool_destory (thpool_t *tp_p){
144     int    t ;
145 
146     thpool_keepalive = 0 ;  //讓全部的線程運行的線程都退出循環
147 
148     for (t = 0 ; t < (tp_p->threadsN) ; t++ ){
149 
150         //sem_post 會使在這個線程上阻塞的線程,再也不阻塞
151         if (sem_post (tp_p->jobqueue->queueSem) ){
152             fprintf (stderr,"thpool_destory () : could not bypass sem_wait ()\n");
153         }
154 
155     }
156     if (sem_destroy (tp_p->jobqueue->queueSem)!= 0){
157         fprintf (stderr, "thpool_destory () : could not destroy semaphore\n");
158     }
159 
160     for (t = 0 ; t< (tp_p->threadsN) ; t++)
161     {
162         pthread_join (tp_p->threads[t], NULL);
163     }
164     thpool_jobqueue_empty (tp_p);
165     free (tp_p->threads);
166     free (tp_p->jobqueue->queueSem);
167     free (tp_p->jobqueue);
168     free (tp_p);
169 
170 
171 
172 }
173 
174 
175 int thpool_jobqueue_init (thpool_t *tp_p)
176 {
177     tp_p->jobqueue = (thpool_jobqueue *)malloc (sizeof (thpool_jobqueue));
178     if (tp_p->jobqueue == NULL)
179     {
180         fprintf (stderr ,"thpool_jobqueue malloc is error\n");
181         return -1 ;
182     }
183     tp_p->jobqueue->tail = NULL ;
184     tp_p->jobqueue->head = NULL ;
185     tp_p->jobqueue->jobsN = 0 ;
186     return 0 ;
187 
188 }
189 
190 void thpool_jobqueue_add (thpool_t *tp_p , thpool_job_t *newjob_p){
191     newjob_p->next = NULL ;
192     newjob_p->prev = NULL ;
193 
194     thpool_job_t   *oldfirstjob ;
195     oldfirstjob = tp_p->jobqueue->head;
196 
197 
198     switch (tp_p->jobqueue->jobsN)
199     {
200     case 0 :
201         tp_p->jobqueue->tail = newjob_p;
202         tp_p->jobqueue->head = newjob_p;
203         break;
204     default :
205         oldfirstjob->prev= newjob_p ;
206         newjob_p->next = oldfirstjob ;
207         tp_p->jobqueue->head= newjob_p;
208         break;
209     }
210 
211     (tp_p->jobqueue->jobsN)++ ;
212     sem_post (tp_p->jobqueue->queueSem);  //原子操做,信號量增長1 ,保證線程安全
213 
214     int sval ;
215     sem_getvalue (tp_p->jobqueue->queueSem , &sval);   //sval表示當前正在阻塞的線程數量
216 
217 }
218 
219 int thpool_jobqueue_removelast (thpool_t *tp_p){
220     thpool_job_t *oldlastjob  , *tmp;
221     oldlastjob = tp_p->jobqueue->tail ;
222 
223 
224     switch (tp_p->jobqueue->jobsN)
225     {
226     case 0 :
227         return -1 ;
228         break;
229     case 1 :
230         tp_p->jobqueue->head = NULL ;
231         tp_p->jobqueue->tail = NULL ;
232         break;
233     default :
234         tmp = oldlastjob->prev;
235         tmp->next = NULL ;
236         tp_p->jobqueue->tail = oldlastjob->prev;
237 
238     }
239     (tp_p->jobqueue->jobsN) -- ;
240     int sval ;
241     sem_getvalue (tp_p->jobqueue->queueSem, &sval);
242     printf("sval:%d\n", sval);
243     return 0 ;
244 }
245 thpool_job_t * thpool_jobqueue_peek (thpool_t *tp_p){
246     return tp_p->jobqueue->tail ;
247 }
248 
249 
250 void thpool_jobqueue_empty (thpool_t *tp_p)
251 {
252     thpool_job_t *curjob;
253     curjob = tp_p->jobqueue->tail ;
254     while (tp_p->jobqueue->jobsN){
255         tp_p->jobqueue->tail = curjob->prev ;
256         free (curjob);
257         curjob = tp_p->jobqueue->tail ;
258         tp_p->jobqueue->jobsN -- ;
259     }
260     tp_p->jobqueue->tail = NULL ;
261     tp_p->jobqueue->head = NULL ;
262 }

 

  1 /*!
  2 * Email: scictor@gmail.com
  3 * Auth: scictor
  4 * Date: 2019-9-9
  5 * File: epoll_reactor_main.cpp
  6 * Class: %{Cpp:License:ClassName} (if applicable)
  7 * Brief:
  8 * Note:
  9  */
 10 #include <arpa/inet.h>
 11 #include <unistd.h>
 12 #include <assert.h>
 13 #include <stdio.h>
 14 #include <stdlib.h>
 15 #include <string.h>
 16 #include <sys/socket.h>
 17 #include <sys/epoll.h>
 18 #include <sys/types.h>
 19 #include <pthread.h>
 20 #include <fcntl.h>
 21 #include <assert.h>
 22 #include <errno.h>
 23 #include <netinet/in.h>
 24 #include "threadPool.h"
 25 
 26 #define MAX_EVENT_NUMBER  1000
 27 #define SIZE    1024
 28 #define MAX     10
 29 
 30 //從主線程向工做線程數據結構
 31 struct fd
 32 {
 33     int epollfd;
 34     int sockfd ;
 35 };
 36 
 37 //用戶說明
 38 struct user
 39 {
 40     int  sockfd ;   //文件描述符
 41     char client_buf [SIZE]; //數據的緩衝區
 42 };
 43 struct user user_client[MAX];  //定義一個全局的客戶數據表
 44 
 45 /*
 46 EPOLL事件有兩種模型 Level Triggered (LT) 和 Edge Triggered (ET):
 47 LT(level triggered,水平觸發模式)是缺省的工做方式,而且同時支持 block 和 non-block socket。在這種作法中,內核告訴你一個文件描述符是否就緒了,而後你能夠對這個就緒的fd進行IO操做。若是你不做任何操做,內核仍是會繼續通知你的,因此,這種模式編程出錯誤可能性要小一點。
 48 ET(edge-triggered,邊緣觸發模式)是高速工做方式,只支持no-block socket。在這種模式下,當描述符從未就緒變爲就緒時,內核經過epoll告訴你。而後它會假設你知道文件描述符已經就緒,而且不會再爲那個文件描述符發送更多的就緒通知,等到下次有新的數據進來的時候纔會再次觸發就緒事件。
 49 */
 50 //因爲epoll設置的EPOLLONESHOT模式,當出現errno =EAGAIN,就須要從新設置文件描述符(可讀)
 51 void reset_oneshot (int epollfd , int fd)
 52 {
 53     struct epoll_event event ;
 54     event.data.fd = fd ;
 55     /*
 56 EPOLLONESHOT:
 57 epoll有兩種觸發的方式即LT(水平觸發)和ET(邊緣觸發)兩種,在前者,只要存在着事件就會不斷的觸發,直處處理完成,然後者只觸發一次相同事件或者說只在從非觸發到觸發兩個狀態轉換的時候兒才觸發。
 58 這會出現下面一種狀況,若是是多線程在處理,一個SOCKET事件到來,數據開始解析,這時候這個SOCKET又來了一樣一個這樣的事件,而你的數據解析還沒有完成,那麼程序會自動調度另一個線程或者進程來處理新的事件,這形成一個很嚴重的問題,不一樣的線程或者進程在處理同一個SOCKET的事件,這會使程序的健壯性大下降而編程的複雜度大大增長!!即便在ET模式下也有可能出現這種狀況!!
 59 解決這種現象有兩種方法,一種是在單獨的線程或進程裏解析數據,也就是說,接收數據的線程接收到數據後馬上將數據轉移至另外的線程。
 60 第二種方法就是本文要提到的EPOLLONESHOT這種方法,能夠在epoll上註冊這個事件,註冊這個事件後,若是在處理寫成當前的SOCKET後再也不從新註冊相關事件,那麼這個事件就再也不響應了或者說觸發了。要想從新註冊事件則須要調用epoll_ctl重置文件描述符上的事件,這樣前面的socket就不會出現競態這樣就能夠經過手動的方式來保證同一SOCKET只能被一個線程處理,不會跨越多個線程。
 61 
 62 events 能夠是如下幾個宏的集合:
 63 EPOLLIN     //表示對應的文件描述符能夠讀(包括對端SOCKET正常關閉);
 64 EPOLLOUT    //表示對應的文件描述符能夠寫;
 65 EPOLLPRI    //表示對應的文件描述符有緊急的數據可讀(這裏應該表示有帶外數據到來);
 66 EPOLLERR    //表示對應的文件描述符發生錯誤;
 67 EPOLLHUP    //表示對應的文件描述符被掛斷;
 68 EPOLLET     //將EPOLL設爲邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來講的。
 69 EPOLLONESHOT//只監聽一次事件,當監聽完此次事件以後,若是還須要繼續監聽這個socket的話,須要再次把這個socket加入到EPOLL隊列裏。
 70 
 71 EPOLLIN事件:
 72 當對方關閉鏈接(FIN), EPOLLERR,均可以認爲是一種EPOLLIN事件,在read的時候分別有0,-1兩個返回值。
 73 */
 74     event.events = EPOLLIN|EPOLLET|EPOLLONESHOT ;
 75     epoll_ctl (epollfd , EPOLL_CTL_MOD, fd , &event);
 76 
 77 }
 78 //向epoll內核事件表裏面添加可寫的事件
 79 int addreadfd (int epollfd , int fd , int oneshot)
 80 {
 81     struct epoll_event  event;
 82     event.data.fd = fd ;
 83     event.events |= ~ EPOLLIN ;
 84     event.events |= EPOLLOUT ;
 85     event.events |= EPOLLET;
 86     if (oneshot)
 87     {
 88         event.events |= EPOLLONESHOT ; //設置EPOLLONESHOT
 89 
 90     }
 91     /*
 92 EPOLL_CTL_ADD    //註冊新的fd到epfd中;
 93 EPOLL_CTL_MOD    //修改已經註冊的fd的監聽事件;
 94 EPOLL_CTL_DEL    //從epfd中刪除一個fd;
 95 */
 96     epoll_ctl (epollfd , EPOLL_CTL_MOD ,fd , &event);
 97 
 98 }
 99 //羣聊函數,至關於放入緩存
100 int groupchat (int epollfd , int sockfd , char *buf)
101 {
102     int i = 0 ;
103     for ( i  = 0 ; i < MAX ; i++)
104     {
105         if (user_client[i].sockfd == sockfd)
106         {
107             continue ;
108         }
109         strncpy (user_client[i].client_buf ,buf , strlen (buf));
110         addreadfd (epollfd , user_client[i].sockfd , 1);
111     }
112 }
113 //接受數據的函數,也就是線程的回調函數
114 void *funcation (void *args)
115 {
116     int sockfd = ((struct fd*)args)->sockfd;
117     int epollfd =((struct fd*)args)->epollfd;
118     char buf[SIZE];
119     memset (buf , '\0', SIZE);
120 
121     printf ("start new thread to receive data on fd :%d\n", sockfd);
122 
123     //因爲我將epoll的工做模式設置爲ET模式,因此就要用一個循環來讀取數據,防止數據沒有讀完,而丟失。
124     while (1)
125     {
126         int ret = recv (sockfd ,buf , SIZE-1 , 0);
127         if (ret == 0)
128         {
129             close (sockfd);
130             break;
131         }
132         else if (ret < 0)
133         {
134             if (errno == EAGAIN)
135             {
136                 reset_oneshot (epollfd, sockfd);  //從新設置(上面已經解釋了)
137                 break;
138             }
139         }
140         else
141         {
142             printf (" read data is %s\n", buf);
143 //            sleep (5);
144             groupchat (epollfd , sockfd, buf );
145         }
146 
147 
148     }
149     printf ("end thread receive  data on fd : %d\n", sockfd);
150     return NULL;
151 }
152 //這是從新註冊,將文件描述符從可寫變成可讀
153 int addagainfd (int epollfd , int fd)
154 {
155     struct epoll_event event;
156     event.data.fd = fd;
157     event.events  |= ~EPOLLOUT ;
158     event.events = EPOLLIN|EPOLLET|EPOLLONESHOT;
159     epoll_ctl (epollfd , EPOLL_CTL_MOD , fd , &event);
160 }
161 //與前面的解釋同樣
162 int reset_read_oneshot (int epollfd , int sockfd)
163 {
164     struct epoll_event  event;
165     event.data.fd = sockfd ;
166     event.events = EPOLLOUT |EPOLLET |EPOLLONESHOT;
167     epoll_ctl (epollfd, EPOLL_CTL_MOD , sockfd , &event);
168     return 0 ;
169 
170 }
171 
172 //發送讀的數據到全部客戶端
173 //int readfun (void *args)
174 void *readfun (void *args)
175 {
176     int sockfd = ((struct fd *)args)->sockfd ;
177     int epollfd= ((struct fd*)args)->epollfd ;
178 
179     int ret = send (sockfd, user_client[sockfd].client_buf , strlen (user_client[sockfd].client_buf), 0); //發送數據
180     if (ret == 0 )
181     {
182         close (sockfd);
183         printf ("發送數據失敗\n");
184         return (void *)-1 ;
185     }
186     else if (ret == EAGAIN)
187     {
188         reset_read_oneshot (epollfd , sockfd);
189         printf("send later\n");
190         return (void *)-1;
191     }
192     memset (&user_client[sockfd].client_buf , '\0', sizeof (user_client[sockfd].client_buf));
193     addagainfd (epollfd , sockfd);//從新設置文件描述符
194     return NULL;
195 }
196 //套接字設置爲非阻塞
197 int setnoblocking (int fd)
198 {
199     int old_option = fcntl (fd, F_GETFL);
200     int new_option = old_option|O_NONBLOCK;
201     fcntl (fd , F_SETFL , new_option);
202     return old_option ;
203 }
204 
205 int addfd (int epollfd , int fd , int oneshot)
206 {
207     struct epoll_event  event;
208     event.data.fd = fd ;
209     event.events = EPOLLIN|EPOLLET ;
210     if (oneshot)
211     {
212         event.events |= EPOLLONESHOT ;
213 
214     }
215     epoll_ctl (epollfd , EPOLL_CTL_ADD ,fd ,  &event);
216     setnoblocking (fd);
217     return 0 ;
218 }
219 
220 
221 
222 int main(int argc, char *argv[])
223 {
224     struct sockaddr_in  address ;
225     const char *ip = "127.0.0.1";
226     int port  = 8086 ;
227 
228     memset (&address , 0 , sizeof (address));
229     address.sin_family = AF_INET ;
230     inet_pton (AF_INET ,ip , &address.sin_addr);
231     address.sin_port =htons( port) ;
232 
233 
234     int listenfd = socket (AF_INET, SOCK_STREAM, 0);
235     assert (listenfd >=0);
236     int reuse = 1;
237     setsockopt (listenfd , SOL_SOCKET , SO_REUSEADDR , &reuse , sizeof (reuse)); //端口重用,由於出現過端口沒法綁定的錯誤
238     int ret = bind (listenfd, (struct sockaddr*)&address , sizeof (address));
239     assert (ret >=0 );
240 
241     ret = listen (listenfd , 5);
242     assert (ret >=0);
243 
244 
245     struct epoll_event events[MAX_EVENT_NUMBER];
246 
247     int epollfd = epoll_create (5); //建立內核事件描述符表
248     assert (epollfd != -1);
249     addfd (epollfd , listenfd, 0);
250 
251     thpool_t  *thpool ;  //線程池
252     thpool = thpool_init (5) ; //線程池的一個初始化
253 
254     while (1)
255     {
256         int ret = epoll_wait (epollfd, events, MAX_EVENT_NUMBER , -1);//等待就緒的文件描述符,這個函數會將就緒的複製到events的結構體數組中。
257         if (ret < 0)
258         {
259             printf ("poll failure\n");
260             break;
261         }
262         int i =0;
263         for ( i = 0; i < ret; i++ )
264         {
265             int sockfd = events[i].data.fd ;
266 
267             if (sockfd == listenfd)
268             {
269                 struct sockaddr_in client_address ;
270                 socklen_t  client_length = sizeof (client_address);
271                 int connfd = accept (listenfd , (struct sockaddr*)&client_address,&client_length);
272                 user_client[connfd].sockfd = connfd ;
273                 memset (&user_client[connfd].client_buf , '\0', sizeof (user_client[connfd].client_buf));
274                 addfd (epollfd , connfd , 1);//將新的套接字加入到內核事件表裏面。
275             }
276             else if (events[i].events & EPOLLIN)
277             {
278                 struct fd    fds_for_new_worker ;
279                 fds_for_new_worker.epollfd = epollfd ;
280                 fds_for_new_worker.sockfd = sockfd ;
281 
282                 thpool_add_work (thpool, funcation ,&fds_for_new_worker);//將任務添加到工做隊列中
283             }else if (events[i].events & EPOLLOUT)
284             {
285 
286                 struct  fd   fds_for_new_worker ;
287                 fds_for_new_worker.epollfd = epollfd ;
288                 fds_for_new_worker.sockfd = sockfd ;
289                 thpool_add_work (thpool, readfun , &fds_for_new_worker );//將任務添加到工做隊列中
290             }
291 
292         }
293 
294     }
295 
296     thpool_destory (thpool);
297     close (listenfd);
298     return EXIT_SUCCESS;
299 }

 

這個例子只是做爲學習參考,裏面有一些問題,能夠試着修改以此提升你對epoll的認識.app

可使用telnet做爲測試客戶端:$ telnet 127.0.0.1 8086socket

整理資料

相關文章
相關標籤/搜索