select、poll、epoll之間的區別

  select,poll,epoll都是IO多路複用的機制。I/O多路複用就經過一種機制,能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。但select,poll,epoll本質上都是同步I/O,由於他們都須要在讀寫事件就緒後本身負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步I/O則無需本身負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。關於這三種IO多路複用的用法,前面三篇總結寫的很清楚,並用服務器回射echo程序進行了測試。鏈接以下所示:html

select:http://www.cnblogs.com/Anker/archive/2013/08/14/3258674.htmllinux

poll:http://www.cnblogs.com/Anker/archive/2013/08/15/3261006.html程序員

epoll:http://www.cnblogs.com/Anker/archive/2013/08/17/3263780.htmlweb

  今天對這三種IO多路複用進行對比,參考網上和書上面的資料,整理以下:數據庫

一、select實現編程

select的調用過程以下所示:數組

(1)使用copy_from_user從用戶空間拷貝fd_set到內核空間緩存

(2)註冊回調函數__pollwaittomcat

(3)遍歷全部fd,調用其對應的poll方法(對於socket,這個poll方法是sock_poll,sock_poll根據狀況會調用到tcp_poll,udp_poll或者datagram_poll)安全

(4)以tcp_poll爲例,其核心實現就是__pollwait,也就是上面註冊的回調函數。

(5)__pollwait的主要工做就是把current(當前進程)掛到設備的等待隊列中,不一樣的設備有不一樣的等待隊列,對於tcp_poll來講,其等待隊列是sk->sk_sleep(注意把進程掛到等待隊列中並不表明進程已經睡眠了)。在設備收到一條消息(網絡設備)或填寫完文件數據(磁盤設備)後,會喚醒設備等待隊列上睡眠的進程,這時current便被喚醒了。

(6)poll方法返回時會返回一個描述讀寫操做是否就緒的mask掩碼,根據這個mask掩碼給fd_set賦值。

(7)若是遍歷完全部的fd,尚未返回一個可讀寫的mask掩碼,則會調用schedule_timeout是調用select的進程(也就是current)進入睡眠。當設備驅動發生自身資源可讀寫後,會喚醒其等待隊列上睡眠的進程。若是超過必定的超時時間(schedule_timeout指定),仍是沒人喚醒,則調用select的進程會從新被喚醒得到CPU,進而從新遍歷fd,判斷有沒有就緒的fd。

(8)把fd_set從內核空間拷貝到用戶空間。

總結:

select的幾大缺點:

(1)每次調用select,都須要把fd集合從用戶態拷貝到內核態,這個開銷在fd不少時會很大

(2)同時每次調用select都須要在內核遍歷傳遞進來的全部fd,這個開銷在fd不少時也很大

(3)select支持的文件描述符數量過小了,默認是1024

2 poll實現

  poll的實現和select很是類似,只是描述fd集合的方式不一樣,poll使用pollfd結構而不是select的fd_set結構,其餘的都差很少。

關於select和poll的實現分析,能夠參考下面幾篇博文:

http://blog.csdn.net/lizhiguo0532/article/details/6568964#comments

http://blog.csdn.net/lizhiguo0532/article/details/6568968

http://blog.csdn.net/lizhiguo0532/article/details/6568969

http://www.ibm.com/developerworks/cn/linux/l-cn-edntwk/index.html?ca=drs-

 

三、epoll

  epoll既然是對select和poll的改進,就應該能避免上述的三個缺點。那epoll都是怎麼解決的呢?在此以前,咱們先看一下epoll和select和poll的調用接口上的不一樣,select和poll都只提供了一個函數——select或者poll函數。而epoll提供了三個函數,epoll_create,epoll_ctl和epoll_wait,epoll_create是建立一個epoll句柄;epoll_ctl是註冊要監聽的事件類型;epoll_wait則是等待事件的產生。

  對於第一個缺點,epoll的解決方案在epoll_ctl函數中。每次註冊新的事件到epoll句柄中時(在epoll_ctl中指定EPOLL_CTL_ADD),會把全部的fd拷貝進內核,而不是在epoll_wait的時候重複拷貝。epoll保證了每一個fd在整個過程當中只會拷貝一次。

  對於第二個缺點,epoll的解決方案不像select或poll同樣每次都把current輪流加入fd對應的設備等待隊列中,而只在epoll_ctl時把current掛一遍(這一遍必不可少)併爲每一個fd指定一個回調函數,當設備就緒,喚醒等待隊列上的等待者時,就會調用這個回調函數,而這個回調函數會把就緒的fd加入一個就緒鏈表)。epoll_wait的工做實際上就是在這個就緒鏈表中查看有沒有就緒的fd(利用schedule_timeout()實現睡一會,判斷一會的效果,和select實現中的第7步是相似的)。

  對於第三個缺點,epoll沒有這個限制,它所支持的FD上限是最大能夠打開文件的數目,這個數字通常遠大於2048,舉個例子,在1GB內存的機器上大約是10萬左右,具體數目能夠cat /proc/sys/fs/file-max察看,通常來講這個數目和系統內存關係很大。

總結:

(1)select,poll實現須要本身不斷輪詢全部fd集合,直到設備就緒,期間可能要睡眠和喚醒屢次交替。而epoll其實也須要調用epoll_wait不斷輪詢就緒鏈表,期間也可能屢次睡眠和喚醒交替,可是它是設備就緒時,調用回調函數,把就緒fd放入就緒鏈表中,並喚醒在epoll_wait中進入睡眠的進程。雖然都要睡眠和交替,可是select和poll在「醒着」的時候要遍歷整個fd集合,而epoll在「醒着」的時候只要判斷一下就緒鏈表是否爲空就好了,這節省了大量的CPU時間。這就是回調機制帶來的性能提高。

(2)select,poll每次調用都要把fd集合從用戶態往內核態拷貝一次,而且要把current往設備等待隊列中掛一次,而epoll只要一次拷貝,並且把current往等待隊列上掛也只掛一次(在epoll_wait的開始,注意這裏的等待隊列並非設備等待隊列,只是一個epoll內部定義的等待隊列)。這也能節省很多的開銷。

參考資料:

http://www.cnblogs.com/apprentice89/archive/2013/05/09/3070051.html

http://www.linuxidc.com/Linux/2012-05/59873p3.htm

http://xingyunbaijunwei.blog.163.com/blog/static/76538067201241685556302/

http://blog.csdn.net/kkxgx/article/details/7717125

 

 

 

select(poll)系統調用實現解析

      上層要能使用select()和poll()系統調用來監測某個設備文件描述符,那麼就必須實現這個設備驅動程序中struct file_operation結構體的poll函數,爲何?

由於這兩個系統調用最終都會調用驅動程序中的poll函數來初始化一個等待隊列項, 而後將其加入到驅動程序中的等待隊列頭,這樣就能夠在硬件可讀寫的時候wake up這個等待隊列頭,而後等待(能夠是多個)同一個硬件設備可讀寫事件的進程都將被喚醒。

(這個等待隊列頭能夠包含多個等待隊列項,這些不一樣的等待隊列項是由不一樣的應用程序調用select或者poll來監測同一個硬件設備的時候調用file_operation的poll函數初始化填充的)。

       下面就以select系統調用分析具體實現,源碼路徑:fs/select.c。

1、          select()系統調用代碼走讀

調用順序以下:sys_select() à core_sys_select() à do_select() à fop->poll()

SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,

fd_set __user *, exp, struct timeval __user *, tvp)

{

       struct timespec end_time, *to = NULL;

       struct timeval tv;

       int ret;

 

       if (tvp) {// 若是超時值非NULL

              if (copy_from_user(&tv, tvp, sizeof(tv)))   // 從用戶空間取數據到內核空間

                     return -EFAULT;

              to = &end_time;

              // 獲得timespec格式的將來超時時間

              if (poll_select_set_timeout(to,

                            tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),

                            (tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))

                     return -EINVAL;

       }

 

       ret = core_sys_select(n, inp, outp, exp, to);             // 關鍵函數

       ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);

       /*若是有超時值, 並拷貝離超時時刻還剩的時間到用戶空間的timeval中*/

       return ret;             // 返回就緒的文件描述符的個數

}

==================================================================

       core_sys_select()函數解析

int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,

                        fd_set __user *exp, struct timespec *end_time)

{

       fd_set_bits fds;

       /**

       typedef struct {

              unsigned long *in, *out, *ex;

              unsigned long *res_in, *res_out, *res_ex;

} fd_set_bits;

這個結構體中定義的全是指針,這些指針都是用來指向描述符集合的。

**/

       void *bits;

       int ret, max_fds;

       unsigned int size;

       struct fdtable *fdt;

       /* Allocate small arguments on the stack to save memory and be faster */

       long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];

       // 256/32 = 8, stack中分配的空間

       /**

       @ include/linux/poll.h

#define FRONTEND_STACK_ALLOC     256

#define SELECT_STACK_ALLOC    FRONTEND_STACK_ALLOC

       **/

       ret = -EINVAL;

       if (n < 0)

              goto out_nofds;

       /* max_fds can increase, so grab it once to avoid race */

       rcu_read_lock();

       fdt = files_fdtable(current->files); // RCU ref, 獲取當前進程的文件描述符表

       max_fds = fdt->max_fds;

       rcu_read_unlock();

       if (n > max_fds)// 若是傳入的n大於當前進程最大的文件描述符,給予修正

              n = max_fds;

       /*

        * We need 6 bitmaps (in/out/ex for both incoming and outgoing),

        * since we used fdset we need to allocate memory in units of

        * long-words.

        */

       size = FDS_BYTES(n);

       // 以一個文件描述符佔一bit來計算,傳遞進來的這些fd_set須要用掉多少個字

       bits = stack_fds;

       if (size > sizeof(stack_fds) / 6) {

              // 除6,爲何?由於每一個文件描述符須要6個bitmaps

              /* Not enough space in on-stack array; must use kmalloc */

              ret = -ENOMEM;

              bits = kmalloc(6 * size, GFP_KERNEL); // stack中分配的過小,直接kmalloc

              if (!bits)

                     goto out_nofds;

       }

       // 這裏就能夠明顯看出struct fd_set_bits結構體的用處了。

       fds.in      = bits;

       fds.out     = bits +   size;

       fds.ex      = bits + 2*size;

       fds.res_in  = bits + 3*size;

       fds.res_out = bits + 4*size;

       fds.res_ex  = bits + 5*size;

       // get_fd_set僅僅調用copy_from_user從用戶空間拷貝了fd_set

       if ((ret = get_fd_set(n, inp, fds.in)) ||

           (ret = get_fd_set(n, outp, fds.out)) ||

           (ret = get_fd_set(n, exp, fds.ex)))

              goto out;

       zero_fd_set(n, fds.res_in);  // 對這些存放返回狀態的字段清0

       zero_fd_set(n, fds.res_out);

       zero_fd_set(n, fds.res_ex);

 

       ret = do_select(n, &fds, end_time);    // 關鍵函數,完成主要的工做

       if (ret < 0)             // 有錯誤

              goto out;

       if (!ret) {              // 超時返回,無設備就緒

              ret = -ERESTARTNOHAND;

              if (signal_pending(current))

                     goto out;

              ret = 0;

       }

       // 把結果集,拷貝回用戶空間

       if (set_fd_set(n, inp, fds.res_in) ||

           set_fd_set(n, outp, fds.res_out) ||

           set_fd_set(n, exp, fds.res_ex))

              ret = -EFAULT;

