epoll用法回顧html
先簡單回顧下如何使用C庫封裝的3個epoll相關的系統調用。更詳細的用法參見http://www.cnblogs.com/apprentice89/archive/2013/05/06/3063039.htmlnode
int epoll_create(int size);web
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);數據結構
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);併發
使用起來很清晰,首先要調用epoll_create創建一個epoll fd。參數size是內核保證可以正確處理的最大文件描述符數目(如今內核使用紅黑樹組織epoll相關數據結構,再也不使用這個參數)。app
epoll_ctl能夠操做上面創建的epoll fd,例如,將剛創建的socket fd加入到epoll中讓其監控,或者把 epoll正在監控的某個socket fd移出epoll,再也不監控它等等。socket
epoll_wait在調用時,在給定的timeout時間內,當在監控的這些文件描述符中的某些文件描述符上有事件發生時,就返回用戶態的進程。tcp
epoll爲何高效(相比select)函數
l 僅從上面的調用方式就能夠看出epoll比select/poll的一個優點:select/poll每次調用都要傳遞所要監控的全部fd給select/poll系統調用(這意味着每次調用都要將fd列表從用戶態拷貝到內核態,當fd數目不少時,這會形成低效)。而每次調用epoll_wait時(做用至關於調用select/poll),不須要再傳遞fd列表給內核,由於已經在epoll_ctl中將須要監控的fd告訴了內核(epoll_ctl不須要每次都拷貝全部的fd,只須要進行增量式操做)。因此,在調用epoll_create以後,內核已經在內核態開始準備數據結構存放要監控的fd了。每次epoll_ctl只是對這個數據結構進行簡單的維護。源碼分析
l 此外,內核使用了slab機制,爲epoll提供了快速的數據結構:
在內核裏,一切皆文件。因此,epoll向內核註冊了一個文件系統,用於存儲上述的被監控的fd。當你調用epoll_create時,就會在這個虛擬的epoll文件系統裏建立一個file結點。固然這個file不是普通文件,它只服務於epoll。epoll在被內核初始化時(操做系統啓動),同時會開闢出epoll本身的內核高速cache區,用於安置每個咱們想監控的fd,這些fd會以紅黑樹的形式保存在內核cache裏,以支持快速的查找、插入、刪除。這個內核高速cache區,就是創建連續的物理內存頁,而後在之上創建slab層,簡單的說,就是物理上分配好你想要的size的內存對象,每次使用時都是使用空閒的已分配好的對象。
l epoll的第三個優點在於:當咱們調用epoll_ctl往裏塞入百萬個fd時,epoll_wait仍然能夠飛快的返回,並有效的將發生事件的fd給咱們用戶。這是因爲咱們在調用epoll_create時,內核除了幫咱們在epoll文件系統裏建了個file結點,在內核cache裏建了個紅黑樹用於存儲之後epoll_ctl傳來的fd外,還會再創建一個list鏈表,用於存儲準備就緒的事件,當epoll_wait調用時,僅僅觀察這個list鏈表裏有沒有數據便可。有數據就返回,沒有數據就sleep,等到timeout時間到後即便鏈表沒數據也返回。因此,epoll_wait很是高效。並且,一般狀況下即便咱們要監控百萬計的fd,大多一次也只返回不多量的準備就緒fd而已,因此,epoll_wait僅須要從內核態copy少許的fd到用戶態而已。那麼,這個準備就緒list鏈表是怎麼維護的呢?當咱們執行epoll_ctl時,除了把fd放到epoll文件系統裏file對象對應的紅黑樹上以外,還會給內核中斷處理程序註冊一個回調函數,告訴內核,若是這個fd的中斷到了,就把它放到準備就緒list鏈表裏。因此,當一個fd(例如socket)上有數據到了,內核在把設備(例如網卡)上的數據copy到內核中後就來把fd(socket)插入到準備就緒list鏈表裏了。
如此,一顆紅黑樹,一張準備就緒fd鏈表,少許的內核cache,就幫咱們解決了大併發下的fd(socket)處理問題。
1.執行epoll_create時,建立了紅黑樹和就緒list鏈表。
2.執行epoll_ctl時,若是增長fd(socket),則檢查在紅黑樹中是否存在,存在當即返回,不存在則添加到紅黑樹上,而後向內核註冊回調函數,用於當中斷事件來臨時向準備就緒list鏈表中插入數據。
3.執行epoll_wait時馬上返回準備就緒鏈表裏的數據便可。
源碼分析以下:
static int __init eventpoll_init(void)
{
mutex_init(&pmutex);
ep_poll_safewake_init(&psw);
epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem), 0, SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC, NULL);
pwq_cache = kmem_cache_create("eventpoll_pwq", sizeof(struct eppoll_entry), 0, EPI_SLAB_DEBUG|SLAB_PANIC, NULL);
return 0;
}
epoll用kmem_cache_create(slab分配器)分配內存用來存放struct epitem和struct eppoll_entry。
當向系統中添加一個fd時,就建立一個epitem結構體,這是內核管理epoll的基本數據結構:
struct epitem {
struct rb_node rbn; //用於主結構管理的紅黑樹
struct list_head rdllink; //事件就緒隊列
struct epitem *next; //用於主結構體中的鏈表
struct epoll_filefd ffd; //這個結構體對應的被監聽的文件描述符信息
int nwait; //poll操做中事件的個數
struct list_head pwqlist; //雙向鏈表,保存着被監視文件的等待隊列,功能相似於select/poll中的poll_table
struct eventpoll *ep; //該項屬於哪一個主結構體(多個epitm從屬於一個eventpoll)
struct list_head fllink; //雙向鏈表,用來連接被監視的文件描述符對應的struct file。由於file裏有f_ep_link,用來保存全部監視這個文件的epoll節點
struct epoll_event event; //註冊的感興趣的事件,也就是用戶空間的epoll_event
}
而每一個epoll fd(epfd)對應的主要數據結構爲:
struct eventpoll {
spin_lock_t lock; //對本數據結構的訪問
struct mutex mtx; //防止使用時被刪除
wait_queue_head_t wq; //sys_epoll_wait() 使用的等待隊列
wait_queue_head_t poll_wait; //file->poll()使用的等待隊列
struct list_head rdllist; //事件知足條件的鏈表
struct rb_root rbr; //用於管理全部fd的紅黑樹(樹根)
struct epitem *ovflist; //將事件到達的fd進行連接起來發送至用戶空間
}
struct eventpoll在epoll_create時建立。
long sys_epoll_create(int size) {
struct eventpoll *ep;
...
ep_alloc(&ep); //爲ep分配內存並進行初始化
/* 調用anon_inode_getfd 新建一個file instance,
也就是epoll能夠當作一個文件(匿名文件)。
所以咱們能夠看到epoll_create會返回一個fd。
epoll所管理的全部的fd都是放在一個大的結構eventpoll(紅黑樹)中,
將主結構體struct eventpoll *ep放入file->private項中進行保存(sys_epoll_ctl會取用)*/
fd = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC));
return fd;
}
其中,ep_alloc(struct eventpoll **pep)爲pep分配內存,並初始化。
其中,上面註冊的操做eventpoll_fops定義以下:
static const struct file_operations eventpoll_fops = {
.release= ep_eventpoll_release,
.poll = ep_eventpoll_poll,
};
這樣說來,內核中維護了一棵紅黑樹,大體的結構以下:
接着是epoll_ctl函數(省略了出錯檢查等代碼):
asmlinkage long sys_epoll_ctl(int epfd,int op,int fd,struct epoll_event __user *event) {
int error;
struct file *file,*tfile;
struct eventpoll *ep;
struct epoll_event epds;
error = -FAULT;
//判斷參數的合法性,將 __user *event 複製給 epds。
if(ep_op_has_event(op) && copy_from_user(&epds,event,sizeof(struct epoll_event)))
goto error_return; //省略跳轉到的代碼
file = fget (epfd); // epoll fd 對應的文件對象
tfile = fget(fd); // fd 對應的文件對象
//在create時存入進去的(anon_inode_getfd),如今取用。
ep = file->private->data;
mutex_lock(&ep->mtx);
//防止重複添加(在ep的紅黑樹中查找是否已經存在這個fd)
epi = epi_find(ep,tfile,fd);
switch(op)
{
...
case EPOLL_CTL_ADD: //增長監聽一個fd
if(!epi)
{
epds.events |= EPOLLERR | POLLHUP; //默認包含POLLERR和POLLHUP事件
error = ep_insert(ep,&epds,tfile,fd); //在ep的紅黑樹中插入這個fd對應的epitm結構體。
} else //重複添加(在ep的紅黑樹中查找已經存在這個fd)。
error = -EEXIST;
break;
...
}
return error;
}
ep_insert的實現以下:
static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd)
{
int error ,revents,pwake = 0;
unsigned long flags ;
struct epitem *epi;
/*
struct ep_queue{
poll_table pt;
struct epitem *epi;
} */
struct ep_pqueue epq;
//分配一個epitem結構體來保存每一個加入的fd
if(!(epi = kmem_cache_alloc(epi_cache,GFP_KERNEL)))
goto error_return;
//初始化該結構體
ep_rb_initnode(&epi->rbn);
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd,tfile,fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
epq.epi = epi;
//安裝poll回調函數
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc );
/* 調用poll函數來獲取當前事件位,實際上是利用它來調用註冊函數ep_ptable_queue_proc(poll_wait中調用)。
若是fd是套接字,f_op爲socket_file_ops,poll函數是
sock_poll()。若是是TCP套接字的話,進而會調用
到tcp_poll()函數。此處調用poll函數查看當前
文件描述符的狀態,存儲在revents中。
在poll的處理函數(tcp_poll())中,會調用sock_poll_wait(),
在sock_poll_wait()中會調用到epq.pt.qproc指向的函數,
也就是ep_ptable_queue_proc()。 */
revents = tfile->f_op->poll(tfile, &epq.pt);
spin_lock(&tfile->f_ep_lock);
list_add_tail(&epi->fllink,&tfile->f_ep_lilnks);
spin_unlock(&tfile->f_ep_lock);
ep_rbtree_insert(ep,epi); //將該epi插入到ep的紅黑樹中
spin_lock_irqsave(&ep->lock,flags);
// revents & event->events:剛纔fop->poll的返回值中標識的事件有用戶event關心的事件發生。
// !ep_is_linked(&epi->rdllink):epi的ready隊列中有數據。ep_is_linked用於判斷隊列是否爲空。
/* 若是要監視的文件狀態已經就緒而且尚未加入到就緒隊列中,則將當前的
epitem加入到就緒隊列中.若是有進程正在等待該文件的狀態就緒,則
喚醒一個等待的進程。 */
if((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink,&ep->rdllist); //將當前epi插入到ep->ready隊列中。
/* 若是有進程正在等待文件的狀態就緒,
也就是調用epoll_wait睡眠的進程正在等待,
則喚醒一個等待進程。
waitqueue_active(q) 等待隊列q中有等待的進程返回1,不然返回0。
*/
if(waitqueue_active(&ep->wq))
__wake_up_locked(&ep->wq,TAKS_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);
/* 若是有進程等待eventpoll文件自己(???)的事件就緒,
則增長臨時變量pwake的值,pwake的值不爲0時,
在釋放lock後,會喚醒等待進程。 */
if(waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock,flags);
if(pwake)
ep_poll_safewake(&psw,&ep->poll_wait);//喚醒等待eventpoll文件狀態就緒的進程
return 0;
}
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
revents = tfile->f_op->poll(tfile, &epq.pt);
這兩個函數將ep_ptable_queue_proc註冊到epq.pt中的qproc。
typedef struct poll_table_struct {
poll_queue_proc qproc;
unsigned long key;
}poll_table;
執行f_op->poll(tfile, &epq.pt)時,XXX_poll(tfile, &epq.pt)函數會執行poll_wait(),poll_wait()會調用epq.pt.qproc函數,即ep_ptable_queue_proc。
ep_ptable_queue_proc函數以下:
/* 在文件操做中的poll函數中調用,將epoll的回調函數加入到目標文件的喚醒隊列中。
若是監視的文件是套接字,參數whead則是sock結構的sk_sleep成員的地址。 */
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt) {
/* struct ep_queue{
poll_table pt;
struct epitem *epi;
} */
struct epitem *epi = ep_item_from_epqueue(pt); //pt獲取struct ep_queue的epi字段。
struct eppoll_entry *pwq;
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
/* We have to signal that an error occurred */
/*
* 若是分配內存失敗,則將nwait置爲-1,表示
* 發生錯誤,即內存分配失敗,或者已發生錯誤
*/
epi->nwait = -1;
}
}
其中struct eppoll_entry定義以下:
struct eppoll_entry {
struct list_head llink;
struct epitem *base;
wait_queue_t wait;
wait_queue_head_t *whead;
};
ep_ptable_queue_proc 函數完成 epitem 加入到特定文件的wait隊列任務。
ep_ptable_queue_proc有三個參數:
struct file *file; 該fd對應的文件對象
wait_queue_head_t *whead; 該fd對應的設備等待隊列(同select中的mydev->wait_address)
poll_table *pt; f_op->poll(tfile, &epq.pt)中的epq.pt
在ep_ptable_queue_proc函數中,引入了另一個很是重要的數據結構eppoll_entry。eppoll_entry主要完成epitem和epitem事件發生時的callback(ep_poll_callback)函數之間的關聯。首先將eppoll_entry的whead指向fd的設備等待隊列(同select中的wait_address),而後初始化eppoll_entry的base變量指向epitem,最後經過add_wait_queue將epoll_entry掛載到fd的設備等待隊列上。完成這個動做後,epoll_entry已經被掛載到fd的設備等待隊列。
因爲ep_ptable_queue_proc函數設置了等待隊列的ep_poll_callback回調函數。因此在設備硬件數據到來時,硬件中斷處理函數中會喚醒該等待隊列上等待的進程時,會調用喚醒函數ep_poll_callback(參見博文http://www.cnblogs.com/apprentice89/archive/2013/05/09/3068274.html)。
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) {
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;
spin_lock_irqsave(&ep->lock, flags);
//判斷註冊的感興趣事件
//#define EP_PRIVATE_BITS (EPOLLONESHOT | EPOLLET)
//有非EPOLLONESHONT或EPOLLET事件
if (!(epi->event.events & ~EP_PRIVATE_BITS))
goto out_unlock;
if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
if (epi->next == EP_UNACTIVE_PTR) {
epi->next = ep->ovflist;
ep->ovflist = epi;
}
goto out_unlock;
}
if (ep_is_linked(&epi->rdllink))
goto is_linked;
//***關鍵***,將該fd加入到epoll監聽的就緒鏈表中
list_add_tail(&epi->rdllink, &ep->rdllist);
//喚醒調用epoll_wait()函數時睡眠的進程。用戶層epoll_wait(...) 超時前返回。
if (waitqueue_active(&ep->wq))
__wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);
if (waitqueue_active(&ep->poll_wait))
pwake++;
out_unlock: spin_unlock_irqrestore(&ep->lock, flags);
if (pwake)
ep_poll_safewake(&psw, &ep->poll_wait);
return 1;
}
因此ep_poll_callback函數主要的功能是將被監視文件的等待事件就緒時,將文件對應的epitem實例添加到就緒隊列中,當用戶調用epoll_wait()時,內核會將就緒隊列中的事件報告給用戶。
epoll_wait實現以下:
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events, int, maxevents, int, timeout) {
int error;
struct file *file;
struct eventpoll *ep;
/* 檢查maxevents參數。 */
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
/* 檢查用戶空間傳入的events指向的內存是否可寫。參見__range_not_ok()。 */
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
error = -EFAULT;
goto error_return;
}
/* 獲取epfd對應的eventpoll文件的file實例,file結構是在epoll_create中建立。 */
error = -EBADF;
file = fget(epfd);
if (!file)
goto error_return;
/* 經過檢查epfd對應的文件操做是否是eventpoll_fops 來判斷epfd是不是一個eventpoll文件。若是不是則返回EINVAL錯誤。 */
error = -EINVAL;
if (!is_file_epoll(file))
goto error_fput;
/* At this point it is safe to assume that the "private_data" contains */
ep = file->private_data;
/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
error_fput:
fput(file);
error_return:
return error;
}
epoll_wait調用ep_poll,ep_poll實現以下:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout) {
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;
/* timeout是以毫秒爲單位,這裏是要轉換爲jiffies時間。這裏加上999(即1000-1),是爲了向上取整。 */
jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;
retry:
spin_lock_irqsave(&ep->lock, flags);
res = 0;
if (list_empty(&ep->rdllist)) {
/* 沒有事件,因此須要睡眠。當有事件到來時,睡眠會被ep_poll_callback函數喚醒。*/
init_waitqueue_entry(&wait, current); //將current進程放在wait這個等待隊列中。
wait.flags |= WQ_FLAG_EXCLUSIVE;
/* 將當前進程加入到eventpoll的等待隊列中,等待文件狀態就緒或直到超時,或被信號中斷。 */
__add_wait_queue(&ep->wq, &wait);
for (;;) {
/* 執行ep_poll_callback()喚醒時應當須要將當前進程喚醒,因此當前進程狀態應該爲「可喚醒」TASK_INTERRUPTIBLE */
set_current_state(TASK_INTERRUPTIBLE);
/* 若是就緒隊列不爲空,也就是說已經有文件的狀態就緒或者超時,則退出循環。*/
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
/* 若是當前進程接收到信號,則退出循環,返回EINTR錯誤 */
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
/* 主動讓出處理器,等待ep_poll_callback()將當前進程喚醒或者超時,返回值是剩餘的時間。
從這裏開始當前進程會進入睡眠狀態,直到某些文件的狀態就緒或者超時。
當文件狀態就緒時,eventpoll的回調函數ep_poll_callback()會喚醒在ep->wq指向的等待隊列中的進程。*/
jtimeout = schedule_timeout(jtimeout);
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
}
/* ep->ovflist鏈表存儲的向用戶傳遞事件時暫存就緒的文件。
* 因此不論是就緒隊列ep->rdllist不爲空,或者ep->ovflist不等於
* EP_UNACTIVE_PTR,都有可能如今已經有文件的狀態就緒。
* ep->ovflist不等於EP_UNACTIVE_PTR有兩種狀況,一種是NULL,此時
* 可能正在向用戶傳遞事件,不必定就有文件狀態就緒,
* 一種狀況時不爲NULL,此時能夠確定有文件狀態就緒,
* 參見ep_send_events()。
*/
eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
spin_unlock_irqrestore(&ep->lock, flags);
/* Try to transfer events to user space. In case we get 0 events and there's still timeout left over, we go trying again in search of more luck. */
/* 若是沒有被信號中斷,而且有事件就緒,可是沒有獲取到事件(有可能被其餘進程獲取到了),而且沒有超時,則跳轉到retry標籤處,從新等待文件狀態就緒。 */
if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;
/* 返回獲取到的事件的個數或者錯誤碼 */
return res;
}
ep_send_events函數向用戶空間發送就緒事件。
ep_send_events()函數將用戶傳入的內存簡單封裝到ep_send_events_data結構中,而後調用ep_scan_ready_list() 將就緒隊列中的事件傳入用戶空間的內存。
用戶空間訪問這個結果,進行處理。
主要參考:
http://blog.chinaunix.net/uid-20687780-id-2105154.html
http://blog.chinaunix.net/uid-20687780-id-2105157.html
http://blog.chinaunix.net/uid-20687780-id-2105159.html
http://www.cnblogs.com/debian/archive/2012/02/16/2354454.html