在內核建立一個事件表,事件表用文件表示。因此epoll_create()返回的是一個文件描述符。主要源代碼:node
asmlinkage long sys_epoll_create(int size) { int error, fd; struct inode *inode; //inode結構 struct file *file; //文件file結構 if (size <= 0) goto eexit_1; error = ep_getfd(&fd, &inode, &file); //得到fd以及一個file和inode結構 if (error) goto eexit_1; error = ep_file_init(file); //file的初始化 if (error) goto eexit_2; return fd; eexit_2: sys_close(fd); eexit_1: return error; }
(1)ep_getfd()數據結構
爲epoll得到相應file、inode等數據結構,並與新的fd綁定起來。app
首先來看兩個數據結構:函數
- a.file結構: 定義了Linux中文件所須要的全部信息,表明一個打開的文件
file的部分數據結構:this
struct file { /* 文件對應的目錄項結構。除了用filp->f_dentry->d_inode的方式來訪問索引節點結構以外,設備驅動程序的開發者們通常無需關心dentry結構。 */ struct dentry *f_dentry; /** * 與文件相關的操做。內核在執行open操做時,對這個指針賦值,之後須要處理這些操做時就讀取這個指針。 * 不能爲了方便而保存起來。也就是說,能夠在任何須要的時候修改文件的關聯操做。即"方法重載"。 */ struct file_operations *f_op; /** * open系統調用在調用驅動程序的open方法前將這個指針置爲NULL。驅動程序能夠將這個字段用於任何目的或者忽略這個字段。 * 驅動程序能夠用這個字段指向已分配的數據,可是必定要在內核銷燬file結構前在release方法中釋放內存。 * 它是跨系統調用時保存狀態的很是有用的資源。 */ void *private_data; };
- inode結構:
inode在內部表示一個文件,與file不一樣,file表示的是文件描述符。同一個文件被打開屢次,會有多個file文件描述符,可是隻會有一個inode。在後面的代碼中你能夠發現,Linux針對epoll的操做專門會有一個屬於epoll的文件系統,這個能夠在初始化inode的時候能夠看到。pwa
這是ep_getfd的代碼:
static int ep_getfd(int *efd, struct inode **einode, struct file **efile) { struct qstr this; char name[32]; struct dentry *dentry; struct inode *inode; struct file *file; int error, fd; error = -ENFILE; file = get_empty_filp(); //得到新的空的file文件描述符 if (!file) goto eexit_1; inode = ep_eventpoll_inode();//得到新的inode結構,表明一個屬於epoll文件系統的文件。 error = PTR_ERR(inode); if (IS_ERR(inode)) goto eexit_2; error = get_unused_fd(); //得到一個未使用的fd if (error < 0) goto eexit_3; fd = error; error = -ENOMEM; sprintf(name, "[%lu]", inode->i_ino); this.name = name; this.len = strlen(name); this.hash = inode->i_ino; dentry = d_alloc(eventpoll_mnt->mnt_sb->s_root, &this);//得到一個新的dentry文件目錄項結構並初始化 if (!dentry) goto eexit_4; dentry->d_op = &eventpollfs_dentry_operations; d_add(dentry, inode);//將inode和目錄項結構綁定 file->f_vfsmnt = mntget(eventpoll_mnt); //把該文件所屬的文件系統置爲epoll文件系統 file->f_dentry = dentry; file->f_mapping = inode->i_mapping; file->f_pos = 0; file->f_flags = O_RDONLY; file->f_op = &eventpoll_fops; //這一步是很重要的一步,用f_op指針指向epoll的回調數 file->f_mode = FMODE_READ; file->f_version = 0; file->private_data = NULL; fd_install(fd, file); *efd = fd; *einode = inode; *efile = file; return 0; eexit_4: put_unused_fd(fd); eexit_3: iput(inode); eexit_2: put_filp(file); eexit_1: return error; }
ep_getfd其實就作了三件事:獲取新的file文件描述符,獲取新的inode結構,得到新的fd,最後把三者鏈接綁定在一塊兒,就表示了epoll在內核的事件表。(2) ep_file_init(file)指針
前面講到epoll的內核事件表已經建立完畢了,可是咱們能夠發現epoll是事件一次寫入內核,屢次監聽的,然而在以前都沒有發現能夠存事件的數據結構紅黑樹。因此這一步ep_file_init就是來解決這個問題的。下面根據代碼來講它幹了些什麼rest
static int ep_file_init(struct file *file) { struct eventpoll *ep; //一個指向eventpoll的指針 if (!(ep = kmalloc(sizeof(struct eventpoll), GFP_KERNEL))) return -ENOMEM; /*下面是對eventpoll結構的一系列初始化*/ memset(ep, 0, sizeof(*ep)); rwlock_init(&ep->lock); init_rwsem(&ep->sem); init_waitqueue_head(&ep->wq); init_waitqueue_head(&ep->poll_wait); INIT_LIST_HEAD(&ep->rdllist); ep->rbr = RB_ROOT; //這裏能夠看到紅黑樹的root的初始化 file->private_data = ep; //ep指針給file DNPRINTK(3, (KERN_INFO "[%p] eventpoll: ep_file_init() ep=%p\n", current, ep)); return 0; }
這一步就是把一個eventpoll的結構建立並初始化,而後讓file->private_data指向這個結構,這個結構中咱們就能夠找到rb_root,這一步以後epoll_creat()也就是epoll全部的準備工做就已經作完了。
由於epoll對監聽事件來講是一次寫入屢次監聽的,因此必需要有對事件表的增刪改操做接口,epoll_ctl就是提供給用戶的能夠進行事件表進行操做的接口。咱們能夠經過這個系統調用來添加刪除和修改事件。code
下面根據源碼來走一遍:
asmlinkage long sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event) { /**一系列的數據結構的定義**/ int error; struct file *file, *tfile; struct eventpoll *ep; struct epitem *epi; struct epoll_event epds; error = -EFAULT; if (EP_OP_HASH_EVENT(op) && copy_from_user(&epds, event, sizeof(struct epoll_event))) goto eexit_1;//把用戶的事件從用戶空間考到內核空間 error = -EBADF; file = fget(epfd); //得到epoll內核事件表得文件描述符 if (!file) goto eexit_1; tfile = fget(fd); //得到要操做的事件得文件描述符 if (!tfile) goto eexit_2; error = -EPERM; if (!tfile->f_op || !tfile->f_op->poll) goto eexit_3; //若是fd文件描述符中f_op(指向全部文件操做指針得結構體)和這裏須要用到的poll操做爲空,就退出。由於poll是全部Io複用:select 、poll、epoll得底層實現。 error = -EINVAL; if (file == tfile || !IS_FILE_EPOLL(file)) goto eexit_3; //判斷須要操做的fd文件描述符是否是epfd文件描述符和 file是否是epoll文件系統的文件 ep = file->private_data; //拿到內核事件表的eventpoll結構體 down_write(&ep->sem); //爲寫得到讀寫內核事件表eventpoll的信號量。 epi = ep_find(ep, tfile, fd);//判斷該事件在內核事件表中是否存在 error = -EINVAL; switch (op) { case EPOLL_CTL_ADD: //添加事件操做 if (!epi) { epds.events |= POLLERR | POLLHUP; error = ep_insert(ep, &epds, tfile, fd); } else error = -EEXIST; break; case EPOLL_CTL_DEL: //刪除事件 if (epi) error = ep_remove(ep, epi); else error = -ENOENT; break; case EPOLL_CTL_MOD: //修改事件 if (epi) { epds.events |= POLLERR | POLLHUP; error = ep_modify(ep, epi, &epds); } else error = -ENOENT; break; } if (epi) ep_release_epitem(epi); up_write(&ep->sem); //釋放內核事件表eventpoll的讀寫信號量 eexit_3: fput(tfile); eexit_2: fput(file); eexit_1: DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_ctl(%d, %d, %d, %p) = %d\n", current, epfd, op, fd, event, error)); return error; }
在epoll_ctl中,主要分爲如下幾個步驟:索引
- 判斷須要操做的事件在事件表中是否存在
- 判斷須要進行的操做
- 使用更底層的代碼對事件表進行增刪改
epoll比poll和select高效的地方,其實在這裏就能夠 看出來,poll的系統調用會把該進程「掛」到fd所對應的設備的等待隊列上,select和poll他們每一次都須要把current "掛"到fd對應設備的等待隊列,同一個fd來講,你要不停監聽的話,你就須要不停的進行poll,那麼fd太多的話,這樣的poll操做實際上是很低效的,epoll則不一樣的是,每個事件,它只在添加的時候調用poll,之後都不用去調用poll,避免了太多重複的poll操做,因此epoll對於poll和select來講是更高效的。
還有一個比較有趣的地方是,全部的事件的存儲都是在紅黑樹裏面,可是咱們能夠發現紅黑樹的節點實際上是這樣的:
struct rb_node { /** * 紅黑樹節點的雙親。 */ struct rb_node *rb_parent; /** * 紅黑樹節點顏色。 */ int rb_color; #define RB_RED 0 #define RB_BLACK 1 /** * 右孩子 */ struct rb_node *rb_right; /** * 左孩子 */ struct rb_node *rb_left; }; struct rb_root { struct rb_node *rb_node; };
不論是根節點仍是節點,你都看不到數據,那內核是怎麼經過紅黑樹操做數據?
struct epitem { struct rb_node rbn; ........ }
其實這纔是真正的紅黑樹的節點,裏面的東西不少,可是這裏只列出來了相關的。一個fd對應一個這樣的結構,同一個fd只能插入一次,這也是要採用一個紅黑樹這樣的數據結構的其中一個緣由,還有一個緣由就是由於紅黑樹的查詢等操做效率高。那麼從rb_node到struct epitem就能夠很容易的作到了,只須要作一個指針的強轉就能夠從rb_node到epitem:
epic = rb_entry(parent, struct epitem, rbn); #define rb_entry(ptr, type, member) container_of(ptr, type, member) #define container_of(ptr, type, member) ({ \ const typeof( ((type *)0)->member ) *__mptr = (ptr); \ (type *)( (char *)__mptr - offsetof(type,member) );})
這也就跟C++的模板相似,只不過這是C語言實現的,不少方法仍是須要再從新定義。可是對C語言來講,在有限的條件下作到這樣的代碼複用仍是很是厲害的。
以前的兩個函數已經把事件添加到內核事件表,並且已經把當前進程「掛」到fd的全部設備上,這就至關於一個回調函數,當對應fd有可處理事件時,就會喚醒等待隊列的進程,進程會把當前可處理的事件及有關信息記錄到一個rdllist的鏈表中,接下來就是epoll_wait所要作的事了:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout) { /*定義一些必要的數據結構*/ int res, eavail; unsigned long flags; long jtimeout; wait_queue_t wait; /*檢查超時時間是否有效*/ jtimeout = timeout == -1 || timeout > (MAX_SCHEDULE_TIMEOUT - 1000) / HZ ? MAX_SCHEDULE_TIMEOUT: (timeout * HZ + 999) / 1000; retry: /*得到eventpoll的寫鎖*/ write_lock_irqsave(&ep->lock, flags); /*rdllist若是是空就阻塞循環等待回調函數往rdllist中寫數據,一旦不爲空或者超過超時時間就會退出循環*/ res = 0; if (list_empty(&ep->rdllist)) { init_waitqueue_entry(&wait, current); add_wait_queue(&ep->wq, &wait); for (;;) { set_current_state(TASK_INTERRUPTIBLE); if (!list_empty(&ep->rdllist) || !jtimeout) break; if (signal_pending(current)) { res = -EINTR; break; } write_unlock_irqrestore(&ep->lock, flags); jtimeout = schedule_timeout(jtimeout); write_lock_irqsave(&ep->lock, flags); } remove_wait_queue(&ep->wq, &wait); set_current_state(TASK_RUNNING); } /*判斷rdllist是否爲空*/ eavail = !list_empty(&ep->rdllist); /*釋放eventpoll的寫鎖*/ write_unlock_irqrestore(&ep->lock, flags); /*這一步是把內核的rdllist中的事件copy到用戶空間*/ if (!res && eavail && !(res = ep_events_transfer(ep, events, maxevents)) && jtimeout) goto retry; return res; }
簡單總結一下epoll_wait:
- 檢查超時事件和用來存就緒事件的用戶空間的大小
- 循環等待就緒事件
- 把就緒事件從內核空間copy到用戶空間
這就是epoll的整個流程和主要的步驟的分析。可是咱們沒能看出來它的ET和LT工做模式體如今哪裏?因此下面就來講說epoll的ET模式具體實現。
ET模式是epoll特有的高效工做模式。主要體現就是一次就緒事件只給用戶提醒一次。ET模式的實現其實就是在epoll_wait的最後一步,從內核鏈表往用戶空間考數據的時候,下面來看代碼:
static int ep_events_transfer(struct eventpoll *ep, struct epoll_event __user *events, int maxevents) { int eventcnt = 0; struct list_head txlist; //創建一個臨時量,用以保存就緒的事件 INIT_LIST_HEAD(&txlist); //初始化 down_read(&ep->sem); //得到ep的讀寫信號量 /*從ep->rdllist中copy到txlist*/ if (ep_collect_ready_items(ep, &txlist, maxevents) > 0) { /*從txlist到用戶空間*/ eventcnt = ep_send_events(ep, &txlist, events); /*從txlist再反過來copy給ep->rdllist,這一步是具體的ET實現*/ ep_reinject_items(ep, &txlist); } up_read(&ep->sem); //釋放ep讀寫信號量 return eventcnt; }
下面的函數就是ET的具體實現,也是ET和LT的區別
static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist) { int ricnt = 0, pwake = 0; unsigned long flags; struct epitem *epi; /*得到ep的讀寫鎖*/ write_lock_irqsave(&ep->lock, flags); while (!list_empty(txlist)) { /*這一步跟以前講過的rb_node到epi的一步是同樣的*/ epi = list_entry(txlist->next, struct epitem, txlink); /*初始化*/ EP_LIST_DEL(&epi->txlink); /* 1.內核事件表(紅黑樹)不爲空 2.事件沒有設置ET工做模式 3.就緒事件類型和監聽事件類型相同 4.該事件的rdllink不爲空 */ if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) && (epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)) { /*把剛剛臨時量txlist中的該事件繼續添加到rdllist中*/ list_add_tail(&epi->rdllink, &ep->rdllist); ricnt++; } } if (ricnt) { if (waitqueue_active(&ep->wq)) wake_up(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; } /*釋放讀寫鎖*/ write_unlock_irqrestore(&ep->lock, flags); if (pwake) ep_poll_safewake(&psw, &ep->poll_wait); }
先說LT模式對於同一個就緒事件會重複提醒,從上面能夠看出來是由於它又把依舊就緒且未設置ET標誌的事件從新copy到了rdllist中,因此下一次epoll_wait仍是會把該事件返回給用戶。那麼ET這裏就很好解釋了,儘管該事件未處理完,可是你只要設置了ET標誌,我就不會再次把該事件返回給用戶。這就是ET的實現。