out:

       if (bits != stack_fds)

              kfree(bits);     // 若是有申請空間,那麼釋放fds對應的空間

out_nofds:

       return ret;                    // 返回就緒的文件描述符的個數

}

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

do_select()函數解析:

int do_select(int n, fd_set_bits *fds, struct timespec *end_time)

{

       ktime_t expire, *to = NULL;

       struct poll_wqueues table;

       poll_table *wait;

       int retval, i, timed_out = 0;

       unsigned long slack = 0;

 

       rcu_read_lock();

       // 根據已經設置好的fd位圖檢查用戶打開的fd, 要求對應fd必須打開, 而且返回

// 最大的fd。

       retval = max_select_fd(n, fds);

       rcu_read_unlock();

       if (retval < 0)

              return retval;

       n = retval;

       // 一些重要的初始化:

       // poll_wqueues.poll_table.qproc函數指針初始化,該函數是驅動程序中poll函數實

       // 現中必需要調用的poll_wait()中使用的函數。

       poll_initwait(&table);

       wait = &table.pt;

       if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {

              wait = NULL;

              timed_out = 1;     // 若是系統調用帶進來的超時時間爲0,那麼設置

                                          // timed_out = 1,表示不阻塞,直接返回。

       }

       if (end_time && !timed_out)

              slack = estimate_accuracy(end_time); // 超時時間轉換

       retval = 0;

       for (;;) {

              unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;

              inp = fds->in; outp = fds->out; exp = fds->ex;

              rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

              // 全部n個fd的循環

              for (i = 0; i < n; ++rinp, ++routp, ++rexp) {

                     unsigned long in, out, ex, all_bits, bit = 1, mask, j;

                     unsigned long res_in = 0, res_out = 0, res_ex = 0;

                     const struct file_operations *f_op = NULL;

                     struct file *file = NULL;

                     // 先取出當前循環週期中的32個文件描述符對應的bitmaps

                     in = *inp++; out = *outp++; ex = *exp++;

                     all_bits = in | out | ex;  // 組合一下,有的fd可能只監測讀,或者寫,

// 或者e rr,或者同時都監測

                     if (all_bits == 0) {  // 這32個描述符沒有任何狀態被監測,就跳入

// 下一個32個fd的循環中

                            i += __NFDBITS; //每32個文件描述符一個循環,正好一個long型數

                            continue;

                     }

                     // 本次32個fd的循環中有須要監測的狀態存在

                     for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {// 初始bit = 1

                            int fput_needed;

                            if (i >= n)      // i用來檢測是否超出了最大待監測的fd

                                   break;

                            if (!(bit & all_bits))

                                   continue; // bit每次循環後左移一位的做用在這裏,用來

// 跳過沒有狀態監測的fd

                            file = fget_light(i, &fput_needed); // 獲得file結構指針,並增長

// 引用計數字段f_count

                            if (file) {        // 若是file存在

                                   f_op = file->f_op;

                                   mask = DEFAULT_POLLMASK;

                                   if (f_op && f_op->poll) {

                                          wait_key_set(wait, in, out, bit);// 設置當前fd待監測

//  的事件掩碼

                                          mask = (*f_op->poll)(file, wait);

                                          /*

                                                 調用驅動程序中的poll函數,以evdev驅動中的

evdev_poll()爲例該函數會調用函數poll_wait(file, &evdev->wait, wait),繼續調用__pollwait()回調來分配一個poll_table_entry結構體,該結構體有一個內嵌的等待隊列項,設置好wake時調用的回調函數後將其添加到驅動程序中的等待隊列頭中。

                                          */

                                   }

                                   fput_light(file, fput_needed);

                                   // 釋放file結構指針,實際就是減少他的一個引用

計數字段f_count。

                                   // mask是每個fop->poll()程序返回的設備狀態掩碼。

                                   if ((mask & POLLIN_SET) && (in & bit)) {

                                          res_in |= bit;         // fd對應的設備可讀

                                          retval++;

                                          wait = NULL;       // 後續有用,避免重複執行__pollwait()

                                   }

                                   if ((mask & POLLOUT_SET) && (out & bit)) {

                                          res_out |= bit;              // fd對應的設備可寫

                                          retval++;

                                          wait = NULL;

                                   }

                                   if ((mask & POLLEX_SET) && (ex & bit)) {

                                          res_ex |= bit;

                                          retval++;

                                          wait = NULL;

                                   }

                            }

                     }

                     // 根據poll的結果寫回到輸出位圖裏,返回給上級函數

                     if (res_in)

                            *rinp = res_in;

                     if (res_out)

                            *routp = res_out;

                     if (res_ex)

                            *rexp = res_ex;

                     /*

                            這裏的目的純粹是爲了增長一個搶佔點。

                            在支持搶佔式調度的內核中(定義了CONFIG_PREEMPT),

cond_resched是空操做。

                     */

                     cond_resched();

              }

              wait = NULL;  // 後續有用,避免重複執行__pollwait()

              if (retval || timed_out || signal_pending(current))

                     break;

              if (table.error) {

                     retval = table.error;

                     break;

              }

              /*跳出這個大循環的條件有: 有設備就緒或有異常(retval!=0), 超時(timed_out

              = 1), 或者有停止信號出現*/

              /*

               * If this is the first loop and we have a timeout

               * given, then we convert to ktime_t and set the to

               * pointer to the expiry value.

               */

              if (end_time && !to) {

                     expire = timespec_to_ktime(*end_time);

                     to = &expire;

              }

              // 第一次循環中,當前用戶進程從這裏進入休眠,

// 上面傳下來的超時時間只是爲了用在睡眠超時這裏而已

              // 超時,poll_schedule_timeout()返回0;被喚醒時返回-EINTR

              if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,

                                      to, slack))

                     timed_out = 1; /* 超時後,將其設置成1,方便後面退出循環返回到上層 */

       }

       // 清理各個驅動程序的等待隊列頭,同時釋放掉全部空出來

// 的page頁(poll_table_entry)

       poll_freewait(&table);

       return retval; // 返回就緒的文件描述符的個數

}

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

==================================================================

 

2、重要結構體之間關係

       比較重要的結構體由四個:struct poll_wqueues、struct poll_table_page、struct poll_table_entry、struct poll_table_struct,這小節重點討論前三個,後面一個留到後面小節。

      

2.一、結構體關係

每個調用select()系統調用的應用進程都會存在一個struct poll_weueues結構體,用來統一輔佐實現這個進程中全部待監測的fd的輪詢工做,後面全部的工做和都這個結構體有關,因此它很是重要。

struct poll_wqueues {

       poll_table pt;

       struct poll_table_page *table;

       struct task_struct *polling_task; //保存當前調用select的用戶進程struct task_struct結構體

       int triggered;         // 當前用戶進程被喚醒後置成1,以避免該進程接着進睡眠

       int error;               // 錯誤碼

       int inline_index;   // 數組inline_entries的引用下標

       struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];

};

實際上結構體poll_wqueues內嵌的poll_table_entry數組inline_entries[] 的大小是有限:

#define MAX_STACK_ALLOC 832

#define FRONTEND_STACK_ALLOC     256

#define WQUEUES_STACK_ALLOC     

(MAX_STACK_ALLOC - FRONTEND_STACK_ALLOC)

#define N_INLINE_POLL_ENTRIES      

(WQUEUES_STACK_ALLOC / sizeof(struct poll_table_entry))

若是空間不夠用,後續會動態申請物理內存頁以鏈表的形式掛載poll_wqueues.table上統一管理。接下來的兩個結構體就和這項內容密切相關:

struct poll_table_page { // 申請的物理頁都會將起始地址強制轉換成該結構體指針

       struct poll_table_page * next;     // 指向下一個申請的物理頁

       struct poll_table_entry * entry; // 指向entries[]中首個待分配(空的) poll_table_entry地址

       struct poll_table_entry entries[0]; // page頁後面剩餘的空間都是待分配的

//  poll_table_entry結構體

};

對每個fd調用fop->poll()  à  poll_wait()  à  __pollwait()都會先從poll_wqueues. inline_entries[]中分配一個poll_table_entry結構體,直到該數組用完纔會分配物理頁掛在鏈表指針poll_wqueues.table上而後纔會分配一個poll_table_entry結構體。具體用來作什麼?這裏先簡單說說,__pollwait()函數調用時須要3個參數,第一個是特定fd對應的file結構體指針,第二個就是特定fd對應的硬件驅動程序中的等待隊列頭指針,第3個是調用select()的應用進程中poll_wqueues結構體的poll_table項(該進程監測的全部fd調用fop->poll函數都用這一個poll_table結構體)。

struct poll_table_entry {

       struct file *filp;            // 指向特定fd對應的file結構體;

       unsigned long key;              // 等待特定fd對應硬件設備的事件掩碼,如POLLIN

//  POLLOUTPOLLERR;

