Epoll詳解及源碼分析

1.什麼是epollhtml

epoll是當前在Linux下開發大規模併發網絡程序的熱門人選,epoll 在Linux2.6內核中正式引入,和select類似,都是I/O多路複用(IO multiplexing)技術,按照man手冊的說法:是爲處理大批量句柄而做了改進的poll。node

Linux下有如下幾個經典的服務器模型:linux

 

①Apache模型(Process Per Connection,簡稱PPC) 和 TPC(Thread Per Connection)模型

這兩種模型思想相似,就是讓每個到來的鏈接都有一個進程/線程來服務。這種模型的代價是它要時間和空間。鏈接較多時,進程/線程切換的開銷比較大。所以這類模型能接受的最大鏈接數都不會高,通常在幾百個左右。程序員

 

②select模型

最大併發數限制:由於一個進程所打開的fd(文件描述符)是有限制的,由FD_SETSIZE設置,默認值是1024/2048,所以select模型的最大併發數就被相應限制了。數組

效率問題:select每次調用都會線性掃描所有的fd集合,這樣效率就會呈現線性降低,把FD_SETSIZE改大可能形成這些fd都超時了。服務器

內核/用戶空間內存拷貝問題:如何讓內核把fd消息通知給用戶空間呢?在這個問題上select採起了內存拷貝方法。網絡

 

③poll模型

雖然解決了select 最大併發數的限制,可是依然存在select的效率問題,select缺點的2和3它都沒有改掉。數據結構

 

④epoll模型

對比其餘模型的問題,epoll的改進以下:架構

1.支持一個進程打開大數目的socket描述符(FD)
    select 最不能忍受的是一個進程所打開的FD是有必定限制的,由FD_SETSIZE設置,默認值是2048。對於那些須要支持的上萬鏈接數目的IM服務器來講顯 然太少了。這時候你一是能夠選擇修改這個宏而後從新編譯內核,不過資料也同時指出這樣會帶來網絡效率的降低,二是能夠選擇多進程的解決方案(傳統的 Apache方案),不過雖然linux上面建立進程的代價比較小,但仍舊是不可忽視的,加上進程間數據同步遠比不上線程間同步的高效,因此也不是一種完 美的方案。不過 epoll則沒有這個限制,它所支持的FD上限是最大能夠打開文件的數目,這個數字通常遠大於2048,舉個例子,在1GB內存的機器上大約是10萬左 右,具體數目能夠cat /proc/sys/fs/file-max察看,通常來講這個數目和系統內存關係很大。
 
     2.IO效率不隨FD數目增長而線性降低
    傳統的select/poll另外一個致命弱點就是當你擁有一個很大的socket集合,不過因爲網絡延時,任一時間只有部分的socket是"活躍"的, 可是select/poll每次調用都會線性掃描所有的集合,致使效率呈現線性降低。可是epoll不存在這個問題,它只會對"活躍"的socket進行 操做---這是由於在內核實現中epoll是根據每一個fd上面的callback函數實現的。那麼,只有"活躍"的socket纔會主動的去調用 callback函數,其餘idle狀態socket則不會,在這點上,epoll實現了一個"僞"AIO,由於這時候推進力在os內核。在一些 benchmark中,若是全部的socket基本上都是活躍的---好比一個高速LAN環境,epoll並不比select/poll有什麼效率,相 反,若是過多使用epoll_ctl,效率相比還有稍微的降低。可是一旦使用idle connections模擬WAN環境,epoll的效率就遠在select/poll之上了。
 
3.使用mmap加速內核與用戶空間的消息傳遞
    這點實際上涉及到epoll的具體實現了。不管是select,poll仍是epoll都須要內核把FD消息通知給用戶空間,如何避免沒必要要的內存拷貝就 很重要,在這點上,epoll是經過內核於用戶空間mmap同一塊內存實現的。而若是你想我同樣從2.5內核就關注epoll的話,必定不會忘記手工 mmap這一步的。
 
4.內核微調
      這一點其實不算epoll的優勢了,而是整個linux平臺的優勢。也許你能夠懷疑linux平臺,可是你沒法迴避linux平臺賦予你微調內核的能力。 好比,內核TCP/IP協議棧使用內存池管理sk_buff結構,那麼能夠在運行時期動態調整這個內存pool(skb_head_pool)的大小 --- 經過echo XXXX>/proc/sys/net/core/hot_list_length完成。再好比listen函數的第2個參數(TCP完成3次握手 的數據包隊列長度),也能夠根據你平臺內存大小動態調整。更甚至在一個數據包面數目巨大但同時每一個數據包自己大小卻很小的特殊系統上嘗試最新的NAPI網 卡驅動架構。
 併發

2.Epoll API

epoll只有epoll_create,epoll_ctl,epoll_wait 3個系統調用。

   1: #include  <sys/epoll.h>
   2: 
   3: int  epoll_create(int  size);
   4: 
   5: int  epoll_ctl(int epfd, int op, int fd, structepoll_event *event);
   6: 
   7: int  epoll_wait(int epfd, struct epoll_event* events, int maxevents. int timeout);
   8: 
   9: 

① int epoll_create(int size);

建立一個epoll的句柄。自從linux2.6.8以後,size參數是被忽略的。須要注意的是,當建立好epoll句柄後,它就是會佔用一個 fd值,在linux下若是查看/proc/進程id/fd/,是可以看到這個fd的,因此在使用完epoll後,必須調用close()關閉,不然可能 致使fd被耗盡。

