epoll--源碼剖析

1.epoll_create()

在內核建立一個事件表,事件表用文件表示。因此epoll_create()返回的是一個文件描述符。主要源代碼:node

asmlinkage long sys_epoll_create(int size)
{
    int error, fd;
    struct inode *inode; //inode結構
    struct file *file;  //文件file結構

    if (size <= 0)
        goto eexit_1;

    error = ep_getfd(&fd, &inode, &file); //得到fd以及一個file和inode結構
    if (error)
        goto eexit_1;
        
    error = ep_file_init(file);  //file的初始化
    if (error)
        goto eexit_2;

    return fd;

eexit_2:
    sys_close(fd);
eexit_1:

    return error;
}

(1)ep_getfd()數據結構

爲epoll得到相應file、inode等數據結構,並與新的fd綁定起來。app

首先來看兩個數據結構:函數

  • a.file結構: 定義了Linux中文件所須要的全部信息,表明一個打開的文件

file的部分數據結構:this

struct file {
    
     /* 文件對應的目錄項結構。除了用filp->f_dentry->d_inode的方式來訪問索引節點結構以外,設備驅動程序的開發者們通常無需關心dentry結構。
     */
    struct dentry        *f_dentry;
    
    /**
     * 與文件相關的操做。內核在執行open操做時,對這個指針賦值,之後須要處理這些操做時就讀取這個指針。
     * 不能爲了方便而保存起來。也就是說,能夠在任何須要的時候修改文件的關聯操做。即"方法重載"。
     */
    struct file_operations    *f_op;


    /**
     * open系統調用在調用驅動程序的open方法前將這個指針置爲NULL。驅動程序能夠將這個字段用於任何目的或者忽略這個字段。
     * 驅動程序能夠用這個字段指向已分配的數據,可是必定要在內核銷燬file結構前在release方法中釋放內存。
     * 它是跨系統調用時保存狀態的很是有用的資源。
     */
    void            *private_data;

};
  • inode結構:

inode在內部表示一個文件,與file不一樣,file表示的是文件描述符。同一個文件被打開屢次,會有多個file文件描述符,可是隻會有一個inode。在後面的代碼中你能夠發現,Linux針對epoll的操做專門會有一個屬於epoll的文件系統,這個能夠在初始化inode的時候能夠看到。pwa

這是ep_getfd的代碼:
static int ep_getfd(int *efd, struct inode **einode, struct file **efile)
{
    struct qstr this;
    char name[32];
    struct dentry *dentry;
    struct inode *inode;
    struct file *file;
    int error, fd;


    error = -ENFILE;
    file = get_empty_filp(); //得到新的空的file文件描述符
    if (!file)
        goto eexit_1;

    inode = ep_eventpoll_inode();//得到新的inode結構,表明一個屬於epoll文件系統的文件。
    error = PTR_ERR(inode);
    if (IS_ERR(inode))
        goto eexit_2;

    error = get_unused_fd(); //得到一個未使用的fd
    if (error < 0)
        goto eexit_3;
    fd = error;


    error = -ENOMEM;
    sprintf(name, "[%lu]", inode->i_ino);
    this.name = name;
    this.len = strlen(name);
    this.hash = inode->i_ino;
    dentry = d_alloc(eventpoll_mnt->mnt_sb->s_root, &this);//得到一個新的dentry文件目錄項結構並初始化
    if (!dentry)
        goto eexit_4;
    dentry->d_op = &eventpollfs_dentry_operations;
    d_add(dentry, inode);//將inode和目錄項結構綁定
    file->f_vfsmnt = mntget(eventpoll_mnt); //把該文件所屬的文件系統置爲epoll文件系統
    file->f_dentry = dentry;
    file->f_mapping = inode->i_mapping;

    file->f_pos = 0;
    file->f_flags = O_RDONLY;
    file->f_op = &eventpoll_fops; //這一步是很重要的一步,用f_op指針指向epoll的回調數
    file->f_mode = FMODE_READ;
    file->f_version = 0;
    file->private_data = NULL;

    fd_install(fd, file);

    *efd = fd;
    *einode = inode;
    *efile = file;
    return 0;

eexit_4:
    put_unused_fd(fd);
eexit_3:
    iput(inode);
eexit_2:
    put_filp(file);
eexit_1:
    return error;
}
ep_getfd其實就作了三件事:獲取新的file文件描述符,獲取新的inode結構,得到新的fd,最後把三者鏈接綁定在一塊兒,就表示了epoll在內核的事件表。