       wait_queue_t wait;             // 表明調用select()的應用進程,等待在fd對應設備的特定事件

//  (讀或者寫)的等待隊列頭上,的等待隊列項;

       wait_queue_head_t *wait_address; // 設備驅動程序中特定事件的等待隊列頭;

};

       總結一下幾點:

1.               特定的硬件設備驅動程序的事件等待隊列頭是有限個數的,一般是有讀事件和寫事件的等待隊列頭;

2.               而一個調用了select()的應用進程只存在一個poll_wqueues結構體;

3.               該應用程序能夠有多個fd在進行同時監測其各自的事件發生,但該應用進程中每個fd有多少個poll_table_entry存在,那就取決於fd對應的驅動程序中有幾個事件等待隊列頭了,也就是說,一般驅動程序的poll函數中須要對每個事件的等待隊列頭調用poll_wait()函數。好比,若是有讀寫兩個等待隊列頭,那麼就在這個應用進程中存在兩個poll_table_entry結構體,在這兩個事件的等待隊列頭中分別將兩個等待隊列項加入;

4.               若是有多個應用進程使用selcet()方式同時在訪問同一個硬件設備,此時硬件驅動程序中加入等待隊列頭中的等待隊列項對每個應用程序來講都是相同數量的(一個事件等待隊列頭一個,數量取決於事件等待隊列頭的個數)。

 

2.二、注意項

對於第3點中,若是驅動程序中有多個事件等待隊列頭,那麼在這種狀況下,寫設備驅動程序時就要特別當心了,特別是設備有事件就緒而後喚醒等待隊列頭中全部應用進程的時候須要使用另外的宏,喚醒使用的宏和函數源碼見include/linux/wait.h:

在這以前看一看__pollwait()函數中填充poll_table_entry結構體時註冊的喚醒回調函數pollwake()。

static int pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key)

{

       struct poll_table_entry *entry;

 

       entry = container_of(wait, struct poll_table_entry, wait);

       // 取得poll_table_entry結構體指針

       if (key && !((unsigned long)key & entry->key))

       /*這裏的條件判斷相當重要,避免應用進程被誤喚醒,什麼意思?*/

              return 0;

       return __pollwake(wait, mode, sync, key);

}

到底什麼狀況下會出現誤喚醒呢?固然是有先決條件的。

驅動程序中存在多個事件的等待隊列頭,而且應用程序中只監測了該硬件的某幾項事件,好比,驅動中有讀寫等待隊裏頭,但應用程序中只有在監測讀事件的發生。這種狀況下,寫驅動程序時候,若是喚醒函數用法不當,就會引發誤喚醒的狀況。

先來看一看咱們熟知的一些喚醒函數吧!

#define wake_up(x)                    __wake_up(x, TASK_NORMAL, 1, NULL)

#define wake_up_interruptible(x)      __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL)

void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr, void *key);

注意到這個key了嗎?一般咱們調用喚醒函數時key爲NULL,很容易看出,若是咱們在這種狀況下,使用上面兩種喚醒函數,那麼上面紅色字體的判斷條件一直都會是假,那麼也就是說,只要設備的幾類事件之一有發生,無論應用程序中是否對其有監測,都會在這裏順利經過將應用程序喚醒,喚醒後,從新調用一遍fop->poll(注意:第一次和第二次調用該函數時少作了一件事,後面代碼詳解)函數,獲得設備事件掩碼。假如剛好在此次喚醒後的一輪調用fop->poll()函數的循環中,沒有其餘硬件設備就緒,那麼可想而知,從源碼上看,do_select()會直接返回0。

// mask是每個fop->poll()程序返回的設備狀態掩碼。

if ((mask & POLLIN_SET) && (in & bit)) {

       res_in |= bit;         // fd對應的設備可讀

       retval++;

       wait = NULL;              // 後續有用,避免重複執行__pollwait()

}

(in & bit)這個條件就是用來確認用戶程序有沒有讓你監測該事件的, 若是沒有retval仍然是0,基於前面的假設,那麼do_select()返回給上層的也是0。那又假如應用程序中調用select()的時候沒有傳入超時值,那豈不是和事實不相符合嗎?沒有傳遞超時值,那麼select()函數會一直阻塞直到至少有1個fd的狀態就緒。

因此在這種狀況下,設備驅動中喚醒函數須要用另外的一組:

#define wake_up_poll(x, m)                            /

       __wake_up(x, TASK_NORMAL, 1, (void *) (m))

#define wake_up_interruptible_poll(x, m)               /

       __wake_up(x, TASK_INTERRUPTIBLE, 1, (void *) (m))

這裏的m值,應該和設備發生的事件相符合。設置poll_table_entry結構體的key項的函數是:

#define POLLIN_SET

(POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR)

#define POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR)

#define POLLEX_SET (POLLPRI)

static inline void wait_key_set(poll_table *wait, unsigned long in,

                            unsigned long out, unsigned long bit)

{

       if (wait) {

              wait->key = POLLEX_SET;

              if (in & bit)

                     wait->key |= POLLIN_SET;

              if (out & bit)

                     wait->key |= POLLOUT_SET;

       }

}

這裏的m值,能夠參考上面的宏來設置,注意傳遞的不是key的指針,而就是其值自己,只不過在wake_up_poll()到pollwake()的傳遞過程當中是將其轉換成指針的。

       若是喚醒函數使用後面一組的話,再加上合理設置key值,我相信pollwake()函數中的if必定會嚴格把關,不讓應用程序沒有監測的事件喚醒應用進程,從而避免了發生誤喚醒

 

3、討論幾個細節

      

3.一、fop->poll()

       fop->poll()函數就是file_operations結構體中的poll函數指針項,該函數相信不少人都知道怎麼寫,網上大把的文章介紹其模板,可是爲何要那麼寫,並且它作了什麼具體的事情?本小節來揭開其神祕面紗,先貼一個模板上來。

       static unsigned int XXX_poll(struct file *filp, poll_table *wait)

{

    unsigned int mask = 0;

        struct XXX_dev *dev = filp->private_data;

    ...

    poll_wait(filp, &dev->r_wait, wait);

    poll_wait(filp ,&dev->w_wait, wait);

   

    if(...)//讀就緒

    {

          mask |= POLLIN | POLLRDNORM;

     }

    if(...)//寫就緒

    {

          mask |= POLLOUT | POLLRDNORM;

     }

    ..

    return mask;

}

       poll_wait()只因有wait字樣,常常給人誤會,覺得它會停在這裏等,也就是常說的阻塞。不過咱們反過來想一想,要是同一個應用進程同時監測多個fd,那麼沒一個fd調用xxx_poll的時候都阻塞在這裏,那和不使用select()又有何區別呢?都會阻塞在當個硬件上而耽誤了被的設備就緒事件的讀取。

       其實,這個poll_wait()函數所作的工做挺簡單,就是添加一個等待等待隊列項到poll_wait ()函數傳遞進去的第二個參數,其表明的是驅動程序中的特定事件的等待隊列頭。

       下面以字符設備evdev爲例,文件drivers/input/evdev.c。

       static unsigned int evdev_poll(struct file *file, poll_table *wait)

{

       struct evdev_client *client = file->private_data;

       struct evdev *evdev = client->evdev;

 

       poll_wait(file, &evdev->wait, wait);

       return ((client->head == client->tail) ? 0 : (POLLIN | POLLRDNORM)) |

              (evdev->exist ? 0 : (POLLHUP | POLLERR));

}

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address,

poll_table *p)

{

       if (p && wait_address)

              p->qproc(filp, wait_address, p);

}

其中wait_address是驅動程序須要提供的等待隊列頭,來容納後續等待該硬件設備就緒的進程對應的等待隊列項。關鍵結構體poll_table, 這個結構體名字也取的很差,什麼table?其實其中沒有table的一丁點概念,容易讓人誤解呀!

 

typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);

typedef struct poll_table_struct {

       poll_queue_proc qproc;

       unsigned long key;

} poll_table;

      fop->poll()函數的poll_table參數是從哪裏傳進來的?好生閱讀過代碼就能夠發現,do_select()函數中存在一個結構體struct poll_wqueues,其內嵌了一個poll_table的結構體,因此在後面的大循環中依次調用各個fd的fop->poll()傳遞的poll_table參數都是poll_wqueues.poll_table。

       poll_table結構體的定義其實蠻簡單,就一個函數指針,一個key值。這個函數指針在整個select過程當中一直不變,而key則會根據不一樣的fd的監測要求而變化。

      qproc函數初始化在函數do_select()àpoll_initwait()àinit_poll_funcptr(&pwq->pt, __pollwait)中實現,回調函數就是__pollwait()。

       int do_select(int n, fd_set_bits *fds, struct timespec *end_time)

{

       struct poll_wqueues table;

       …

       poll_initwait(&table);

       …

}

void poll_initwait(struct poll_wqueues *pwq)

{

       init_poll_funcptr(&pwq->pt, __pollwait);

       …

}

       static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)

{

       pt->qproc = qproc;

       pt->key   = ~0UL; /* all events enabled */

}

      

/* Add a new entry */

static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)

{

       struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);

       struct poll_table_entry *entry = poll_get_entry(pwq);

       if (!entry)

              return;

       get_file(filp);

       entry->filp = filp;         // 保存對應的file結構體

       entry->wait_address = wait_address;  // 保存來自設備驅動程序的等待隊列頭

       entry->key = p->key;   // 保存對該fd關心的事件掩碼

       init_waitqueue_func_entry(&entry->wait, pollwake);

       // 初始化等待隊列項,pollwake是喚醒該等待隊列項時候調用的函數

       entry->wait.private = pwq;

       // poll_wqueues做爲該等待隊列項的私有數據,後面使用

       add_wait_queue(wait_address, &entry->wait);

       // 將該等待隊列項添加到從驅動程序中傳遞過來的等待隊列頭中去。

}

該函數首先經過container_of宏來獲得結構體poll_wqueues的地址,而後調用poll_get_entry()函數來得到一個poll_table_entry結構體,這個結構體是用來鏈接驅動和應用進程的關鍵結構體,其實聯繫很簡單,這個結構體中內嵌了一個等待隊列項wait_queue_t,和一個等待隊列頭 wait_queue_head_t,它就是驅動程序中定義的等待隊列頭,應用進程就是在這裏保存了每個硬件設備驅動程序中的等待隊列頭(固然每個fd都有一個poll_table_entry結構體)。

