epoll源碼實現分析[整理]

 

epoll用法回顧html

先簡單回顧下如何使用C庫封裝的3epoll相關的系統調用。更詳細的用法參見http://www.cnblogs.com/apprentice89/archive/2013/05/06/3063039.htmlnode

int epoll_create(int size);web

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);數據結構

int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);併發

使用起來很清晰,首先要調用epoll_create創建一個epoll fd。參數size是內核保證可以正確處理的最大文件描述符數目(如今內核使用紅黑樹組織epoll相關數據結構,再也不使用這個參數)。app

epoll_ctl能夠操做上面創建的epoll fd,例如,將剛創建的socket fd加入到epoll中讓其監控,或者把 epoll正在監控的某個socket fd移出epoll,再也不監控它等等。socket

epoll_wait在調用時,在給定的timeout時間內,當在監控的這些文件描述符中的某些文件描述符上有事件發生時,就返回用戶態的進程。tcp

 

epoll爲何高效(相比select函數

l  僅從上面的調用方式就能夠看出epollselect/poll的一個優點:select/poll每次調用都要傳遞所要監控的全部fdselect/poll系統調用(這意味着每次調用都要將fd列表從用戶態拷貝到內核態,當fd數目不少時,這會形成低效)。而每次調用epoll_wait時(做用至關於調用select/poll),不須要再傳遞fd列表給內核,由於已經在epoll_ctl中將須要監控的fd告訴了內核(epoll_ctl不須要每次都拷貝全部的fd,只須要進行增量式操做)。因此,在調用epoll_create以後,內核已經在內核態開始準備數據結構存放要監控的fd了。每次epoll_ctl只是對這個數據結構進行簡單的維護。源碼分析

 l  此外,內核使用了slab機制,爲epoll提供了快速的數據結構:

在內核裏,一切皆文件。因此,epoll向內核註冊了一個文件系統,用於存儲上述的被監控的fd。當你調用epoll_create時,就會在這個虛擬的epoll文件系統裏建立一個file結點。固然這個file不是普通文件,它只服務於epollepoll在被內核初始化時(操做系統啓動),同時會開闢出epoll本身的內核高速cache區,用於安置每個咱們想監控的fd,這些fd會以紅黑樹的形式保存在內核cache裏,以支持快速的查找、插入、刪除。這個內核高速cache區,就是創建連續的物理內存頁,而後在之上創建slab層,簡單的說,就是物理上分配好你想要的size的內存對象,每次使用時都是使用空閒的已分配好的對象。 

l  epoll的第三個優點在於:當咱們調用epoll_ctl往裏塞入百萬個fd時,epoll_wait仍然能夠飛快的返回,並有效的將發生事件的fd給咱們用戶。這是因爲咱們在調用epoll_create時,內核除了幫咱們在epoll文件系統裏建了個file結點,在內核cache裏建了個紅黑樹用於存儲之後epoll_ctl傳來的fd外,還會再創建一個list鏈表,用於存儲準備就緒的事件,當epoll_wait調用時,僅僅觀察這個list鏈表裏有沒有數據便可。有數據就返回,沒有數據就sleep,等到timeout時間到後即便鏈表沒數據也返回。因此,epoll_wait很是高效。並且,一般狀況下即便咱們要監控百萬計的fd,大多一次也只返回不多量的準備就緒fd而已,因此,epoll_wait僅須要從內核態copy少許的fd到用戶態而已。那麼,這個準備就緒list鏈表是怎麼維護的呢?當咱們執行epoll_ctl時,除了把fd放到epoll文件系統裏file對象對應的紅黑樹上以外,還會給內核中斷處理程序註冊一個回調函數,告訴內核,若是這個fd的中斷到了,就把它放到準備就緒list鏈表裏。因此,當一個fd(例如socket)上有數據到了,內核在把設備(例如網卡)上的數據copy到內核中後就來把fdsocket)插入到準備就緒list鏈表裏了。

 

如此,一顆紅黑樹,一張準備就緒fd鏈表,少許的內核cache,就幫咱們解決了大併發下的fdsocket)處理問題。

1.執行epoll_create時,建立了紅黑樹和就緒list鏈表。

2.執行epoll_ctl時,若是增長fdsocket),則檢查在紅黑樹中是否存在,存在當即返回,不存在則添加到紅黑樹上,而後向內核註冊回調函數,用於當中斷事件來臨時向準備就緒list鏈表中插入數據。

3.執行epoll_wait時馬上返回準備就緒鏈表裏的數據便可。

 

 源碼分析以下:

 static int __init eventpoll_init(void)