②int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件註冊函數,它不一樣於select()是在監聽事件時告訴內核要監聽什麼類型的事件,而是在這裏先註冊要監聽的事件類型。
第一個參數是epoll_create()的返回值。
第二個參數表示動做,用三個宏來表示:
EPOLL_CTL_ADD:註冊新的fd到epfd中;
EPOLL_CTL_MOD:修改已經註冊的fd的監聽事件;
EPOLL_CTL_DEL:從epfd中刪除一個fd;
 
第三個參數是須要監聽的fd。
第四個參數是告訴內核須要監聽什麼事,struct epoll_event結構以下:

   1: //保存觸發事件的某個文件描述符相關的數據(與具體使用方式有關)
   2: 
   3: typedef union epoll_data {
   4:     void *ptr;
   5:     int fd;
   6:     __uint32_t u32;
   7:     __uint64_t u64;
   8: } epoll_data_t;
   9:  //感興趣的事件和被觸發的事件
  10: struct epoll_event {
  11:     __uint32_t events; /* Epoll events */
  12:     epoll_data_t data; /* User data variable */
  13: };

events能夠是如下幾個宏的集合:
EPOLLIN :表示對應的文件描述符能夠讀(包括對端SOCKET正常關閉);
EPOLLOUT:表示對應的文件描述符能夠寫;
EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這裏應該表示有帶外數據到來);
EPOLLERR:表示對應的文件描述符發生錯誤;
EPOLLHUP:表示對應的文件描述符被掛斷;
EPOLLET: 將EPOLL設爲邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來講的。
EPOLLONESHOT:只監聽一次事件,當監聽完此次事件以後,若是還須要繼續監聽這個socket的話,須要再次把這個socket加入到EPOLL隊列裏

③ int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

收集在epoll監控的事件中已經發送的事件。參數events是分配好的epoll_event結構體數組,epoll將會把發生的事件賦值到 events數組中(events不能夠是空指針,內核只負責把數據複製到這個events數組中,不會去幫助咱們在用戶態中分配內存)。 maxevents告以內核這個events有多大,這個 maxevents的值不能大於建立epoll_create()時的size,參數timeout是超時時間(毫秒,0會當即返回,-1將不肯定,也有 說法說是永久阻塞)。若是函數調用成功,返回對應I/O上已準備好的文件描述符數目,如返回0表示已超時。

3.Epoll  工做模式

①LT模式:Level Triggered水平觸發

這個是缺省的工做模式。同時支持block socket和non-block socket。內核會告訴程序員一個文件描述符是否就緒了。若是程序員不做任何操做,內核仍會通知。

 

②ET模式:Edge Triggered 邊緣觸發

是一種高速模式。僅當狀態發生變化的時候纔得到通知。這種模式假定程序員在收到一次通知後可以完整地處理事件,因而內核再也不通知這一事件。注意:緩 衝區中還有未處理的數據不算狀態變化,因此ET模式下程序員只讀取了一部分數據就再也得不到通知了,正確的用法是程序員本身確認讀完了全部的字節(一直調 用read/write直到出錯EAGAIN爲止)。

 

以下圖:

0:表示文件描述符未準備就緒

1:表示文件描述符準備就緒

image_thumb[2]

 

對於水平觸發模式(LT):在1處,若是你不作任何操做,內核依舊會不斷的通知進程文件描述符準備就緒。

對於邊緣出發模式(ET): 只有在0變化到1處的時候,內核纔會通知進程文件描述符準備就緒。以後若是不在發生文件描述符狀態變化,內核就不會再通知進程文件描述符已準備就緒。

 

Nginx 默認採用的就是ET。

 

 