很容易想到的是,若是這個設備在別的應用程序中也有使用,又剛好別的應用進程中也是用select()來訪問該硬件設備,那麼在另一個應用進程的同一個地方也會調用一樣的函數來初始化一個poll_table_entry結構體,而後將這個結構體中內嵌的等待隊列項添加到同一份驅動程序的等待隊列頭中。此後,若是設備就緒了,那麼驅動程序中將會喚醒這個對於等待隊列頭中全部的等待隊列項(也就是等待在該設備上的全部應用進程,全部等待的應用進程將會獲得同一份數據)。

上面紅色字體的語句保存了一個應用程序select一個fd的硬件設備時候的最全的信息,方便在設備就緒的時候容易獲得對應的數據。這裏的entry->key值就是爲了防止第二節中描述的誤喚醒而準備的。設置這個key值的地方在函數do_select()中。以下:

if (file) {

       f_op = file->f_op;

       mask = DEFAULT_POLLMASK;

       if (f_op && f_op->poll) {

              wait_key_set(wait, in, out, bit);  // 見第二節                                                                  mask = (*f_op->poll)(file, wait);

       }

}

 

fop->poll()函數的返回值都是有規定的,例如函數evdev_poll()中的返回值:

return ((client->head == client->tail) ? 0 : (POLLIN | POLLRDNORM)) |

              (evdev->exist ? 0 : (POLLHUP | POLLERR));

會根據驅動程序中特定的buffer隊列標誌,來返回設備狀態。這裏的判斷條件是讀循環buffer的頭尾指針是否相等:client->head == client->tail。

 

       3.二、poll_wait()函數在select()睡眠先後調用的差別

       static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address,

poll_table *p)

{

       if (p && wait_address)

              p->qproc(filp, wait_address, p);

}

這裏有一個if條件判斷,若是驅動程序中沒有提供等待隊列頭wait_address,那麼將不會往下執行p->qproc(__pollwait()),也就是不會將表明當前應用進程的等待隊列項添加進驅動程序中對應的等待隊列頭中。也就是說,若是應用程序剛好用select來監測這個fd的這個等待隊列頭對應的事件時,是永遠也得不到這個設備的就緒或者錯誤狀態的。

       若是select()中調用fop->poll()時傳遞進來的poll_table是NULL,一般狀況下,只要在應用層傳遞進來的超時時間結構體值不爲0,哪怕這個結構體指針你傳遞NULL,那麼在函數do_select()中第一次睡眠以前的那次全部fd的大循環中調用fop->poll()函數傳遞的poll_table是絕對不會爲NULL的,可是第一次睡眠喚醒以後的又一次全部fd的大循環中再次調用fop->poll()函數時,此時傳遞的poll_table是NULL,可想而知,這一次只是檢查fop->poll()的返回狀態值而已。最後若是從上層調用select時傳遞的超時值結構體賦值成0,那麼do_select()函數的只會調用一次全部fd的大循環,以後再也不進入睡眠,直接返回0給上層,基本上這種狀況是沒有獲得任何有用的狀態。

       爲了不應用進程被喚醒以後再次調用pollwait()的時候重複地調用函數__pollwait(),那麼在傳遞poll_table結構體指針的時候,在睡眠以前保證其爲有效地址,而在喚醒以後保證傳入的poll_table地址是NULL,由於在喚醒以後,再次調用fop->poll()的做用只是爲了再次檢查設備的事件狀態而已。具體詳見代碼。

      

       3.三、喚醒應用進程

       第二節中已經討論過驅動程序喚醒進程的一點注意項,但這裏再次介紹睡眠喚醒的整個流程。

       睡眠是調用函數poll_schedule_timeout()來實現:

       int poll_schedule_timeout(struct poll_wqueues *pwq, int state,

                       ktime_t *expires, unsigned long slack)

{

       int rc = -EINTR;

 

       set_current_state(state);

       if (!pwq->triggered)  // 這個triggered在何時被置1的呢?只要有一個fd

// 對應的設備將當前應用進程喚醒後將會把它設置成1

              rc = schedule_hrtimeout_range(expires, slack, HRTIMER_MODE_ABS);

       __set_current_state(TASK_RUNNING);

      

       set_mb(pwq->triggered, 0);

       return rc;

}

       喚醒的話會調用函數pollwake():

       static int pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key)

{

       struct poll_table_entry *entry;

 

       entry = container_of(wait, struct poll_table_entry, wait);

       if (key && !((unsigned long)key & entry->key))

              return 0;

       return __pollwake(wait, mode, sync, key);

}

       static int __pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key)

{

       struct poll_wqueues *pwq = wait->private;

       DECLARE_WAITQUEUE(dummy_wait, pwq->polling_task);

 

       /*

        * Although this function is called under waitqueue lock, LOCK

        * doesn't imply write barrier and the users expect write

        * barrier semantics on wakeup functions.  The following

        * smp_wmb() is equivalent to smp_wmb() in try_to_wake_up()

        * and is paired with set_mb() in poll_schedule_timeout.

        */

       smp_wmb();

       pwq->triggered = 1;

       // select()用戶進程只要有被喚醒過,就不可能再次進入睡眠,由於這個標誌在睡眠的時候有用

             

       return default_wake_function(&dummy_wait, mode, sync, key);

       // 默認通用的喚醒函數

}

      

參考網址:

1. http://blogold.chinaunix.net/u2/60011/showart_1334783.html

http://yuanbor.blog.163.com/blog/static/56674620201051134748647/

http://www.cnblogs.com/hanyan225/archive/2010/10/13/1850497.html

http://hi.baidu.com/operationsystem/blog/item/208eab9821da8f0e6f068cea.html

2. fs/select.c

drivers/input/evdev.c

include/linux/poll.h

include/linux/wait.h

kernel/wait.c

 

 

 

使用事件驅動模型實現高效穩定的網絡服務器程序

前言

事件驅動爲廣大的程序員所熟悉,其最爲人津津樂道的是在圖形化界面編程中的應用;事實上,在網絡編程中事件驅動也被普遍使用,並大規模部署在高鏈接數高吞吐量的服務器程序中,如 http 服務器程序、ftp 服務器程序等。相比於傳統的網絡編程方式,事件驅動可以極大的下降資源佔用,增大服務接待能力,並提升網絡傳輸效率。

關於本文說起的服務器模型,搜索網絡能夠查閱到不少的實現代碼,因此,本文將不拘泥於源代碼的陳列與分析,而側重模型的介紹和比較。使用 libev 事件驅動庫的服務器模型將給出實現代碼。

本文涉及到線程 / 時間圖例,只爲代表線程在各個 IO 上確實存在阻塞時延,但並不保證時延比例的正確性和 IO 執行前後的正確性;另外,本文所說起到的接口也只是筆者熟悉的 Unix/Linux 接口,並未推薦 Windows 接口,讀者能夠自行查閱對應的 Windows 接口。

阻塞型的網絡編程接口

幾乎全部的程序員第一次接觸到的網絡編程都是從 listen()、send()、recv() 等接口開始的。使用這些接口能夠很方便的構建服務器 / 客戶機的模型。

咱們假設但願創建一個簡單的服務器程序,實現向單個客戶機提供相似於「一問一答」的內容服務。

圖 1. 簡單的一問一答的服務器 / 客戶機模型

圖 1. 簡單的一問一答的服務器 / 客戶機模型

咱們注意到,大部分的 socket 接口都是阻塞型的。所謂阻塞型接口是指系統調用(通常是 IO 接口)不返回調用結果並讓當前線程一直阻塞,只有當該系統調用得到結果或者超時出錯時才返回。

實際上,除非特別指定,幾乎全部的 IO 接口 ( 包括 socket 接口 ) 都是阻塞型的。這給網絡編程帶來了一個很大的問題,如在調用 send() 的同時,線程將被阻塞,在此期間,線程將沒法執行任何運算或響應任何的網絡請求。這給多客戶機、多業務邏輯的網絡編程帶來了挑戰。這時,不少程序員可能會選擇多線程的方式來解決這個問題。

多線程的服務器程序

應對多客戶機的網絡應用,最簡單的解決方式是在服務器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每一個鏈接都擁有獨立的線程(或進程),這樣任何一個鏈接的阻塞都不會影響其餘的鏈接。

具體使用多進程仍是多線程,並無一個特定的模式。傳統意義上,進程的開銷要遠遠大於線程,因此,若是須要同時爲較多的客戶機提供服務,則不推薦使用多進程;若是單個服務執行體須要消耗較多的 CPU 資源,譬如須要進行大規模或長時間的數據運算或文件訪問,則進程較爲安全。一般,使用 pthread_create () 建立新線程,fork() 建立新進程。

咱們假設對上述的服務器 / 客戶機模型,提出更高的要求,即讓服務器同時爲多個客戶機提供一問一答的服務。因而有了以下的模型。

圖 2. 多線程的服務器模型

圖 2. 多線程的服務器模型

在上述的線程 / 時間圖例中,主線程持續等待客戶端的鏈接請求,若是有鏈接,則建立新線程,並在新線程中提供爲前例一樣的問答服務。

不少初學者可能不明白爲什麼一個 socket 能夠 accept 屢次。實際上,socket 的設計者可能特地爲多客戶機的狀況留下了伏筆,讓 accept() 可以返回一個新的 socket。下面是 accept 接口的原型:

int accept(int s, struct sockaddr *addr, socklen_t *addrlen);

輸入參數 s 是從 socket(),bind() 和 listen() 中沿用下來的 socket 句柄值。執行完 bind() 和 listen() 後,操做系統已經開始在指定的端口處監聽全部的鏈接請求,若是有請求,則將該鏈接請求加入請求隊列。調用 accept() 接口正是從 socket s 的請求隊列抽取第一個鏈接信息,建立一個與 s 同類的新的 socket 返回句柄。新的 socket 句柄便是後續 read() 和 recv() 的輸入參數。若是請求隊列當前沒有請求,則 accept() 將進入阻塞狀態直到有請求進入隊列。