{

   mutex_init(&pmutex);

   ep_poll_safewake_init(&psw);

   epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem), 0, SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC, NULL);

   pwq_cache = kmem_cache_create("eventpoll_pwq", sizeof(struct eppoll_entry), 0, EPI_SLAB_DEBUG|SLAB_PANIC, NULL);

   return 0;

}

epollkmem_cache_createslab分配器)分配內存用來存放struct epitemstruct eppoll_entry

 

當向系統中添加一個fd時,就建立一個epitem結構體,這是內核管理epoll的基本數據結構:

struct epitem {

    struct rb_node  rbn;        //用於主結構管理的紅黑樹

    struct list_head  rdllink;  //事件就緒隊列

    struct epitem  *next;       //用於主結構體中的鏈表

 struct epoll_filefd  ffd;   //這個結構體對應的被監聽的文件描述符信息

 int  nwait;                 //poll操做中事件的個數

    struct list_head  pwqlist;  //雙向鏈表,保存着被監視文件的等待隊列,功能相似於select/poll中的poll_table

    struct eventpoll  *ep;      //該項屬於哪一個主結構體(多個epitm從屬於一個eventpoll

    struct list_head  fllink;   //雙向鏈表,用來連接被監視的文件描述符對應的struct file。由於file裏有f_ep_link,用來保存全部監視這個文件的epoll節點

    struct epoll_event  event;  //註冊的感興趣的事件,也就是用戶空間的epoll_event

}

  

而每一個epoll fdepfd)對應的主要數據結構爲:

struct eventpoll {

    spin_lock_t       lock;        //對本數據結構的訪問

    struct mutex      mtx;         //防止使用時被刪除

    wait_queue_head_t     wq;      //sys_epoll_wait() 使用的等待隊列

    wait_queue_head_t   poll_wait;       //file->poll()使用的等待隊列

    struct list_head    rdllist;        //事件知足條件的鏈表

    struct rb_root      rbr;            //用於管理全部fd的紅黑樹(樹根)

    struct epitem      *ovflist;       //將事件到達的fd進行連接起來發送至用戶空間

}

  

struct eventpollepoll_create時建立。

long sys_epoll_create(int size) {

    struct eventpoll *ep;

    ...

    ep_alloc(&ep); //ep分配內存並進行初始化

/* 調用anon_inode_getfd 新建一個file instance

也就是epoll能夠當作一個文件(匿名文件)。

所以咱們能夠看到epoll_create會返回一個fd。

           epoll所管理的全部的fd都是放在一個大的結構eventpoll(紅黑樹)中,

將主結構體struct eventpoll *ep放入file->private項中進行保存(sys_epoll_ctl會取用)*/

 fd = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC));

     return fd;

}

 

其中,ep_alloc(struct eventpoll **pep)pep分配內存,並初始化。

其中,上面註冊的操做eventpoll_fops定義以下:

static const struct file_operations eventpoll_fops = {

    .release=  ep_eventpoll_release,

    .poll    =  ep_eventpoll_poll,

};

  

這樣說來,內核中維護了一棵紅黑樹,大體的結構以下:

clip_image002

 

接着是epoll_ctl函數(省略了出錯檢查等代碼):

 asmlinkage long sys_epoll_ctl(int epfd,int op,int fd,struct epoll_event __user *event) {

    int error;

    struct file *file,*tfile;

    struct eventpoll *ep;

    struct epoll_event epds;

 

    error = -FAULT;

    //判斷參數的合法性,將 __user *event 複製給 epds

    if(ep_op_has_event(op) && copy_from_user(&epds,event,sizeof(struct epoll_event)))

            goto error_return; //省略跳轉到的代碼

 

    file  = fget (epfd); // epoll fd 對應的文件對象

    tfile = fget(fd);    // fd 對應的文件對象

 

    //create時存入進去的(anon_inode_getfd),如今取用。

    ep = file->private->data;

 

    mutex_lock(&ep->mtx);

 

    //防止重複添加(在ep的紅黑樹中查找是否已經存在這個fd

    epi = epi_find(ep,tfile,fd);

 

    switch(op)

    {

       ...

        case EPOLL_CTL_ADD:  //增長監聽一個fd

            if(!epi)

            {

                epds.events |= EPOLLERR | POLLHUP;     //默認包含POLLERRPOLLHUP事件

                error = ep_insert(ep,&epds,tfile,fd);  //ep的紅黑樹中插入這個fd對應的epitm結構體。

            } else  //重複添加(在ep的紅黑樹中查找已經存在這個fd)。

                error = -EEXIST;

            break;

        ...

    }

    return error;

}

 

 

 