(2) ep_file_init(file)指針

前面講到epoll的內核事件表已經建立完畢了,可是咱們能夠發現epoll是事件一次寫入內核,屢次監聽的,然而在以前都沒有發現能夠存事件的數據結構紅黑樹。因此這一步ep_file_init就是來解決這個問題的。

下面根據代碼來講它幹了些什麼rest

static int ep_file_init(struct file *file)
{
    struct eventpoll *ep;  //一個指向eventpoll的指針

    if (!(ep = kmalloc(sizeof(struct eventpoll), GFP_KERNEL)))
        return -ENOMEM;

    /*下面是對eventpoll結構的一系列初始化*/
    memset(ep, 0, sizeof(*ep));
    rwlock_init(&ep->lock);
    init_rwsem(&ep->sem);
    init_waitqueue_head(&ep->wq);
    init_waitqueue_head(&ep->poll_wait);
    INIT_LIST_HEAD(&ep->rdllist);
    ep->rbr = RB_ROOT; //這裏能夠看到紅黑樹的root的初始化

    file->private_data = ep;    //ep指針給file

    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: ep_file_init() ep=%p\n",
             current, ep));
    return 0;
}
這一步就是把一個eventpoll的結構建立並初始化,而後讓file->private_data指向這個結構,這個結構中咱們就能夠找到rb_root,這一步以後epoll_creat()也就是epoll全部的準備工做就已經作完了。

2.epoll_ctl()

由於epoll對監聽事件來講是一次寫入屢次監聽的,因此必需要有對事件表的增刪改操做接口,epoll_ctl就是提供給用戶的能夠進行事件表進行操做的接口。咱們能夠經過這個系統調用來添加刪除和修改事件。code

下面根據源碼來走一遍:
asmlinkage long
sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
{
    /**一系列的數據結構的定義**/
    int error;
    struct file *file, *tfile;
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;


    error = -EFAULT;
    if (EP_OP_HASH_EVENT(op) &&
        copy_from_user(&epds, event, sizeof(struct epoll_event)))
        goto eexit_1;//把用戶的事件從用戶空間考到內核空間


    error = -EBADF;
    file = fget(epfd); //得到epoll內核事件表得文件描述符
    if (!file)
        goto eexit_1;

    tfile = fget(fd); //得到要操做的事件得文件描述符
    if (!tfile)
        goto eexit_2;


    error = -EPERM;
    if (!tfile->f_op || !tfile->f_op->poll)
        goto eexit_3; //若是fd文件描述符中f_op(指向全部文件操做指針得結構體)和這裏須要用到的poll操做爲空,就退出。由於poll是全部Io複用:select 、poll、epoll得底層實現。

    
    error = -EINVAL;
    if (file == tfile || !IS_FILE_EPOLL(file))
        goto eexit_3;   //判斷須要操做的fd文件描述符是否是epfd文件描述符和 file是否是epoll文件系統的文件

    
    ep = file->private_data;  //拿到內核事件表的eventpoll結構體

    down_write(&ep->sem); //爲寫得到讀寫內核事件表eventpoll的信號量。

    
    epi = ep_find(ep, tfile, fd);//判斷該事件在內核事件表中是否存在

    error = -EINVAL;
    switch (op) {
    case EPOLL_CTL_ADD:  //添加事件操做
        if (!epi) {
            epds.events |= POLLERR | POLLHUP;

            error = ep_insert(ep, &epds, tfile, fd);
        } else
            error = -EEXIST;
        break;
    case EPOLL_CTL_DEL: //刪除事件
        if (epi)
            error = ep_remove(ep, epi);
        else
            error = -ENOENT;
        break;
    case EPOLL_CTL_MOD:   //修改事件
        if (epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_modify(ep, epi, &epds);
        } else
            error = -ENOENT;
        break;
    }

    
    if (epi)
        ep_release_epitem(epi);

    up_write(&ep->sem); //釋放內核事件表eventpoll的讀寫信號量

eexit_3:
    fput(tfile);
eexit_2:
    fput(file);
eexit_1:
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_ctl(%d, %d, %d, %p) = %d\n",
             current, epfd, op, fd, event, error));

    return error;
}