上述多線程的服務器模型彷佛完美的解決了爲多個客戶機提供問答服務的要求,但其實並不盡然。若是要同時響應成百上千路的鏈接請求,則不管多線程仍是多進程都會嚴重佔據系統資源,下降系統對外界響應效率,而線程與進程自己也更容易進入假死狀態。

不少程序員可能會考慮使用「線程池」或「鏈接池」。「線程池」旨在減小建立和銷燬線程的頻率,其維持必定合理數量的線程,並讓空閒的線程從新承擔新的執行任務。「鏈接池」維持鏈接的緩存池,儘可能重用已有的鏈接、減小建立和關閉鏈接的頻率。這兩種技術均可以很好的下降系統開銷,都被普遍應用不少大型系統,如 websphere、tomcat 和各類數據庫等。

可是,「線程池」和「鏈接池」技術也只是在必定程度上緩解了頻繁調用 IO 接口帶來的資源佔用。並且,所謂「池」始終有其上限,當請求大大超過上限時,「池」構成的系統對外界的響應並不比沒有池的時候效果好多少。因此使用「池」必須考慮其面臨的響應規模,並根據響應規模調整「池」的大小。

對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,「線程池」或「鏈接池」或許能夠緩解部分壓力,可是不能解決全部問題。

總之,多線程模型能夠方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型並非最佳方案。下一章咱們將討論用非阻塞接口來嘗試解決這個問題。

非阻塞的服務器程序

以上面臨的不少問題,必定程度是 IO 接口的阻塞特性致使的。多線程是一個解決方案,還一個方案就是使用非阻塞的接口。

非阻塞的接口相比於阻塞型接口的顯著差別在於,在被調用以後當即返回。使用以下的函數能夠將某句柄 fd 設爲非阻塞狀態。

fcntl( fd, F_SETFL, O_NONBLOCK );

下面將給出只用一個線程,但可以同時從多個鏈接中檢測數據是否送達,而且接受數據。

圖 3. 使用非阻塞的接收數據模型

圖 3. 使用非阻塞的接收數據模型

在非阻塞狀態下,recv() 接口在被調用後當即返回,返回值表明了不一樣的含義。如在本例中,

  • recv() 返回值大於 0,表示接受數據完畢,返回值便是接受到的字節數;
  • recv() 返回 0,表示鏈接已經正常斷開;
  • recv() 返回 -1,且 errno 等於 EAGAIN,表示 recv 操做還沒執行完成;
  • recv() 返回 -1,且 errno 不等於 EAGAIN,表示 recv 操做遇到系統錯誤 errno。

能夠看到服務器線程能夠經過循環調用 recv() 接口,能夠在單個線程內實現對全部鏈接的數據接收工做。

可是上述模型毫不被推薦。由於,循環調用 recv() 將大幅度推高 CPU 佔用率;此外,在這個方案中,recv() 更多的是起到檢測「操做是否完成」的做用,實際操做系統提供了更爲高效的檢測「操做是否完成「做用的接口,例如 select()。

使用 select() 接口的基於事件驅動的服務器模型

大部分 Unix/Linux 都支持 select 函數,該函數用於探測多個文件句柄的狀態變化。下面給出 select 接口的原型:

FD_ZERO(int fd, fd_set* fds)
FD_SET(int fd, fd_set* fds)
FD_ISSET(int fd, fd_set* fds)
FD_CLR(int fd, fd_set* fds)
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,
        struct timeval *timeout)

這裏,fd_set 類型能夠簡單的理解爲按 bit 位標記句柄的隊列,例如要在某 fd_set 中標記一個值爲 16 的句柄,則該 fd_set 的第 16 個 bit 位被標記爲 1。具體的置位、驗證可以使用 FD_SET、FD_ISSET 等宏實現。在 select() 函數中,readfds、writefds 和 exceptfds 同時做爲輸入參數和輸出參數。若是輸入的 readfds 標記了 16 號句柄,則 select() 將檢測 16 號句柄是否可讀。在 select() 返回後,能夠經過檢查 readfds 有否標記 16 號句柄,來判斷該「可讀」事件是否發生。另外,用戶能夠設置 timeout 時間。

下面將從新模擬上例中從多個客戶端接收數據的模型。

圖 4. 使用 select() 的接收數據模型

圖 4. 使用 select() 的接收數據模型

上述模型只是描述了使用 select() 接口同時從多個客戶端接收數據的過程;因爲 select() 接口能夠同時對多個句柄進行讀狀態、寫狀態和錯誤狀態的探測,因此能夠很容易構建爲多個客戶端提供獨立問答服務的服務器系統。

圖 5. 使用 select() 接口的基於事件驅動的服務器模型

圖 5. 使用 select() 接口的基於事件驅動的服務器模型

這裏須要指出的是,客戶端的一個 connect() 操做,將在服務器端激發一個「可讀事件」,因此 select() 也能探測來自客戶端的 connect() 行爲。

上述模型中,最關鍵的地方是如何動態維護 select() 的三個參數 readfds、writefds 和 exceptfds。做爲輸入參數,readfds 應該標記全部的須要探測的「可讀事件」的句柄,其中永遠包括那個探測 connect() 的那個「母」句柄;同時,writefds 和 exceptfds 應該標記全部須要探測的「可寫事件」和「錯誤事件」的句柄 ( 使用 FD_SET() 標記 )。

做爲輸出參數,readfds、writefds 和 exceptfds 中的保存了 select() 捕捉到的全部事件的句柄值。程序員須要檢查的全部的標記位 ( 使用 FD_ISSET() 檢查 ),以肯定到底哪些句柄發生了事件。

上述模型主要模擬的是「一問一答」的服務流程,因此,若是 select() 發現某句柄捕捉到了「可讀事件」,服務器程序應及時作 recv() 操做,並根據接收到的數據準備好待發送數據,並將對應的句柄值加入 writefds,準備下一次的「可寫事件」的 select() 探測。一樣,若是 select() 發現某句柄捕捉到「可寫事件」,則程序應及時作 send() 操做,並準備好下一次的「可讀事件」探測準備。下圖描述的是上述模型中的一個執行週期。

圖 6. 一個執行週期

圖 6. 一個執行週期

這種模型的特徵在於每個執行週期都會探測一次或一組事件,一個特定的事件會觸發某個特定的響應。咱們能夠將這種模型歸類爲「事件驅動模型」。

相比其餘模型,使用 select() 的事件驅動模型只用單線程(進程)執行,佔用資源少,不消耗太多 CPU,同時可以爲多客戶端提供服務。若是試圖創建一個簡單的事件驅動的服務器程序,這個模型有必定的參考價值。

但這個模型依舊有着不少問題。

首先,select() 接口並非實現「事件驅動」的最好選擇。由於當須要探測的句柄值較大時,select() 接口自己須要消耗大量時間去輪詢各個句柄。不少操做系統提供了更爲高效的接口,如 linux 提供了 epoll,BSD 提供了 kqueue,Solaris 提供了 /dev/poll …。若是須要實現更高效的服務器程序,相似 epoll 這樣的接口更被推薦。遺憾的是不一樣的操做系統特供的 epoll 接口有很大差別,因此使用相似於 epoll 的接口實現具備較好跨平臺能力的服務器會比較困難。

其次,該模型將事件探測和事件響應夾雜在一塊兒,一旦事件響應的執行體龐大,則對整個模型是災難性的。以下例,龐大的執行體 1 的將直接致使響應事件 2 的執行體遲遲得不到執行,並在很大程度上下降了事件探測的及時性。

圖 7. 龐大的執行體對使用 select() 的事件驅動模型的影響

圖 7. 龐大的執行體對使用 select() 的事件驅動模型的影響

幸運的是,有不少高效的事件驅動庫能夠屏蔽上述的困難,常見的事件驅動庫有 libevent 庫,還有做爲 libevent 替代者的 libev 庫。這些庫會根據操做系統的特色選擇最合適的事件探測接口,而且加入了信號 (signal) 等技術以支持異步響應,這使得這些庫成爲構建事件驅動模型的不二選擇。下章將介紹如何使用 libev 庫替換 select 或 epoll 接口,實現高效穩定的服務器模型。

使用事件驅動庫 libev 的服務器模型

Libev 是一種高性能事件循環 / 事件驅動庫。做爲 libevent 的替代做品,其第一個版本發佈與 2007 年 11 月。Libev 的設計者聲稱 libev 擁有更快的速度,更小的體積,更多功能等優點,這些優點在不少測評中獲得了證實。正由於其良好的性能,不少系統開始使用 libev 庫。本章將介紹如何使用 Libev 實現提供問答服務的服務器。

(事實上,現存的事件循環 / 事件驅動庫有不少,做者也無心推薦讀者必定使用 libev 庫,而只是爲了說明事件驅動模型給網絡服務器編程帶來的便利和好處。大部分的事件驅動庫都有着與 libev 庫相相似的接口,只要明白大體的原理,便可靈活挑選合適的庫。)

與前章的模型相似,libev 一樣須要循環探測事件是否產生。Libev 的循環體用 ev_loop 結構來表達,並用 ev_loop( ) 來啓動。

void ev_loop( ev_loop* loop, int flags )

Libev 支持八種事件類型,其中包括 IO 事件。一個 IO 事件用 ev_io 來表徵,並用 ev_io_init() 函數來初始化:

void ev_io_init(ev_io *io, callback, int fd, int events)

初始化內容包括回調函數 callback,被探測的句柄 fd 和須要探測的事件,EV_READ 表「可讀事件」,EV_WRITE 表「可寫事件」。

如今,用戶須要作的僅僅是在合適的時候,將某些 ev_io 從 ev_loop 加入或剔除。一旦加入,下個循環即會檢查 ev_io 所指定的事件有否發生;若是該事件被探測到,則 ev_loop 會自動執行 ev_io 的回調函數 callback();若是 ev_io 被註銷,則再也不檢測對應事件。

不管某 ev_loop 啓動與否,均可以對其添加或刪除一個或多個 ev_io,添加刪除的接口是 ev_io_start() 和 ev_io_stop()。