ep_insert的實現以下:

static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd)

{

   int error ,revents,pwake = 0;

   unsigned long flags ;

   struct epitem *epi;

   /*

      struct ep_queue{

         poll_table pt;

         struct epitem *epi;

      }   */

 

   struct ep_pqueue epq;

 

   //分配一個epitem結構體來保存每一個加入的fd

   if(!(epi = kmem_cache_alloc(epi_cache,GFP_KERNEL)))

      goto error_return;

   //初始化該結構體

   ep_rb_initnode(&epi->rbn);

   INIT_LIST_HEAD(&epi->rdllink);

   INIT_LIST_HEAD(&epi->fllink);

   INIT_LIST_HEAD(&epi->pwqlist);

   epi->ep = ep;

   ep_set_ffd(&epi->ffd,tfile,fd);

   epi->event = *event;

   epi->nwait = 0;

   epi->next = EP_UNACTIVE_PTR;

 

   epq.epi = epi;

   //安裝poll回調函數

   init_poll_funcptr(&epq.pt, ep_ptable_queue_proc );

   /* 調用poll函數來獲取當前事件位,實際上是利用它來調用註冊函數ep_ptable_queue_procpoll_wait中調用)。

       若是fd是套接字,f_opsocket_file_opspoll函數是

       sock_poll()。若是是TCP套接字的話,進而會調用

       tcp_poll()函數。此處調用poll函數查看當前

       文件描述符的狀態,存儲在revents中。

       poll的處理函數(tcp_poll())中,會調用sock_poll_wait()

       sock_poll_wait()中會調用到epq.pt.qproc指向的函數,

       也就是ep_ptable_queue_proc()  */ 

 

   revents = tfile->f_op->poll(tfile, &epq.pt);

 

   spin_lock(&tfile->f_ep_lock);

   list_add_tail(&epi->fllink,&tfile->f_ep_lilnks);

   spin_unlock(&tfile->f_ep_lock);

 

   ep_rbtree_insert(ep,epi); //將該epi插入到ep的紅黑樹中

 

   spin_lock_irqsave(&ep->lock,flags);

 

//  revents & event->events:剛纔fop->poll的返回值中標識的事件有用戶event關心的事件發生。

// !ep_is_linked(&epi->rdllink)epiready隊列中有數據。ep_is_linked用於判斷隊列是否爲空。

/*  若是要監視的文件狀態已經就緒而且尚未加入到就緒隊列中,則將當前的

    epitem加入到就緒隊列中.若是有進程正在等待該文件的狀態就緒,

    喚醒一個等待的進程。  */ 

 

if((revents & event->events) && !ep_is_linked(&epi->rdllink)) {

      list_add_tail(&epi->rdllink,&ep->rdllist); //將當前epi插入到ep->ready隊列中。

/* 若是有進程正在等待文件的狀態就緒,

也就是調用epoll_wait睡眠的進程正在等待,

則喚醒一個等待進程。

waitqueue_active(q) 等待隊列q中有等待的進程返回1,不然返回0

*/

 

      if(waitqueue_active(&ep->wq))

         __wake_up_locked(&ep->wq,TAKS_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);

 

/*  若是有進程等待eventpoll文件自己(???)的事件就緒,

           則增長臨時變量pwake的值,pwake的值不爲0時,

           在釋放lock後,會喚醒等待進程。 */ 

 

      if(waitqueue_active(&ep->poll_wait))

         pwake++;

   }

   spin_unlock_irqrestore(&ep->lock,flags);

  

 

if(pwake)

      ep_poll_safewake(&psw,&ep->poll_wait);//喚醒等待eventpoll文件狀態就緒的進程

   return 0;

}

 

 

init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

revents = tfile->f_op->poll(tfile, &epq.pt);

這兩個函數將ep_ptable_queue_proc註冊到epq.pt中的qproc

typedef struct poll_table_struct {

poll_queue_proc qproc;

unsigned long key;

}poll_table;

 

執行f_op->poll(tfile, &epq.pt)時,XXX_poll(tfile, &epq.pt)函數會執行poll_wait()poll_wait()會調用epq.pt.qproc函數,即ep_ptable_queue_proc

ep_ptable_queue_proc函數以下:

 

 