4.實例

 

   1: #include <stdio.h>
   2: #include <stdlib.h>
   3: #include <unistd.h>
   4: #include <sys/socket.h>
   5: #include <errno.h>
   6: #include <sys/epoll.h>
   7: #include <netinet/in.h>
   8: #include <fcntl.h>
   9: #include <string.h>
  10:  #include <netdb.h>
  11: 
  12: 
  13: 
  14: struct epoll_event  *events = NULL;
  15: int epollFd = -1;
  16: 
  17: const int MAX_SOCK_NUM = 1024;
  18: 
  19: 
  20: int epoll_init();
  21: int epoll_socket(int domain, int type, int protocol);
  22: int epoll_cleanup();
  23: int epoll_new_conn(int sfd);
  24: 
  25: 
  26: int main()
  27: {
  28:       struct sockaddr_in listenAddr;
  29:       int listenFd = -1;
  30: 
  31:       if(-1 == epoll_init())
  32:       {
  33:           printf("epoll_init err\n");
  34:           return -1;
  35:       }
  36: 
  37:       if((listenFd = epoll_socket(AF_INET,SOCK_STREAM,0)) == -1)
  38:       {
  39:           printf("epoll_socket err\n");
  40:           epoll_cleanup();
  41:           return -1;
  42:       }
  43: 
  44:       listenAddr.sin_family = AF_INET;
  45:       listenAddr.sin_port = htons(999);
  46:       listenAddr.sin_addr.s_addr = htonl(INADDR_ANY);
  47: 
  48:       if(-1 == bind(listenFd,(struct sockaddr*)&listenAddr,sizeof(listenAddr)))
  49:       {
  50:           printf("bind err %d\n",errno);
  51:           epoll_cleanup();
  52:           return -1;
  53:       }
  54: 
  55:       if(-1 == listen(listenFd,1024))
  56:       {
  57:           printf("listen err\n");
  58:           epoll_cleanup();
  59:           return -1;
  60:       }
  61: 
  62:       //Add ListenFd into epoll
  63:       if(-1 == epoll_new_conn(listenFd))
  64:       {
  65:           printf("eph_new_conn err\n");
  66:           close(listenFd);
  67:         epoll_cleanup();
  68:         return -1;
  69:       }
  70: 
  71: 
  72:       //LOOP
  73:       while(1)
  74:       {
  75:           int n;
  76:           n = epoll_wait(epollFd,events,MAX_SOCK_NUM,-1);
  77:           for (int i = 0; i < n; i++)
  78:           {
  79:                if( (events[i].events & EPOLLERR) || ( events[i].events & EPOLLHUP ) || !(events[i].events & EPOLLIN) )
  80:                {
  81:                    printf("epoll err\n");
  82:                    close(events[i].data.fd);
  83:                    continue;
  84:                }
  85:                else if(events[i].data.fd == listenFd)
  86:                {
  87:                    while(1)
  88:                    {
  89:                        struct sockaddr inAddr;
  90:                        char hbuf[1024],sbuf[NI_MAXSERV];
  91:                        socklen_t inLen = -1;
  92:                        int inFd = -1;
  93:                        int s = 0;
  94:                        int flag = 0;
  95: 
  96:                        inLen = sizeof(inAddr);
  97:                        inFd = accept(listenFd,&inAddr,&inLen);
  98: 
  99:                        if(inFd == -1)
 100:                        {
 101:                            if( errno == EAGAIN || errno == EWOULDBLOCK )
 102:                            {
 103:                                break;
 104:                            }
 105:                            else
 106:                            {
 107:                                printf("accept error\n");
 108:                                break;
 109:                            }
 110:                        }
 111: 
 112:                     if (s ==  getnameinfo (&inAddr, inLen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV))
 113:                     {
 114:                         printf("Accepted connection on descriptor %d (host=%s, port=%s)\n", inFd, hbuf, sbuf);
 115:                     }
 116: 
 117:                     //Set Socket to non-block
 118:                     if((flag = fcntl(inFd,F_GETFL,0)) < 0 || fcntl(inFd,F_SETFL,flag | O_NONBLOCK) < 0)
 119:                     {
 120:                         close(inFd);
 121:                         return -1;
 122:                     }
 123: 
 124:                     epoll_new_conn(inFd);
 125:                    }
 126:                }
 127:                else
 128:                {
 129:                         while (1)
 130:                         {
 131:                         ssize_t count;
 132:                         char buf[512];
 133: 
 134:                         count = read (events[i].data.fd, buf, sizeof buf);
 135: 
 136:                         if (count == -1)
 137:                         {
 138:                             if (errno != EAGAIN)
 139:                              {
 140:                                 printf("read err\n");
 141:                                 }
 142: 
 143:                             break;
 144: 
 145:                         }
 146:                         else if (count == 0)
 147:                         { 
 148:                             break;
 149:                         }
 150: 
 151:                         write (1, buf, count);
 152:                     }
 153:                 }
 154:           }
 155: 
 156:       }
 157: 
 158:       epoll_cleanup();
 159: }
 160: 
 161: 
 162: int epoll_init()
 163: {
 164:     if(!(events = (struct epoll_event* ) malloc ( MAX_SOCK_NUM * sizeof(struct epoll_event))))
 165:     {
 166:         return -1;
 167:     }
 168: 
 169:     if( (epollFd = epoll_create(MAX_SOCK_NUM)) < 0 )
 170:     {
 171:         return -1;
 172:     }
 173: 
 174:     return 0;
 175: }
 176: 
 177: int epoll_socket(int domain, int type, int protocol)
 178: {
 179:     int sockFd = -1;
 180:     int flag = -1;
 181: 
 182:     if ((sockFd = socket(domain,type,protocol)) < 0)
 183:     {
 184:         return -1;
 185:     }
 186: 
 187:     //Set Socket to non-block
 188:     if((flag = fcntl(sockFd,F_GETFL,0)) < 0 || fcntl(sockFd,F_SETFL,flag | O_NONBLOCK) < 0)
 189:     {
 190:         close(sockFd);
 191:         return -1;
 192:     }
 193: 
 194:     return sockFd;
 195: }
 196: 
 197: int epoll_cleanup()
 198: {
 199:     free(events);
 200:     close(epollFd);
 201:     return 0;
 202: }
 203: 
 204: int epoll_new_conn(int sfd)
 205: {
 206: 
 207:       struct epoll_event  epollEvent;
 208:       memset(&epollEvent, 0, sizeof(struct epoll_event));
 209:       epollEvent.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET;
 210:       epollEvent.data.ptr = NULL;
 211:       epollEvent.data.fd  = sfd;
 212: 
 213:       if (epoll_ctl(epollFd, EPOLL_CTL_ADD, sfd, &epollEvent) < 0)
 214:       {
 215:         return -1;
 216:       }
 217: 
 218:     epollEvent.data.fd  = sfd;
 219: 
 220:     return 0;
 221: }

 

 

5.Epoll爲何高效

Epoll高效主要體如今如下三個方面:

①從上面的調用方式就能夠看出epoll比select/poll的一個優點:select/poll每次調用都要傳遞所要監控的全部fd給 select/poll系統調用(這意味着每次調用都要將fd列表從用戶態拷貝到內核態,當fd數目不少時,這會形成低效)。而每次調用 epoll_wait時(做用至關於調用select/poll),不須要再傳遞fd列表給內核,由於已經在epoll_ctl中將須要監控的fd告訴了 內核(epoll_ctl不須要每次都拷貝全部的fd,只須要進行增量式操做)。因此,在調用epoll_create以後,內核已經在內核態開始準備數 據結構存放要監控的fd了。每次epoll_ctl只是對這個數據結構進行簡單的維護。

 

② 此外,內核使用了slab機制,爲epoll提供了快速的數據結構:

在內核裏,一切皆文件。因此,epoll向內核註冊了一個文件系統,用於存儲上述的被監控的fd。當你調用 epoll_create時,就會在這個虛擬的epoll文件系統裏建立一個file結點。固然這個file不是普通文件,它只服務於epoll。 epoll在被內核初始化時(操做系統啓動),同時會開闢出epoll本身的內核高速cache區,用於安置每個咱們想監控的fd,這些fd會以紅黑樹的形式保存在內核cache裏,以支持快速的查找、插入、刪除。這個內核高速cache區,就是創建連續的物理內存頁,而後在之上創建slab層,簡單的說,就是物理上分配好你想要的size的內存對象,每次使用時都是使用空閒的已分配好的對象。

 

③ epoll的第三個優點在於:當咱們調用epoll_ctl往裏塞入百萬個fd時,epoll_wait仍然能夠飛快的返回,並有效的將發生事件的fd給咱們用戶。這 是因爲咱們在調用epoll_create時,內核除了幫咱們在epoll文件系統裏建了個file結點,在內核cache裏建了個紅黑樹用於存儲之後 epoll_ctl傳來的fd外,還會再創建一個list鏈表,用於存儲準備就緒的事件,當epoll_wait調用時,僅僅觀察這個list鏈表裏有沒 有數據便可。有數據就返回,沒有數據就sleep,等到timeout時間到後即便鏈表沒數據也返回。因此,epoll_wait很是 高效。並且,一般狀況下即便咱們要監控百萬計的fd,大多一次也只返回不多量的準備就緒fd而已,因此,epoll_wait僅須要從內核態copy少許 的fd到用戶態而已。那麼,這個準備就緒list鏈表是怎麼維護的呢?當咱們執行epoll_ctl時,除了把fd放到epoll文件系統裏file對象 對應的紅黑樹上以外,還會給內核中斷處理程序註冊一個回調函數,告訴內核,若是這個fd的中斷到了,就把它放到準備就緒list鏈表裏。因此,當一個 fd(例如socket)上有數據到了,內核在把設備(例如網卡)上的數據copy到內核中後就來把fd(socket)插入到準備就緒list鏈表裏 了。

如此,一顆紅黑樹,一張準備就緒fd鏈表,少許的內核cache,就幫咱們解決了大併發下的fd(socket)處理問題。

1.執行epoll_create時,建立了紅黑樹和就緒list鏈表。

2.執行epoll_ctl時,若是增長fd(socket),則檢查在紅黑樹中是否存在,存在當即返回,不存在則添加到紅黑樹上,而後向內核註冊回調函數,用於當中斷事件來臨時向準備就緒list鏈表中插入數據。

3.執行epoll_wait時馬上返回準備就緒鏈表裏的數據便可。

6.Epoll源碼分析

 

   1: static int __init eventpoll_init(void)
   2: {
   3:   mutex_init(&pmutex);
   4: 
   5:   ep_poll_safewake_init(&psw);
   6: 
   7:   epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem), 0, SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC, NULL);
   8: 
   9:   pwq_cache = kmem_cache_create("eventpoll_pwq", sizeof(struct eppoll_entry), 0, EPI_SLAB_DEBUG|SLAB_PANIC, NULL);
  10: 
  11:   return 0;
  12: }

 

epoll用kmem_cache_create(slab分配器)分配內存用來存放struct epitem和struct eppoll_entry。

 

當向系統中添加一個fd時,就建立一個epitem結構體,這是內核管理epoll的基本數據結構:

   1: struct epitem
   2: {
   3:     struct rb_node  rbn;        //用於主結構管理的紅黑樹
   4: 
   5:     struct list_head  rdllink;  //事件就緒隊列
   6: 
   7:     struct epitem  *next;       //用於主結構體中的鏈表
   8: 
   9:     struct epoll_filefd  ffd;   //這個結構體對應的被監聽的文件描述符信息
  10: 
  11:     int  nwait;                 //poll操做中事件的個數
  12: 
  13:     struct list_head  pwqlist;  //雙向鏈表,保存着被監視文件的等待隊列,功能相似於select/poll中的poll_table
  14: 
  15:     struct eventpoll  *ep;      //該項屬於哪一個主結構體(多個epitm從屬於一個eventpoll)
  16: 
  17:     struct list_head  fllink;   //雙向鏈表,用來連接被監視的文件描述符對應的struct file。由於file裏有f_ep_link,用來保存全部監視這個文件的epoll節點
  18: 
  19:     struct epoll_event  event;  //註冊的感興趣的事件,也就是用戶空間的epoll_event
  20: 
  21: }

 