在epoll_ctl中,主要分爲如下幾個步驟:索引

  • 判斷須要操做的事件在事件表中是否存在
  • 判斷須要進行的操做
  • 使用更底層的代碼對事件表進行增刪改

epoll比poll和select高效的地方,其實在這裏就能夠 看出來,poll的系統調用會把該進程「掛」到fd所對應的設備的等待隊列上,select和poll他們每一次都須要把current "掛"到fd對應設備的等待隊列,同一個fd來講,你要不停監聽的話,你就須要不停的進行poll,那麼fd太多的話,這樣的poll操做實際上是很低效的,epoll則不一樣的是,每個事件,它只在添加的時候調用poll,之後都不用去調用poll,避免了太多重複的poll操做,因此epoll對於poll和select來講是更高效的。
還有一個比較有趣的地方是,全部的事件的存儲都是在紅黑樹裏面,可是咱們能夠發現紅黑樹的節點實際上是這樣的:

struct rb_node
{
    /**
     * 紅黑樹節點的雙親。
     */
    struct rb_node *rb_parent;
    /**
     * 紅黑樹節點顏色。
     */
    int rb_color;
#define    RB_RED        0
#define    RB_BLACK    1
    /**
     * 右孩子
     */
    struct rb_node *rb_right;
    /**
     * 左孩子
     */
    struct rb_node *rb_left;
};

struct rb_root
{
    struct rb_node *rb_node;
};
不論是根節點仍是節點,你都看不到數據,那內核是怎麼經過紅黑樹操做數據?
struct epitem {
    
    struct rb_node rbn;
    ........
    }
其實這纔是真正的紅黑樹的節點,裏面的東西不少,可是這裏只列出來了相關的。一個fd對應一個這樣的結構,同一個fd只能插入一次,這也是要採用一個紅黑樹這樣的數據結構的其中一個緣由,還有一個緣由就是由於紅黑樹的查詢等操做效率高。那麼從rb_node到struct epitem就能夠很容易的作到了,只須要作一個指針的強轉就能夠從rb_node到epitem:
epic = rb_entry(parent, struct epitem, rbn);


#define    rb_entry(ptr, type, member) container_of(ptr, type, member)


#define container_of(ptr, type, member) ({            \
        const typeof( ((type *)0)->member ) *__mptr = (ptr);    \
        (type *)( (char *)__mptr - offsetof(type,member) );})
這也就跟C++的模板相似,只不過這是C語言實現的,不少方法仍是須要再從新定義。可是對C語言來講,在有限的條件下作到這樣的代碼複用仍是很是厲害的。

3.epoll_wait()

以前的兩個函數已經把事件添加到內核事件表,並且已經把當前進程「掛」到fd的全部設備上,這就至關於一個回調函數,當對應fd有可處理事件時,就會喚醒等待隊列的進程,進程會把當前可處理的事件及有關信息記錄到一個rdllist的鏈表中,接下來就是epoll_wait所要作的事了:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
           int maxevents, long timeout)
{
    /*定義一些必要的數據結構*/
    int res, eavail;
    unsigned long flags;
    long jtimeout;
    wait_queue_t wait;

    /*檢查超時時間是否有效*/
    jtimeout = timeout == -1 || timeout > (MAX_SCHEDULE_TIMEOUT - 1000) / HZ ?
        MAX_SCHEDULE_TIMEOUT: (timeout * HZ + 999) / 1000;

retry:
    /*得到eventpoll的寫鎖*/
    write_lock_irqsave(&ep->lock, flags);


    /*rdllist若是是空就阻塞循環等待回調函數往rdllist中寫數據,一旦不爲空或者超過超時時間就會退出循環*/
    res = 0;
    if (list_empty(&ep->rdllist)) {
        
        init_waitqueue_entry(&wait, current);
        add_wait_queue(&ep->wq, &wait);

        for (;;) {
        
            set_current_state(TASK_INTERRUPTIBLE);
            if (!list_empty(&ep->rdllist) || !jtimeout)
                break;
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }

            write_unlock_irqrestore(&ep->lock, flags);
            jtimeout = schedule_timeout(jtimeout);
            write_lock_irqsave(&ep->lock, flags);
        }
        remove_wait_queue(&ep->wq, &wait);

        set_current_state(TASK_RUNNING);
    }

    /*判斷rdllist是否爲空*/
    eavail = !list_empty(&ep->rdllist); 

    /*釋放eventpoll的寫鎖*/
    write_unlock_irqrestore(&ep->lock, flags);


    /*這一步是把內核的rdllist中的事件copy到用戶空間*/
    if (!res && eavail &&
        !(res = ep_events_transfer(ep, events, maxevents)) && jtimeout)
        goto retry;

    return res;
}