/*  在文件操做中的poll函數中調用,將epoll的回調函數加入到目標文件的喚醒隊列中。

    若是監視的文件是套接字,參數whead則是sock結構的sk_sleep成員的地址。  */

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt) {

/* struct ep_queue{

         poll_table pt;

         struct epitem *epi;

      } */

    struct epitem *epi = ep_item_from_epqueue(pt); //pt獲取struct ep_queueepi字段。

    struct eppoll_entry *pwq;

 

    if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {

        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);

        pwq->whead = whead;

        pwq->base = epi;

        add_wait_queue(whead, &pwq->wait);

        list_add_tail(&pwq->llink, &epi->pwqlist);

        epi->nwait++;

    } else {

        /* We have to signal that an error occurred */

        /*

         * 若是分配內存失敗,則將nwait置爲-1,表示

         * 發生錯誤,即內存分配失敗,或者已發生錯誤

         */

        epi->nwait = -1;

    }

}

 

其中struct eppoll_entry定義以下:

struct eppoll_entry {

   struct list_head llink;

   struct epitem *base;

   wait_queue_t wait;

   wait_queue_head_t *whead;

};

 

ep_ptable_queue_proc 函數完成 epitem 加入到特定文件的wait隊列任務。

ep_ptable_queue_proc有三個參數:

struct file *file;              fd對應的文件對象

wait_queue_head_t *whead;      fd對應的設備等待隊列(同select中的mydev->wait_address

poll_table *pt;                 f_op->poll(tfile, &epq.pt)中的epq.pt

 

ep_ptable_queue_proc函數中,引入了另一個很是重要的數據結構eppoll_entryeppoll_entry主要完成epitemepitem事件發生時的callbackep_poll_callback)函數之間的關聯。首先將eppoll_entrywhead指向fd的設備等待隊列(同select中的wait_address),而後初始化eppoll_entrybase變量指向epitem,最後經過add_wait_queueepoll_entry掛載到fd的設備等待隊列上。完成這個動做後,epoll_entry已經被掛載到fd的設備等待隊列。

因爲ep_ptable_queue_proc函數設置了等待隊列的ep_poll_callback回調函數。因此在設備硬件數據到來時,硬件中斷處理函數中會喚醒該等待隊列上等待的進程時,會調用喚醒函數ep_poll_callback(參見博文http://www.cnblogs.com/apprentice89/archive/2013/05/09/3068274.html)。

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) {

   int pwake = 0;

   unsigned long flags;

   struct epitem *epi = ep_item_from_wait(wait);

   struct eventpoll *ep = epi->ep;

 

   spin_lock_irqsave(&ep->lock, flags);

   //判斷註冊的感興趣事件

//#define EP_PRIVATE_BITS  (EPOLLONESHOT | EPOLLET)

//有非EPOLLONESHONTEPOLLET事件

   if (!(epi->event.events & ~EP_PRIVATE_BITS))

      goto out_unlock;

 

   if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {

      if (epi->next == EP_UNACTIVE_PTR) {

         epi->next = ep->ovflist;

         ep->ovflist = epi;

      }

      goto out_unlock;

   }

 

   if (ep_is_linked(&epi->rdllink))

      goto is_linked;

    //***關鍵***,將該fd加入到epoll監聽的就緒鏈表中

   list_add_tail(&epi->rdllink, &ep->rdllist);

   //喚醒調用epoll_wait()函數時睡眠的進程。用戶層epoll_wait(...) 超時前返回。

if (waitqueue_active(&ep->wq))

      __wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);

   if (waitqueue_active(&ep->poll_wait))

      pwake++;

   out_unlock: spin_unlock_irqrestore(&ep->lock, flags);

   if (pwake)

      ep_poll_safewake(&psw, &ep->poll_wait);

   return 1;

}

 

因此ep_poll_callback函數主要的功能是將被監視文件的等待事件就緒時,將文件對應的epitem實例添加到就緒隊列中,當用戶調用epoll_wait()時,內核會將就緒隊列中的事件報告給用戶。

  

epoll_wait實現以下:

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events, int, maxevents, int, timeout)  {

   int error;

   struct file *file;

   struct eventpoll *ep;

    /* 檢查maxevents參數。 */

   if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)

      return -EINVAL;

    /* 檢查用戶空間傳入的events指向的內存是否可寫。參見__range_not_ok() */

   if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {

      error = -EFAULT;

      goto error_return;

   }

    /* 獲取epfd對應的eventpoll文件的file實例,file結構是在epoll_create中建立。 */

   error = -EBADF;

   file = fget(epfd);

   if (!file)

      goto error_return;

    /* 經過檢查epfd對應的文件操做是否是eventpoll_fops 來判斷epfd是不是一個eventpoll文件。若是不是則返回EINVAL錯誤。 */

   error = -EINVAL;

   if (!is_file_epoll(file))

      goto error_fput;

    /* At this point it is safe to assume that the "private_data" contains  */

   ep = file->private_data;

    /* Time to fish for events ... */

   error = ep_poll(ep, events, maxevents, timeout);

    error_fput:

   fput(file);