而每一個epoll fd(epfd)對應的主要數據結構爲:

   1: struct eventpoll
   2: {
   3:     spin_lock_t       lock;             //對本數據結構的訪問
   4: 
   5:     struct mutex      mtx;              //防止使用時被刪除
   6: 
   7:     wait_queue_head_t     wq;           //sys_epoll_wait() 使用的等待隊列
   8: 
   9:     wait_queue_head_t   poll_wait;      //file->poll()使用的等待隊列
  10: 
  11:     struct list_head    rdllist;        //事件知足條件的鏈表
  12: 
  13:     struct rb_root      rbr;            //用於管理全部fd的紅黑樹(樹根)
  14: 
  15:     struct epitem      *ovflist;       //將事件到達的fd進行連接起來發送至用戶空間
  16: 
  17: }
  18: 

 

eventpoll在epoll_create時建立:

   1: long sys_epoll_create(int size)
   2: {
   3: 
   4:     struct eventpoll *ep;
   5: 
   6:     ...
   7: 
   8:     ep_alloc(&ep); //爲ep分配內存並進行初始化
   9: 
  10: /* 調用anon_inode_getfd 新建一個file instance,也就是epoll能夠當作一個文件(匿名文件)。所以咱們能夠看到epoll_create會返回一個fd。epoll所管理的全部的fd都是放在一個大的結構eventpoll(紅黑樹)中,
  11: 將主結構體struct eventpoll *ep放入file->private項中進行保存(sys_epoll_ctl會取用)*/
  12: 
  13:  fd = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC));
  14: 
  15:      return fd;
  16: 
  17: }

 

其中,ep_alloc(struct eventpoll **pep)爲pep分配內存,並初始化。

其中,上面註冊的操做eventpoll_fops定義以下:

   1: static const struct file_operations eventpoll_fops = {
   2: 
   3:     .release=  ep_eventpoll_release,
   4: 
   5:     .poll    =  ep_eventpoll_poll,
   6: 
   7: };

 

這樣說來,內核中維護了一棵紅黑樹,大體的結構以下:

 

03152919-51d2e2ac3a51422bace3e4b0009225e1[2]_thumb[3]

 