void ev_io_start( ev_loop *loop, ev_io* io )
void ev_io_stop( EV_A_* )

由此,咱們能夠容易得出以下的「一問一答」的服務器模型。因爲沒有考慮服務器端主動終止鏈接機制,因此各個鏈接能夠維持任意時間,客戶端能夠自由選擇退出時機。

圖 8. 使用 libev 庫的服務器模型

圖 8. 使用 libev 庫的服務器模型

上述模型能夠接受任意多個鏈接,且爲各個鏈接提供徹底獨立的問答服務。藉助 libev 提供的事件循環 / 事件驅動接口,上述模型有機會具有其餘模型不能提供的高效率、低資源佔用、穩定性好和編寫簡單等特色。

因爲傳統的 web 服務器,ftp 服務器及其餘網絡應用程序都具備「一問一答」的通信邏輯,因此上述使用 libev 庫的「一問一答」模型對構建相似的服務器程序具備參考價值;另外,對於須要實現遠程監視或遠程遙控的應用程序,上述模型一樣提供了一個可行的實現方案。

總結

本文圍繞如何構建一個提供「一問一答」的服務器程序,前後討論了用阻塞型的 socket 接口實現的模型,使用多線程的模型,使用 select() 接口的基於事件驅動的服務器模型,直到使用 libev 事件驅動庫的服務器模型。文章對各類模型的優缺點都作了比較,從比較中得出結論,即便用「事件驅動模型」能夠的實現更爲高效穩定的服務器程序。文中描述的多種模型能夠爲讀者的網絡編程提供參考價值。

相關主題

 

 

 

select 實現分析 –2 【整理】

select相關的結構體

比較重要的結構體由四個:struct poll_wqueues、struct poll_table_page、struct poll_table_entry、struct poll_table_struct。

 

每個調用select()系統調用的應用進程都會存在一個struct poll_wqueues結構體,用來統一輔佐實現這個進程中全部待監測的fd的輪詢工做,後面全部的工做和都這個結構體有關,因此它很是重要。

struct poll_wqueues {

       poll_table pt;

       struct poll_table_page *table;

       struct task_struct *polling_task; //保存當前調用select的用戶進程struct task_struct結構體

       int triggered;            // 當前用戶進程被喚醒後置成1,以避免該進程接着進睡眠

       int error;                 // 錯誤碼

       int inline_index;        // 數組inline_entries的引用下標

       struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];

};

實際上結構體poll_wqueues內嵌的poll_table_entry數組inline_entries[] 的大小是有限的,若是空間不夠用,後續會動態申請物理內存頁以鏈表的形式掛載poll_wqueues.table上統一管理。接下來的兩個結構體就和這項內容密切相關:

struct poll_table_page { // 申請的物理頁都會將起始地址強制轉換成該結構體指針

       struct poll_table_page   *next;      // 指向下一個申請的物理頁

       struct poll_table_entry  *entry;     // 指向entries[]中首個待分配(空的) poll_table_entry地址

       struct poll_table_entry  entries[0]; // 該page頁後面剩餘的空間都是待分配的poll_table_entry結構體

};

 

對每個fd調用fop->poll() => poll_wait() => __pollwait()都會先從poll_wqueues.inline_entries[]中分配一個poll_table_entry結構體,直到該數組用完纔會分配物理頁掛在鏈表指針poll_wqueues.table上而後纔會分配一個poll_table_entry結構體(poll_get_entry函數)。

poll_table_entry具體用處:函數__pollwait聲明以下:

static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p);

 

該函數調用時須要3個參數,第一個是特定fd對應的file結構體指針,第二個就是特定fd對應的硬件驅動程序中的等待隊列頭指針,第3個是調用select()的應用進程中poll_wqueues結構體的poll_table項(該進程監測的全部fd調用fop->poll函數都用這一個poll_table結構體)。

struct poll_table_entry {

       struct file     *filp;                 // 指向特定fd對應的file結構體;

       unsigned long   key;                   // 等待特定fd對應硬件設備的事件掩碼,如POLLIN、 POLLOUT、POLLERR;

       wait_queue_t    wait;                  // 表明調用select()的應用進程,等待在fd對應設備的特定事件 (讀或者寫)的等待隊列頭上,的等待隊列項;

       wait_queue_head_t   *wait_address;     // 設備驅動程序中特定事件的等待隊列頭(該fd執行fop->poll,須要等待時在哪等,因此叫等待地址);

};

 

總結幾點:

  1. 特定的硬件設備驅動程序的事件等待隊列頭是有限個數的,一般是有讀事件和寫事件的等待隊列頭;
  2. 而一個調用了select()的應用進程只存在一個poll_wqueues結構體;
  3. 該應用程序能夠有多個fd在進行同時監測其各自的事件發生,但該應用進程中每個fd有多少個poll_table_entry存在,那就取決於fd對應的驅動程序中有幾個事件等待隊列頭了,也就是說,一般驅動程序的poll函數中須要對每個事件的等待隊列頭調用poll_wait()函數。好比,若是有讀寫兩個等待隊列頭,那麼就在這個應用進程中存在兩個poll_table_entry結構體,在這兩個事件的等待隊列頭中分別將兩個等待隊列項加入;
  4. 若是有多個應用進程使用select()方式同時在訪問同一個硬件設備,此時硬件驅動程序中加入等待隊列頭中的等待隊列項對每個應用程序來講都是相同數量的(一個事件等待隊列頭一個,數量取決於事件等待隊列頭的個數)。

 

do_select函數中,遍歷全部n個fd,對每個fd調用對應驅動程序中的poll函數。

驅動程序中poll通常具備以下形式:

static unsigned int XXX_poll(struct file *filp, poll_table *wait)

{

   unsigned int mask = 0;

   struct XXX_dev *dev = filp->private_data;

   ...

   poll_wait(filp, &dev->r_wait, wait);

   poll_wait(filp ,&dev->w_wait, wait);

   if(CAN_READ)//讀就緒

   {

      mask |= POLLIN | POLLRDNORM;

   }

   if(CAN_WRITE)//寫就緒

   {

      mask |= POLLOUT | POLLRDNORM;

   }

   ...

   return mask;

}

 

以字符設備evdev爲例(文件drivers/input/evdev.c)

static unsigned int evdev_poll(struct file *file, poll_table *wait)

{

       struct evdev_client *client = file->private_data;

       struct evdev *evdev = client->evdev;

       poll_wait(file, &evdev->wait, wait);

       return ((client->head == client->tail) ? 0 : (POLLIN | POLLRDNORM)) | (evdev->exist ? 0 : (POLLHUP | POLLERR));

}

 

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)

{

       if (p && wait_address)

              p->qproc(filp, wait_address, p);

}

 

其中wait_address是驅動程序須要提供的等待隊列頭,來容納後續等待該硬件設備就緒的進程對應的等待隊列項。

typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);

 

typedef struct poll_table_struct {

       poll_queue_proc qproc;

       unsigned long key;

} poll_table;

 

fop->poll()函數的poll_table參數是從哪裏傳進來的?好生閱讀過代碼就能夠發現,do_select()函數中存在一個結構體struct poll_wqueues,其內嵌了一個poll_table的結構體,因此在後面的大循環中依次調用各個fd的fop->poll()傳遞的poll_table參數都是poll_wqueues.poll_table。

poll_table結構體的定義其實蠻簡單,就一個函數指針,一個key值。這個函數指針在整個select過程當中一直不變,而key則會根據不一樣的fd的監測要求而變化。

qproc函數初始化在函數do_select() –> poll_initwait() -> init_poll_funcptr(&pwq->pt, __pollwait)中實現,回調函數就是__pollwait()。

 

int do_select(int n, fd_set_bits *fds, struct timespec *end_time)

{

       struct poll_wqueues table;

       ...

       poll_initwait(&table);

       ...

}

 

void poll_initwait(struct poll_wqueues *pwq)

{

       init_poll_funcptr(&pwq->pt, __pollwait);

       ...

}

 

static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)

{

       pt->qproc = qproc;

       pt->key   = ~0UL; /* all events enabled */

}

 

/* Add a new entry */

static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)

{

struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);

struct poll_table_entry *entry = poll_get_entry(pwq);

if (!entry)

return;

get_file(filp);

entry->filp = filp;      // 保存對應的file結構體

entry->wait_address = wait_address;  // 保存來自設備驅動程序的等待隊列頭

entry->key = p->key;  // 保存對該fd關心的事件掩碼

init_waitqueue_func_entry(&entry->wait, pollwake);// 初始化等待隊列項,pollwake是喚醒該等待隊列項時候調用的函數

entry->wait.private = pwq; // 將poll_wqueues做爲該等待隊列項的私有數據,後面使用

add_wait_queue(wait_address, &entry->wait);// 將該等待隊列項添加到從驅動程序中傳遞過來的等待隊列頭中去。

}

 

驅動程序在得知設備有IO事件時(一般是該設備上IO事件中斷),會調用wakeup,wakeup –> __wake_up_common -> curr->func(即pollwake)。

static int pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key)

{

       struct poll_table_entry *entry;

       entry = container_of(wait, struct poll_table_entry, wait);// 取得poll_table_entry結構體指針

       if (key && !((unsigned long)key & entry->key))/*這裏的條件判斷相當重要,避免應用進程被誤喚醒,什麼意思?*/

              return 0;

       return __pollwake(wait, mode, sync, key);

}

 

pollwake調用__pollwake,最終調用default_wake_function。

 

static int __pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key)

{

struct poll_wqueues *pwq = wait->private;

DECLARE_WAITQUEUE(dummy_wait, pwq->polling_task);

smp_wmb();

pwq->triggered = 1; // select()用戶進程只要有被喚醒過,就不可能再次進入睡眠,由於這個標誌在睡眠的時候有用

return default_wake_function(&dummy_wait, mode, sync, key); // 默認通用的喚醒函數

}

 

最終喚醒調用select的進程,在do_select函數的schedule_timeout函數以後繼續執行(繼續for(;;),也即重新檢查每個fd是否有事件發生),這次檢查會發現設備的該IO事件,因而select返回用戶層。

結合這兩節的內容,select的實現結構圖以下:

 

 

參考:

http://blog.csdn.net/lizhiguo0532/article/details/6568969

http://blog.csdn.net/lizhiguo0532/article/details/6568968

 

 

select,poll,epoll實現分析—結合內核源代碼

select,poll,epoll都是IO多路複用的機制。所謂I/O多路複用機制,就是說經過一種機制,能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。但select,poll,epoll本質上都是同步I/O,由於他們都須要在讀寫事件就緒後本身負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步I/O則無需本身負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。關於阻塞,非阻塞,同步,異步將在下一篇文章詳細說明。

select和poll的實現比較類似,目前也有不少爲人詬病的缺點,epoll能夠說是select和poll的加強版。

1、select實現

一、使用copy_from_user從用戶空間拷貝fd_set到內核空間

二、註冊回調函數__pollwait

三、遍歷全部fd,調用其對應的poll方法(對於socket,這個poll方法是sock_poll,sock_poll根據狀況會調用到tcp_poll,udp_poll或者datagram_poll)

四、以tcp_poll爲例,其核心實現就是__pollwait,也就是上面註冊的回調函數。

五、__pollwait的主要工做就是把current(當前進程)掛到設備的等待隊列中,不一樣的設備有不一樣的等待隊列,對於tcp_poll來講,其等待隊列是sk->sk_sleep(注意把進程掛到等待隊列中並不表明進程已經睡眠了)。在設備收到一條消息(網絡設備)或填寫完文件數據(磁盤設備)後,會喚醒設備等待隊列上睡眠的進程,這時current便被喚醒了。

六、poll方法返回時會返回一個描述讀寫操做是否就緒的mask掩碼,根據這個mask掩碼給fd_set賦值。

七、若是遍歷完全部的fd,尚未返回一個可讀寫的mask掩碼,則會調用schedule_timeout是調用select的進程(也就是current)進入睡眠。當設備驅動發生自身資源可讀寫後,會喚醒其等待隊列上睡眠的進程。若是超過必定的超時時間(schedule_timeout指定),仍是沒人喚醒,則調用select的進程會從新被喚醒得到CPU,進而從新遍歷fd,判斷有沒有就緒的fd。

八、把fd_set從內核空間拷貝到用戶空間。

總結:

select的幾大缺點:

(1)每次調用select,都須要把fd集合從用戶態拷貝到內核態,這個開銷在fd不少時會很大

(2)同時每次調用select都須要在內核遍歷傳遞進來的全部fd,這個開銷在fd不少時也很大

(3)select支持的文件描述符數量過小了,默認是1024

2、poll實現

poll的實現和select很是類似,只是描述fd集合的方式不一樣,poll使用pollfd結構而不是select的fd_set結構。其餘的都差很少。

 

3、epoll實現

epoll既然是對select和poll的改進,就應該能避免上述的三個缺點。那epoll都是怎麼解決的呢?在此以前,咱們先看一下epoll和select和poll的調用接口上的不一樣,select和poll都只提供了一個函數——select或者poll函數。而epoll提供了三個函數,epoll_create,epoll_ctl和epoll_wait,epoll_create是建立一個epoll句柄;epoll_ctl是註冊要監聽的事件類型;epoll_wait則是等待事件的產生。

對於第一個缺點,epoll的解決方案在epoll_ctl函數中。每次註冊新的事件到epoll句柄中時(在epoll_ctl中指定EPOLL_CTL_ADD),會把全部的fd拷貝進內核,而不是在epoll_wait的時候重複拷貝。epoll保證了每一個fd在整個過程當中只會拷貝一次。

對於第二個缺點,epoll的解決方案不像select或poll同樣每次都把current輪流加入fd對應的設備等待隊列中,而只在epoll_ctl時把current掛一遍(這一遍必不可少)併爲每一個fd指定一個回調函數,當設備就緒,喚醒等待隊列上的等待者時,就會調用這個回調函數,而這個回調函數會把就緒的fd加入一個就緒鏈表)。epoll_wait的工做實際上就是在這個就緒鏈表中查看有沒有就緒的fd(利用schedule_timeout()實現睡一會,判斷一會的效果,和select實現中的第7步是相似的)。

說明一下這個回調機制的原理,其實很簡單,看一下select和epoll在把current加入fd對應的設備等待隊列時使用的代碼:

select:

  1. static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,  
  2.                 poll_table *p)  
  3. {  
  4.     struct poll_table_entry *entry = poll_get_entry(p);  
  5.     if (!entry)  
  6.         return;  
  7.     get_file(filp);  
  8.     entry->filp = filp;  
  9.     entry->wait_address = wait_address;  
  10.     init_waitqueue_entry(&entry->wait, current);  
  11.     add_wait_queue(wait_address, &entry->wait);  
  12. }  

其中init_waitqueue_entry實現以下:

  1. static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)  
  2. {  
  3.     q->flags = 0;  
  4.     q->private = p;  
  5.     q->func = default_wake_function;  
  6. }  

上面的代碼是說創建一個poll_table_entry結構entry,首先把current設置爲entry->wait的private成員,同時把default_wake_function設爲entry->wait的func成員,而後把entry->wait鏈入到wait_address中(這個wait_address就是設備的等待隊列,在tcp_poll中就是sk_sleep)。

再看一下epoll:

  1. /* 
  2.  * This is the callback that is used to add our wait queue to the 
  3.  * target file wakeup lists. 
  4.  */  
  5. static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,  
  6.                  poll_table *pt)  
  7. {  
  8.     struct epitem *epi = ep_item_from_epqueue(pt);  
  9.     struct eppoll_entry *pwq;  
  10.   
  11.     if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {  
  12.         init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);  
  13.         pwq->whead = whead;  
  14.         pwq->base = epi;  
  15.         add_wait_queue(whead, &pwq->wait);  
  16.         list_add_tail(&pwq->llink, &epi->pwqlist);  
  17.         epi->nwait++;  
  18.     } else {  
  19.         /* We have to signal that an error occurred */  
  20.         epi->nwait = -1;  
  21.     }  
  22. }  

其中init_waitqueue_func_entry的實現以下:

  1. static inline void init_waitqueue_func_entry(wait_queue_t *q,  
  2.                     wait_queue_func_t func)  
  3. {  
  4.     q->flags = 0;  
  5.     q->private = NULL;  
  6.     q->func = func;  

能夠看到,整體和select的實現是相似的,只不過它是建立了一個eppoll_entry結構pwq,只不過pwq->wait的func成員被設置成了回調函數ep_poll_callback(而不是default_wake_function,因此這裏並不會有喚醒操做,而只是執行回調函數),private成員被設置成了NULL。最後吧pwq->wait鏈入到whead中(也就是設備等待隊列中)。這樣,當設備等待隊列中的進程被喚醒時,就會調用ep_poll_callback了。

再梳理一下,當epoll_wait時,它會判斷就緒鏈表中有沒有就緒的fd,若是沒有,則把current進程加入一個等待隊列(file->private_data->wq)中,並在一個while(1)循環中判斷就緒隊列是否爲空,並結合schedule_timeout實現睡一會,判斷一會的效果。若是current進程在睡眠中,設備就緒了,就會調用回調函數。在回調函數中,會把就緒的fd放入就緒鏈表,並喚醒等待隊列(file->private_data->wq)中的current進程,這樣epoll_wait又能繼續執行下去了。

對於第三個缺點,epoll沒有這個限制,它所支持的FD上限是最大能夠打開文件的數目,這個數字通常遠大於2048,舉個例子,在1GB內存的機器上大約是10萬左右,具體數目能夠cat /proc/sys/fs/file-max察看,通常來講這個數目和系統內存關係很大。

總結:

一、select,poll實現須要本身不斷輪詢全部fd集合,直到設備就緒,期間可能要睡眠和喚醒屢次交替。而epoll其實也須要調用epoll_wait不斷輪詢就緒鏈表,期間也可能屢次睡眠和喚醒交替,可是它是設備就緒時,調用回調函數,把就緒fd放入就緒鏈表中,並喚醒在epoll_wait中進入睡眠的進程。雖然都要睡眠和交替,可是select和poll在「醒着」的時候要遍歷整個fd集合,而epoll在「醒着」的時候只要判斷一下就緒鏈表是否爲空就好了,這節省了大量的CPU時間。這就是回調機制帶來的性能提高。

二、select,poll每次調用都要把fd集合從用戶態往內核態拷貝一次,而且要把current往設備等待隊列中掛一次,而epoll只要一次拷貝,並且把current往等待隊列上掛也只掛一次(在epoll_wait的開始,注意這裏的等待隊列並非設備等待隊列,只是一個epoll內部定義的等待隊列)。這也能節省很多的開銷。

 

 

select,poll,epoll比較

select,poll,epoll簡介

 

select

 select本質上是經過設置或者檢查存放fd標誌位的數據結構來進行下一步處理。這樣所帶來的缺點是:

1 單個進程可監視的fd數量被限制

2 須要維護一個用來存放大量fd的數據結構,這樣會使得用戶空間和內核空間在傳遞該結構時複製開銷大

3 對socket進行掃描時是線性掃描

 

poll

poll本質上和select沒有區別,它將用戶傳入的數組拷貝到內核空間,而後查詢每一個fd對應的設備狀態,若是設備就緒則在設備等待隊列中加入一項並繼續遍歷,若是遍歷完全部fd後沒有發現就緒設備,則掛起當前進程,直到設備就緒或者主動超時,被喚醒後它又要再次遍歷fd。這個過程經歷了屢次無謂的遍歷。

它沒有最大鏈接數的限制,緣由是它是基於鏈表來存儲的,可是一樣有一個缺點:大量的fd的數組被總體複製於用戶態和內核地址空間之間,而無論這樣的複製是否是有意義。

poll還有一個特色是「水平觸發」,若是報告了fd後,沒有被處理,那麼下次poll時會再次報告該fd。

 epoll  epoll支持水平觸發和邊緣觸發,最大的特色在於邊緣觸發,它只告訴進程哪些fd剛剛變爲就需態,而且只會通知一次。

在前面說到的複製問題上,epoll使用mmap減小複製開銷。

還有一個特色是,epoll使用「事件」的就緒通知方式,經過epoll_ctl註冊fd,一旦該fd就緒,內核就會採用相似callback的回調機制來激活該fd,epoll_wait即可以收到通知

 

1 支持一個進程所能打開的最大鏈接數

 

 select  單個進程所能打開的最大鏈接數有FD_SETSIZE宏定義,其大小是32個整數的大小(在32位的機器上,大小就是32*32,同理64位機器上 FD_SETSIZE爲32*64),固然咱們能夠對它進行修改,而後從新編譯內核,可是性能可能會受到影響,這須要進一步的測試。
 poll  poll本質上和select沒有區別,可是它沒有最大鏈接數的限制,緣由是它是基於鏈表來存儲的
 

epoll

 雖然鏈接數有上限,可是很大,1G內存的機器上能夠打開10萬左右的鏈接,2G內存的機器能夠打開20萬左右的鏈接。

 

2 FD劇增後帶來的IO效率問題

 

 select

 由於每次調用時都會對鏈接進行線性遍歷,因此隨着FD的增長會形成遍歷速度慢的「線性降低性能問題」。
 poll  同上
 epoll  由於epoll內核中實現是根據每一個fd上的callback函數來實現的,只有活躍的socket纔會主動調用callback,因此在活躍socket較少的狀況下,使用epoll沒有前面二者的線性降低的性能問題,可是全部socket都很活躍的狀況下,可能會有性能問題。

   

3 消息傳遞方式

 

 select  內核須要將消息傳遞到用戶空間,都須要內核拷貝動做
 poll  同上
 epoll  epoll經過內核和用戶空間共享一塊內存來實現的

   

綜上,在選擇select,poll,epoll時要根據具體的使用場合以及這三種方式的自身特色。表面上看epoll的性能最好,可是在鏈接數少而且鏈接都十分活躍的狀況下,select和poll的性能可能比epoll好,畢竟epoll的通知機制須要不少函數回調。

 

 

select、poll、epoll使用小結

Linux上可使用不一樣的I/O模型,咱們能夠經過下圖瞭解經常使用的I/O模型:同步和異步模型,以及阻塞和非阻塞模型,本文主要分析其中的異步阻塞模型。


1、select使用

這個模型中配置的是非阻塞I/O,而後使用阻塞select系統調用來肯定一個I/O描述符什麼時候有操做。使用select調用能夠爲多個描述符提供通知,對於每一個提示符,咱們能夠請求描述符的可寫,可讀以及是否發生錯誤。異步阻塞I/O的系統流程以下圖所示:


使用select經常使用的幾個函數以下:

  1. FD_ZERO(int fd, fd_set* fds)   
  2. FD_SET(int fd, fd_set* fds)   
  3. FD_ISSET(int fd, fd_set* fds)   
  4. FD_CLR(int fd, fd_set* fds)   
  5. int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)   