error_return:

   return error;

}

 

epoll_wait調用ep_pollep_poll實現以下:

 static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout) {

    int res, eavail;

   unsigned long flags;

   long jtimeout;

   wait_queue_t wait;

    /* timeout是以毫秒爲單位,這裏是要轉換爲jiffies時間。這裏加上999(1000-1),是爲了向上取整。 */

   jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;

 retry:

   spin_lock_irqsave(&ep->lock, flags);

    res = 0;

   if (list_empty(&ep->rdllist)) {

      /* 沒有事件,因此須要睡眠。當有事件到來時,睡眠會被ep_poll_callback函數喚醒。*/

      init_waitqueue_entry(&wait, current); //current進程放在wait這個等待隊列中。

      wait.flags |= WQ_FLAG_EXCLUSIVE;

      /* 將當前進程加入到eventpoll的等待隊列中,等待文件狀態就緒或直到超時,或被信號中斷。 */

      __add_wait_queue(&ep->wq, &wait);

       for (;;) {

         /* 執行ep_poll_callback()喚醒時應當須要將當前進程喚醒,因此當前進程狀態應該爲「可喚醒」TASK_INTERRUPTIBLE  */

         set_current_state(TASK_INTERRUPTIBLE);

         /* 若是就緒隊列不爲空,也就是說已經有文件的狀態就緒或者超時,則退出循環。*/

         if (!list_empty(&ep->rdllist) || !jtimeout)

            break;

         /* 若是當前進程接收到信號,則退出循環,返回EINTR錯誤 */

         if (signal_pending(current)) {

            res = -EINTR;

            break;

         }

          spin_unlock_irqrestore(&ep->lock, flags);

         /* 主動讓出處理器,等待ep_poll_callback()將當前進程喚醒或者超時,返回值是剩餘的時間。

從這裏開始當前進程會進入睡眠狀態,直到某些文件的狀態就緒或者超時。

當文件狀態就緒時,eventpoll的回調函數ep_poll_callback()會喚醒在ep->wq指向的等待隊列中的進程。*/

         jtimeout = schedule_timeout(jtimeout);

         spin_lock_irqsave(&ep->lock, flags);

      }

      __remove_wait_queue(&ep->wq, &wait);

       set_current_state(TASK_RUNNING);

   }

    /* ep->ovflist鏈表存儲的向用戶傳遞事件時暫存就緒的文件。

    * 因此不論是就緒隊列ep->rdllist不爲空,或者ep->ovflist不等於

    * EP_UNACTIVE_PTR,都有可能如今已經有文件的狀態就緒。

    * ep->ovflist不等於EP_UNACTIVE_PTR有兩種狀況,一種是NULL,此時

    * 可能正在向用戶傳遞事件,不必定就有文件狀態就緒,

    * 一種狀況時不爲NULL,此時能夠確定有文件狀態就緒,

    * 參見ep_send_events()

    */

   eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;

    spin_unlock_irqrestore(&ep->lock, flags);

    /* Try to transfer events to user space. In case we get 0 events and there's still timeout left over, we go trying again in search of more luck. */

   /* 若是沒有被信號中斷,而且有事件就緒,可是沒有獲取到事件(有可能被其餘進程獲取到了),而且沒有超時,則跳轉到retry標籤處,從新等待文件狀態就緒。 */

   if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && jtimeout)

      goto retry;

    /* 返回獲取到的事件的個數或者錯誤碼 */

   return res;

}

 

ep_send_events函數向用戶空間發送就緒事件。

ep_send_events()函數將用戶傳入的內存簡單封裝到ep_send_events_data結構中,而後調用ep_scan_ready_list() 將就緒隊列中的事件傳入用戶空間的內存。

用戶空間訪問這個結果,進行處理。

 

 

主要參考:

http://blog.chinaunix.net/uid-20687780-id-2105154.html

http://blog.chinaunix.net/uid-20687780-id-2105157.html

http://blog.chinaunix.net/uid-20687780-id-2105159.html

http://www.cnblogs.com/debian/archive/2012/02/16/2354454.html

http://blog.csdn.net/moonvs2010/article/details/8506890

相關文章
相關標籤/搜索