接着是epoll_ctl函數(省略了出錯檢查等代碼):

   1: asmlinkage long sys_epoll_ctl(int epfd,int op,int fd,struct epoll_event __user *event) {
   2: 
   3:    int error;
   4: 
   5:    struct file *file,*tfile;
   6: 
   7:    struct eventpoll *ep;
   8: 
   9:    struct epoll_event epds;
  10: 
  11: 
  12: 
  13:    error = -FAULT;
  14: 
  15:    //判斷參數的合法性,將 __user *event 複製給 epds。
  16: 
  17:    if(ep_op_has_event(op) && copy_from_user(&epds,event,sizeof(struct epoll_event)))
  18: 
  19:            goto error_return; //省略跳轉到的代碼
  20: 
  21: 
  22: 
  23:    file  = fget (epfd); // epoll fd 對應的文件對象
  24: 
  25:    tfile = fget(fd);    // fd 對應的文件對象
  26: 
  27: 
  28: 
  29:    //在create時存入進去的(anon_inode_getfd),如今取用。
  30: 
  31:    ep = file->private->data;
  32: 
  33: 
  34: 
  35:    mutex_lock(&ep->mtx);
  36: 
  37: 
  38: 
  39:    //防止重複添加(在ep的紅黑樹中查找是否已經存在這個fd)
  40: 
  41:    epi = epi_find(ep,tfile,fd);
  42: 
  43: 
  44: 
  45:    switch(op)
  46: 
  47:    {
  48: 
  49:       ...
  50: 
  51:        case EPOLL_CTL_ADD:  //增長監聽一個fd
  52: 
  53:            if(!epi)
  54: 
  55:            {
  56: 
  57:                epds.events |= EPOLLERR | POLLHUP;     //默認包含POLLERR和POLLHUP事件
  58: 
  59:                error = ep_insert(ep,&epds,tfile,fd);  //在ep的紅黑樹中插入這個fd對應的epitm結構體。
  60: 
  61:            } else  //重複添加(在ep的紅黑樹中查找已經存在這個fd)。
  62: 
  63:                error = -EEXIST;
  64: 
  65:            break;
  66: 
  67:        ...
  68: 
  69:    }
  70: 
  71:    return error;
  72: 
  73: 
  74: 

 

ep_insert的實現以下:

   1: static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd)
   2: 
   3: {
   4: 
   5:    int error ,revents,pwake = 0;
   6: 
   7:    unsigned long flags ;
   8: 
   9:    struct epitem *epi;
  10: 
  11:    /*
  12:
  13:       struct ep_queue{
  14:
  15:          poll_table pt;
  16:
  17:          struct epitem *epi;
  18:
  19:       }   */
  20: 
  21: 
  22: 
  23:    struct ep_pqueue epq;
  24: 
  25: 
  26: 
  27:    //分配一個epitem結構體來保存每一個加入的fd
  28: 
  29:    if(!(epi = kmem_cache_alloc(epi_cache,GFP_KERNEL)))
  30: 
  31:       goto error_return;
  32: 
  33:    //初始化該結構體
  34: 
  35:    ep_rb_initnode(&epi->rbn);
  36: 
  37:    INIT_LIST_HEAD(&epi->rdllink);
  38: 
  39:    INIT_LIST_HEAD(&epi->fllink);
  40: 
  41:    INIT_LIST_HEAD(&epi->pwqlist);
  42: 
  43:    epi->ep = ep;
  44: 
  45:    ep_set_ffd(&epi->ffd,tfile,fd);
  46: 
  47:    epi->event = *event;
  48: 
  49:    epi->nwait = 0;
  50: 
  51:    epi->next = EP_UNACTIVE_PTR;
  52: 
  53: 
  54: 
  55:    epq.epi = epi;
  56: 
  57:    //安裝poll回調函數
  58: 
  59:    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc );
  60: 
  61:    /* 調用poll函數來獲取當前事件位,實際上是利用它來調用註冊函數ep_ptable_queue_proc(poll_wait中調用)。
  62:
  63:        若是fd是套接字,f_op爲socket_file_ops,poll函數是
  64:
  65:        sock_poll()。若是是TCP套接字的話,進而會調用
  66:
  67:        到tcp_poll()函數。此處調用poll函數查看當前
  68:
  69:        文件描述符的狀態,存儲在revents中。
  70:
  71:        在poll的處理函數(tcp_poll())中,會調用sock_poll_wait(),
  72:
  73:        在sock_poll_wait()中會調用到epq.pt.qproc指向的函數,
  74:
  75:        也就是ep_ptable_queue_proc()。  */ 
  76: 
  77: 
  78: 
  79:    revents = tfile->f_op->poll(tfile, &epq.pt);
  80: 
  81: 
  82: 
  83:    spin_lock(&tfile->f_ep_lock);
  84: 
  85:    list_add_tail(&epi->fllink,&tfile->f_ep_lilnks);
  86: 
  87:    spin_unlock(&tfile->f_ep_lock);
  88: 
  89: 
  90: 
  91:    ep_rbtree_insert(ep,epi); //將該epi插入到ep的紅黑樹中
  92: 
  93: 
  94: 
  95:    spin_lock_irqsave(&ep->lock,flags);
  96: 
  97: 
  98: 
  99: //  revents & event->events:剛纔fop->poll的返回值中標識的事件有用戶event關心的事件發生。
 100: 
 101: // !ep_is_linked(&epi->rdllink):epi的ready隊列中有數據。ep_is_linked用於判斷隊列是否爲空。
 102: 
 103: /*  若是要監視的文件狀態已經就緒而且尚未加入到就緒隊列中,則將當前的
 104:
 105:     epitem加入到就緒隊列中.若是有進程正在等待該文件的狀態就緒,則
 106:
 107:     喚醒一個等待的進程。  */ 
 108: 
 109: 
 110: 
 111: if((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
 112: 
 113:       list_add_tail(&epi->rdllink,&ep->rdllist); //將當前epi插入到ep->ready隊列中。
 114: 
 115: /* 若是有進程正在等待文件的狀態就緒,
 116:
 117: 也就是調用epoll_wait睡眠的進程正在等待,
 118:
 119: 則喚醒一個等待進程。
 120:
 121: waitqueue_active(q) 等待隊列q中有等待的進程返回1,不然返回0。
 122:
 123: */
 124: 
 125: 
 126: 
 127:       if(waitqueue_active(&ep->wq))
 128: 
 129:          __wake_up_locked(&ep->wq,TAKS_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);
 130: 
 131: 
 132: 
 133: /*  若是有進程等待eventpoll文件自己(???)的事件就緒,
 134:
 135:            則增長臨時變量pwake的值,pwake的值不爲0時,
 136:
 137:            在釋放lock後,會喚醒等待進程。 */ 
 138: 
 139: 
 140: 
 141:       if(waitqueue_active(&ep->poll_wait))
 142: 
 143:          pwake++;
 144: 
 145:    }
 146: 
 147:    spin_unlock_irqrestore(&ep->lock,flags);
 148: 
 149:  
 150: 
 151: 
 152: 
 153: if(pwake)
 154: 
 155:       ep_poll_safewake(&psw,&ep->poll_wait);//喚醒等待eventpoll文件狀態就緒的進程
 156: 
 157:    return 0;
 158: 
 159: }

 

init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

revents = tfile->f_op->poll(tfile, &epq.pt);

這兩個函數將ep_ptable_queue_proc註冊到epq.pt中的qproc。

 

   1: typedef struct poll_table_struct {
   2: 
   3: poll_queue_proc qproc;
   4: 
   5: unsigned long key;
   6: 
   7: }poll_table;

執行f_op->poll(tfile, &epq.pt)時,XXX_poll(tfile, &epq.pt)函數會執行poll_wait(),poll_wait()會調用epq.pt.qproc函數,即ep_ptable_queue_proc

ep_ptable_queue_proc函數以下:

   1: /*  在文件操做中的poll函數中調用,將epoll的回調函數加入到目標文件的喚醒隊列中。
   2:
   3:     若是監視的文件是套接字,參數whead則是sock結構的sk_sleep成員的地址。  */
   4: 
   5: static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt) {
   6: 
   7: /* struct ep_queue{
   8:
   9:          poll_table pt;
  10:
  11:          struct epitem *epi;
  12:
  13:       } */
  14: 
  15:     struct epitem *epi = ep_item_from_epqueue(pt); //pt獲取struct ep_queue的epi字段。
  16: 
  17:     struct eppoll_entry *pwq;
  18: 
  19: 
  20: 
  21:     if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
  22: 
  23:         init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
  24: 
  25:         pwq->whead = whead;
  26: 
  27:         pwq->base = epi;
  28: 
  29:         add_wait_queue(whead, &pwq->wait);
  30: 
  31:         list_add_tail(&pwq->llink, &epi->pwqlist);
  32: 
  33:         epi->nwait++;
  34: 
  35:     } else {
  36: 
  37:         /* We have to signal that an error occurred */
  38: 
  39:         /*
  40:
  41:          * 若是分配內存失敗,則將nwait置爲-1,表示
  42:
  43:          * 發生錯誤,即內存分配失敗,或者已發生錯誤
  44:
  45:          */
  46: 
  47:         epi->nwait = -1;
  48: 
  49:     }
  50: 
  51: }

 