fd_set類型能夠簡單的理解爲按bit位標記句柄的隊列。具體的置位、驗證可使用FD_SET,FD_ISSET等宏實現。在select函數中,readfds、writefds和exceptfds同時做爲輸入參數和輸出參數,若是readfds標記了一個位置,則,select將檢測到該標記位可讀。timeout爲設置的超時時間。

下面咱們來看如何使用select:

  1. SOCKADDR_IN addrSrv;  
  2. int reuse = 1;  
  3. SOCKET sockSrv,connsock;  
  4. SOCKADDR_IN addrClient;  
  5. pool pool;  
  6. int len=sizeof(SOCKADDR);  
  7. /*建立TCP*/  
  8. sockSrv=socket(AF_INET,SOCK_STREAM,0);  
  9. /*地址、端口的綁定*/  
  10.   
  11. addrSrv.sin_addr.S_un.S_addr=htonl(INADDR_ANY);  
  12. addrSrv.sin_family=AF_INET;  
  13. addrSrv.sin_port=htons(port);  
  14.   
  15. if(bind(sockSrv,(SOCKADDR*)&addrSrv,sizeof(SOCKADDR))<0)  
  16. {  
  17.     fprintf(stderr,"Failed to bind");  
  18.     return ;  
  19. }  
  20.   
  21. if(listen(sockSrv,5)<0)  
  22. {  
  23.     fprintf(stderr,"Failed to listen socket");  
  24.     return ;  
  25. }  
  26. setsockopt(sockSrv,SOL_SOCKET,SO_REUSEADDR,(const char*)&reuse,sizeof(reuse));  
  27. init_pool(sockSrv,&pool);  
  28. while(1)  
  29. {  
  30.     /*經過selete設置爲異步模式*/  
  31.     pool.ready_set=pool.read_set;  
  32.     pool.nready=select(pool.maxfd+1,&pool.ready_set,NULL,NULL,NULL);  
  33.     if(FD_ISSET(sockSrv,&pool.ready_set))  
  34.     {  
  35.         connsock=accept(sockSrv,(SOCKADDR *)&addrClient,&len);  
  36.         //loadDeal()/*鏈接處理*/  
  37.         //printf("test\n");  
  38.         add_client(connsock,&pool);//添加到鏈接池  
  39.     }  
  40.     /*檢查是否有事件發生*/  
  41.     check_client(&pool);  
  42. }  

上面是一個服務器代碼的關鍵部分,設置爲異步的模式,而後接受到鏈接將其添加到鏈接池中。監聽描述符上使用select,接受客戶端的鏈接請求,在check_client函數中,遍歷鏈接池中的描述符,檢查是否有事件發生。





2、poll使用

poll函數相似於select,可是其調用形式不一樣。poll不是爲每一個條件構造一個描述符集,而是構造一個pollfd結構體數組,每一個數組元素指定一個描述符標號及其所關心的條件。定義以下:

  1. #include <sys/poll.h>  
  2. int poll (struct pollfd *fds, unsigned int nfds, int timeout);  
  3. struct pollfd {  
  4. int fd; /* file descriptor */  
  5. short events; /* requested events to watch */  
  6. short revents; /* returned events witnessed */  
  7. };  

每一個結構體的events域是由用戶來設置,告訴內核咱們關注的是什麼,而revents域是返回時內核設置的,以說明對該描述符發生了什麼事件。這點與select不一樣,select修改其參數以指示哪個描述符準備好了。在《unix環境高級編程》中有一張events取值的表,以下:

POLLIN :可讀除高優級外的數據,不阻塞

POLLRDNORM:可讀普通數據,不阻塞

POLLRDBAND:可讀O優先數據,不阻塞

POLLPRI:可讀高優先數據,不阻塞

POLLOUT :可寫普數據,不阻塞

POLLWRNORM:與POLLOUT相同

POLLWRBAND:寫非0優先數據,不阻塞

其次revents還有下面取值

POLLERR :已出錯

POLLHUP:已掛起,當以描述符被掛起後,就不能再寫向該描述符,可是仍能夠從該描述符讀取到數據。

POLLNVAL:此描述符並不引用一打開文件

對poll函數,nfds表示fds中的元素數,timeout爲超時設置,單位爲毫秒若爲0,表示不等待,爲-1表示描述符中一個已經準備好或捕捉到一個信號返回,大於0表示描述符準備好,或超時返回。函數返回值返回值若爲0,表示沒有事件發生,-1表示錯誤,並設置errno,大於0表示有幾個描述符有事件。

poll的使用和select基本相似。在此再也不介紹。poll相對因而select的優點是監聽的描述符數量沒有限制。

3、epoll學習

epoll有兩種模式,Edge Triggered(簡稱ET) 和 Level Triggered(簡稱LT).在採用這兩種模式時要注意的是,若是採用ET模式,那麼僅當狀態發生變化時纔會通知,而採用LT模式相似於原來的select/poll操做,只要還有沒有處理的事件就會一直通知.

1)epoll數據結構介紹:

  1. typedef union epoll_data  
  2. {  
  3.   void        *ptr;  
  4.   int          fd;  
  5.   __uint32_t   u32;  
  6.   __uint64_t   u64;  
  7. } epoll_data_t;  
  8.   
  9. struct epoll_event  
  10. {  
  11.   __uint32_t   events; /* Epoll events */  
  12.   epoll_data_t data;   /* User data variable */  
  13. };  

常見的事件以下:

EPOLLIN:表示對描述符的能夠讀

EPOLLOUT:表示對描述符的能夠寫

EPOLLPRI:表示對描述符的有緊急數據能夠讀

EPOLLERR:發生錯誤

EPOLLHUP:掛起

EPOLLET:邊緣觸發

EPOLLONESHOT:一次性使用,當監聽完此次事件以後,若是還須要繼續監聽這個socket的話,須要再次把這個socket加入到EPOLL隊列裏

2)函數介紹

epoll的三個函數

  1. int epoll_creae(int size);  

功能:該函數生成一個epoll專用的文件描述符

參數:size爲epoll上能關注的最大描述符數

  1. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  

功能:用於控制某個epoll文件描述符時間,能夠註冊、修改、刪除

參數:epfd由epoll_create生成的epoll專用描述符

    op操做:EPOLL_CTL_ADD 註冊   EPOLL_CTL_MOD修改  EPOLL_DEL刪除

            fd:關聯的文件描述符

    evnet告訴內核要監聽什麼事件

  1. int epoll_wait(int epfd,struct epoll_event*events,int maxevents,int timeout);  

功能:該函數等待i/o事件的發生。

參數:epfd要檢測的句柄

    events:用於回傳待處理時間的數組

    maxevents:告訴內核這個events有多大,不能超過以前的size

    timeout:爲超時時間

使用方法參考:https://banu.com/blog/2/how-to-use-epoll-a-complete-example-in-c/epoll-example.c


epoll支持的FD上限是最大能夠打開文件的數目(select面臨這樣的問題),IO效率不隨FD數目增長而線性降低(select、poll面臨的問題)使用mmap加速內核與用戶空間的消息傳遞。如今libevent封裝了幾種的實現,能夠經過使用libevent來實現多路複用。

 本文參考:https://banu.com/blog/2/how-to-use-epoll-a-complete-example-in-c/

   http://www.ibm.com/developerworks/cn/linux/l-cn-edntwk/index.html?ca=drs-

             http://www.ibm.com/developerworks/cn/linux/l-async/

相關文章
相關標籤/搜索