簡單總結一下epoll_wait:

  • 檢查超時事件和用來存就緒事件的用戶空間的大小
  • 循環等待就緒事件
  • 把就緒事件從內核空間copy到用戶空間

這就是epoll的整個流程和主要的步驟的分析。可是咱們沒能看出來它的ET和LT工做模式體如今哪裏?因此下面就來講說epoll的ET模式具體實現。

4.ET模式

ET模式是epoll特有的高效工做模式。主要體現就是一次就緒事件只給用戶提醒一次。ET模式的實現其實就是在epoll_wait的最後一步,從內核鏈表往用戶空間考數據的時候,下面來看代碼:

static int ep_events_transfer(struct eventpoll *ep,
                  struct epoll_event __user *events, int maxevents)
{
    int eventcnt = 0;
    struct list_head txlist;  //創建一個臨時量,用以保存就緒的事件

    INIT_LIST_HEAD(&txlist); //初始化

    
    down_read(&ep->sem);  //得到ep的讀寫信號量


    /*從ep->rdllist中copy到txlist*/
    if (ep_collect_ready_items(ep, &txlist, maxevents) > 0) {
        /*從txlist到用戶空間*/
        eventcnt = ep_send_events(ep, &txlist, events);

        /*從txlist再反過來copy給ep->rdllist,這一步是具體的ET實現*/
        ep_reinject_items(ep, &txlist);
    }

    up_read(&ep->sem); //釋放ep讀寫信號量

    return eventcnt;
}
下面的函數就是ET的具體實現,也是ET和LT的區別
static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist)
{
    int ricnt = 0, pwake = 0;
    unsigned long flags;
    struct epitem *epi;

    /*得到ep的讀寫鎖*/
    write_lock_irqsave(&ep->lock, flags);

    while (!list_empty(txlist)) {
        /*這一步跟以前講過的rb_node到epi的一步是同樣的*/
        epi = list_entry(txlist->next, struct epitem, txlink);
        
        /*初始化*/
        EP_LIST_DEL(&epi->txlink);

        /*
        1.內核事件表(紅黑樹)不爲空
        2.事件沒有設置ET工做模式
        3.就緒事件類型和監聽事件類型相同
        4.該事件的rdllink不爲空
        */
        if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) &&
            (epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)) {
        /*把剛剛臨時量txlist中的該事件繼續添加到rdllist中*/
            list_add_tail(&epi->rdllink, &ep->rdllist);
            ricnt++;
        }
    }

    if (ricnt) {
    
        if (waitqueue_active(&ep->wq))
            wake_up(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }

    /*釋放讀寫鎖*/
    write_unlock_irqrestore(&ep->lock, flags);


    if (pwake)
        ep_poll_safewake(&psw, &ep->poll_wait);
}
先說LT模式對於同一個就緒事件會重複提醒,從上面能夠看出來是由於它又把依舊就緒且未設置ET標誌的事件從新copy到了rdllist中,因此下一次epoll_wait仍是會把該事件返回給用戶。那麼ET這裏就很好解釋了,儘管該事件未處理完,可是你只要設置了ET標誌,我就不會再次把該事件返回給用戶。這就是ET的實現。
相關文章
相關標籤/搜索