其中struct eppoll_entry定義以下:

   1: struct eppoll_entry {
   2: 
   3: struct list_head llink;
   4: 
   5: struct epitem *base;
   6: 
   7: wait_queue_t wait;
   8: 
   9: wait_queue_head_t *whead;
  10: 
  11: };

 

ep_ptable_queue_proc 函數完成 epitem 加入到特定文件的wait隊列任務。

ep_ptable_queue_proc有三個參數:

struct file *file; 該fd對應的文件對象

wait_queue_head_t *whead; 該fd對應的設備等待隊列(同select中的mydev->wait_address)

poll_table *pt; f_op->poll(tfile, &epq.pt)中的epq.pt

在ep_ptable_queue_proc函數中,引入了另一個很是重要的數據結構eppoll_entry。eppoll_entry主要完成epitem和epitem事件發生時的callback(ep_poll_callback) 函數之間的關聯。首先將eppoll_entry的whead指向fd的設備等待隊列(同select中的wait_address),而後初始化 eppoll_entry的base變量指向epitem,最後經過add_wait_queue將epoll_entry掛載到fd的設備等待隊列上。 完成這個動做後,epoll_entry已經被掛載到fd的設備等待隊列。

 

因爲ep_ptable_queue_proc函數設置了等待隊列的ep_poll_callback回調函數。因此在設備硬件數據到來時,硬件中斷處理函數中會喚醒該等待隊列上等待的進程時,會調用喚醒函數ep_poll_callback

 

   1: static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) {
   2: 
   3:    int pwake = 0;
   4: 
   5:    unsigned long flags;
   6: 
   7:    struct epitem *epi = ep_item_from_wait(wait);
   8: 
   9:    struct eventpoll *ep = epi->ep;
  10: 
  11: 
  12: 
  13:    spin_lock_irqsave(&ep->lock, flags);
  14: 
  15:    //判斷註冊的感興趣事件
  16: 
  17: //#define EP_PRIVATE_BITS  (EPOLLONESHOT | EPOLLET)
  18: 
  19: //有非EPOLLONESHONT或EPOLLET事件
  20: 
  21:    if (!(epi->event.events & ~EP_PRIVATE_BITS))
  22: 
  23:       goto out_unlock;
  24: 
  25: 
  26: 
  27:    if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
  28: 
  29:       if (epi->next == EP_UNACTIVE_PTR) {
  30: 
  31:          epi->next = ep->ovflist;
  32: 
  33:          ep->ovflist = epi;
  34: 
  35:       }
  36: 
  37:       goto out_unlock;
  38: 
  39:    }
  40: 
  41: 
  42: 
  43:    if (ep_is_linked(&epi->rdllink))
  44: 
  45:       goto is_linked;
  46: 
  47:     //***關鍵***,將該fd加入到epoll監聽的就緒鏈表中
  48: 
  49:    list_add_tail(&epi->rdllink, &ep->rdllist);
  50: 
  51:    //喚醒調用epoll_wait()函數時睡眠的進程。用戶層epoll_wait(...) 超時前返回。
  52: 
  53: if (waitqueue_active(&ep->wq))
  54: 
  55:       __wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);
  56: 
  57:    if (waitqueue_active(&ep->poll_wait))
  58: 
  59:       pwake++;
  60: 
  61:    out_unlock: spin_unlock_irqrestore(&ep->lock, flags);
  62: 
  63:    if (pwake)
  64: 
  65:       ep_poll_safewake(&psw, &ep->poll_wait);
  66: 
  67:    return 1;
  68: 
  69: }

 

因此ep_poll_callback函數主要的功能是將被監視文件的等待事件就緒時,將文件對應的epitem實例添加到就緒隊列中,當用戶調用epoll_wait()時,內核會將就緒隊列中的事件報告給用戶。

epoll_wait實現以下:

   1: SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events, int, maxevents, int, timeout)  {
   2: 
   3:    int error;
   4: 
   5:    struct file *file;
   6: 
   7:    struct eventpoll *ep;
   8: 
   9:     /* 檢查maxevents參數。 */
  10: 
  11:    if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
  12: 
  13:       return -EINVAL;
  14: 
  15:     /* 檢查用戶空間傳入的events指向的內存是否可寫。參見__range_not_ok()。 */
  16: 
  17:    if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
  18: 
  19:       error = -EFAULT;
  20: 
  21:       goto error_return;
  22: 
  23:    }
  24: 
  25:     /* 獲取epfd對應的eventpoll文件的file實例,file結構是在epoll_create中建立。 */
  26: 
  27:    error = -EBADF;
  28: 
  29:    file = fget(epfd);
  30: 
  31:    if (!file)
  32: 
  33:       goto error_return;
  34: 
  35:     /* 經過檢查epfd對應的文件操做是否是eventpoll_fops 來判斷epfd是不是一個eventpoll文件。若是不是則返回EINVAL錯誤。 */
  36: 
  37:    error = -EINVAL;
  38: 
  39:    if (!is_file_epoll(file))
  40: 
  41:       goto error_fput;
  42: 
  43:     /* At this point it is safe to assume that the "private_data" contains  */
  44: 
  45:    ep = file->private_data;
  46: 
  47:     /* Time to fish for events ... */
  48: 
  49:    error = ep_poll(ep, events, maxevents, timeout);
  50: 
  51:     error_fput:
  52: 
  53:    fput(file);
  54: 
  55: error_return:
  56: 
  57:    return error;
  58: 
  59: }
  60: 
  61: 
  62: 
  63: epoll_wait調用ep_poll,ep_poll實現以下:
  64: 
  65:  static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout) {
  66: 
  67:     int res, eavail;
  68: 
  69:    unsigned long flags;
  70: 
  71:    long jtimeout;
  72: 
  73:    wait_queue_t wait;
  74: 
  75:     /* timeout是以毫秒爲單位,這裏是要轉換爲jiffies時間。這裏加上999(即1000-1),是爲了向上取整。 */
  76: 
  77:    jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;
  78: 
  79:  retry:
  80: 
  81:    spin_lock_irqsave(&ep->lock, flags);
  82: 
  83:     res = 0;
  84: 
  85:    if (list_empty(&ep->rdllist)) {
  86: 
  87:       /* 沒有事件,因此須要睡眠。當有事件到來時,睡眠會被ep_poll_callback函數喚醒。*/
  88: 
  89:       init_waitqueue_entry(&wait, current); //將current進程放在wait這個等待隊列中。
  90: 
  91:       wait.flags |= WQ_FLAG_EXCLUSIVE;
  92: 
  93:       /* 將當前進程加入到eventpoll的等待隊列中,等待文件狀態就緒或直到超時,或被信號中斷。 */
  94: 
  95:       __add_wait_queue(&ep->wq, &wait);
  96: 
  97:        for (;;) {
  98: 
  99:          /* 執行ep_poll_callback()喚醒時應當須要將當前進程喚醒,因此當前進程狀態應該爲「可喚醒」TASK_INTERRUPTIBLE  */
 100: 
 101:          set_current_state(TASK_INTERRUPTIBLE);
 102: 
 103:          /* 若是就緒隊列不爲空,也就是說已經有文件的狀態就緒或者超時,則退出循環。*/
 104: 
 105:          if (!list_empty(&ep->rdllist) || !jtimeout)
 106: 
 107:             break;
 108: 
 109:          /* 若是當前進程接收到信號,則退出循環,返回EINTR錯誤 */
 110: 
 111:          if (signal_pending(current)) {
 112: 
 113:             res = -EINTR;
 114: 
 115:             break;
 116: 
 117:          }
 118: 
 119:           spin_unlock_irqrestore(&ep->lock, flags);
 120: 
 121:          /* 主動讓出處理器,等待ep_poll_callback()將當前進程喚醒或者超時,返回值是剩餘的時間。
 122:
 123: 從這裏開始當前進程會進入睡眠狀態,直到某些文件的狀態就緒或者超時。
 124:
 125: 當文件狀態就緒時,eventpoll的回調函數ep_poll_callback()會喚醒在ep->wq指向的等待隊列中的進程。*/
 126: 
 127:          jtimeout = schedule_timeout(jtimeout);
 128: 
 129:          spin_lock_irqsave(&ep->lock, flags);
 130: 
 131:       }
 132: 
 133:       __remove_wait_queue(&ep->wq, &wait);
 134: 
 135:        set_current_state(TASK_RUNNING);
 136: 
 137:    }
 138: 
 139:     /* ep->ovflist鏈表存儲的向用戶傳遞事件時暫存就緒的文件。
 140:
 141:     * 因此無論是就緒隊列ep->rdllist不爲空,或者ep->ovflist不等於
 142:
 143:     * EP_UNACTIVE_PTR,都有可能如今已經有文件的狀態就緒。
 144:
 145:     * ep->ovflist不等於EP_UNACTIVE_PTR有兩種狀況,一種是NULL,此時
 146:
 147:     * 可能正在向用戶傳遞事件,不必定就有文件狀態就緒,
 148:
 149:     * 一種狀況時不爲NULL,此時能夠確定有文件狀態就緒,
 150:
 151:     * 參見ep_send_events()。
 152:
 153:     */
 154: 
 155:    eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
 156: 
 157:     spin_unlock_irqrestore(&ep->lock, flags);
 158: 
 159:     /* Try to transfer events to user space. In case we get 0 events and there's still timeout left over, we go trying again in search of more luck. */
 160: 
 161:    /* 若是沒有被信號中斷,而且有事件就緒,可是沒有獲取到事件(有可能被其餘進程獲取到了),而且沒有超時,則跳轉到retry標籤處,從新等待文件狀態就緒。 */
 162: 
 163:    if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
 164: 
 165:       goto retry;
 166: 
 167:     /* 返回獲取到的事件的個數或者錯誤碼 */
 168: 
 169:    return res;
 170: 
 171: }

 

 

ep_send_events函數向用戶空間發送就緒事件。

ep_send_events()函數將用戶傳入的內存簡單封裝到ep_send_events_data結構中,而後調用ep_scan_ready_list() 將就緒隊列中的事件傳入用戶空間的內存。

用戶空間訪問這個結果,進行處理。

 

 

7.參考

 

1.http://www.cnblogs.com/apprentice89/p/3234677.html

2.http://www.cnblogs.com/apprentice89/archive/2013/05/06/3063039.html

相關文章
相關標籤/搜索