linux 下 epoll 編程 Linux epoll模型

  轉載自 Linux epoll模型 ,這篇文章講的很是詳細!html

  

定義:編程

  epoll是Linux內核爲處理大批句柄而做改進的poll,是Linux下多路複用IO接口select/poll的加強版本,它能顯著的減小程序在大量併發鏈接中只有少許活躍的狀況下的系統CPU利用率。由於它會複用文件描述符集合來傳遞結果而不是迫使開發者每次等待事件以前都必須從新準備要被偵聽的文件描述符集合,另外一個緣由就是獲取事件的時候,它無須遍歷整個被偵聽的描述符集,只要遍歷那些被內核IO事件異步喚醒而加入Ready隊列的描述符集合就好了。epoll除了提供select\poll那種IO事件的電平觸發(Level Triggered)外,還提供了邊沿觸發(Edge Triggered),這就使得用戶空間程序有可能緩存IO狀態,減小epoll_wait/epoll_pwait的調用,提供應用程序的效率。緩存

工做方式:服務器

  LT(level triggered):水平觸發,缺省方式,同時支持block和no-block socket,在這種作法中,內核告訴咱們一個文件描述符是否被就緒了,若是就緒了,你就能夠對這個就緒的fd進行IO操做。若是你不做任何操做,內核仍是會繼續通知你的,因此,這種模式編程出錯的可能性較小。傳統的select\poll都是這種模型的表明。併發

  ET(edge-triggered):邊沿觸發,高速工做方式,只支持no-block socket。在這種模式下,當描述符從未就緒變爲就緒狀態時,內核經過epoll告訴你。而後它會假設你知道文件描述符已經就緒,而且不會再爲那個描述符發送更多的就緒通知,直到你作了某些操做致使那個文件描述符再也不爲就緒狀態了(好比:你在發送、接受或者接受請求,或者發送接受的數據少於必定量時致使了一個EWOULDBLOCK錯誤)。可是請注意,若是一直不對這個fs作IO操做(從而致使它再次變成未就緒狀態),內核不會發送更多的通知。異步

  區別:LT事件不會丟棄,而是隻要讀buffer裏面有數據可讓用戶讀取,則不斷的通知你。而ET則只在事件發生之時通知。socket

使用方式:svn

  一、int epoll_create(int size)函數

建立一個epoll句柄,參數size用來告訴內核監聽的數目。高併發

  二、int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

epoll事件註冊函數,

  參數epfd爲epoll的句柄;

  參數op表示動做,用3個宏來表示:EPOLL_CTL_ADD(註冊新的fd到epfd),EPOLL_CTL_MOD(修改已經註冊的fd的監聽事件),EPOLL_CTL_DEL(從epfd刪除一個fd);

  參數fd爲須要監聽的標示符;

  參數event告訴內核須要監聽的事件,event的結構以下:

struct epoll_event {
  __uint32_t events;  /* Epoll events */
  epoll_data_t data;  /* User data variable */
};

  其中events能夠用如下幾個宏的集合:

  EPOLLIN :表示對應的文件描述符能夠讀(包括對端SOCKET正常關閉)

  EPOLLOUT:表示對應的文件描述符能夠寫

  EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這裏應該表示有帶外數據到來)

  EPOLLERR:表示對應的文件描述符發生錯誤

  EPOLLHUP:表示對應的文件描述符被掛斷;

  EPOLLET: 將EPOLL設爲邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來講的

  EPOLLONESHOT:只監聽一次事件,當監聽完此次事件以後,若是還須要繼續監聽這個socket的話,須要再次把這個socket加入到EPOLL隊列裏

三、 int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)

  等待事件的產生,相似於select()調用。參數events用來從內核獲得事件的集合,maxevents告以內核這個events有多大,這個maxevents的值不能大於建立epoll_create()時的size,參數timeout是超時時間(毫秒,0會當即返回,-1將不肯定,也有說法說是永久阻塞)。該函數返回須要處理的事件數目,如返回0表示已超時。

應用舉例:

  下面,我引用google code中別人寫的一個簡單程序來進行說明。svn路徑:http://sechat.googlecode.com/svn/trunk/

  該程序一個簡單的聊天室程序,用Linux C++寫的,服務器主要是用epoll模型實現,支持高併發,我測試在有10000個客戶端鏈接服務器的時候,server處理時間不到1秒,固然客戶端只是與服務器鏈接以後,接受服務器的歡迎消息而已,並無作其餘的通訊。雖然程序比較簡單,可是在咱們考慮服務器高併發時也提供了一個思路。在這個程序中,我已經把全部的調試信息和一些與epoll無關的信息幹掉,並添加必要的註釋,應該很容易理解。

  程序共包含2個頭文件和3個cpp文件。其中3個cpp文件中,每個cpp文件都是一個應用程序,server.cpp:服務器程序,client.cpp:單個客戶端程序,tester.cpp:模擬高併發,開啓10000個客戶端去連服務器。

  utils.h頭文件,就包含一個設置socket爲不阻塞函數,以下:

int setnonblocking(int sockfd)
{
    CHK(fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK));
    return 0;
}

  local.h頭文件,一些常量的定義和函數的聲明,以下:

 1 #include <sys/socket.h>
 2 #include <netinet/in.h>
 3 #include <stdio.h>
 4 #include <error.h>
 5 #include <errno.h>
 6 #include <unistd.h>
 7 #include <string.h>
 8 #include <stdlib.h>
 9 #include <sys/wait.h>
10 #include <limits.h>
11 #include <poll.h>
12 #include <sys/stropts.h>
13 #include <signal.h>
14 #include <fcntl.h>
15 #include <sys/epoll.h>
16 #include <time.h>
17 #include <list>
18 #include <arpa/inet.h>
19 #define BUF_SIZE 1024                 //默認緩衝區
20 #define SERVER_PORT 8888             //監聽端口
21 #define SERVER_HOST "127.0.0.1"   //服務器IP地址
22 #define EPOLL_RUN_TIMEOUT -1          //epoll的超時時間
23 #define EPOLL_SIZE 1000              //epoll監聽的客戶端的最大數目
24 
25 #define STR_WELCOME "Welcome to seChat! You ID is: Client #%d"
26 #define STR_MESSAGE "Client #%d>> %s"
27 #define STR_NOONE_CONNECTED "Noone connected to server except you!"
28 #define CMD_EXIT "EXIT"
29 
30 //兩個有用的宏定義:檢查和賦值而且檢測
31 #define CHK(eval) if(eval < 0){perror("eval"); exit(-1);}
32 #define CHK2(res, eval) if((res = eval) < 0){perror("eval"); exit(-1);}
33 
34 //================================================================================================
35 //函數名:                  setnonblocking
36 //函數描述:                設置socket爲不阻塞
37 //輸入:                    [in] sockfd socket標示符
38 //輸出:                    無
39 //返回:                    0
40 //================================================================================================
41 int setnonblocking(int sockfd);
42 
43 //================================================================================================
44 //函數名:                  handle_message
45 //函數描述:                處理每一個客戶端socket
46 //輸入:                    [in] new_fd socket標示符
47 //輸出:                    無
48 //返回:                    返回從客戶端接受的數據的長度
49 //================================================================================================
50 int handle_message(int new_fd);

  server.cpp文件,epoll模型就在這裏實現,以下:

  1 #include "local.h"
  2 #include "utils.h"
  3 
  4 using namespace std;
  5 
  6 // 存放客戶端socket描述符的list
  7 list<int> clients_list;
  8 
  9 int main(int argc, char *argv[])
 10 {
 11     int listener;   //監聽socket
 12     struct sockaddr_in addr, their_addr;  
 13     addr.sin_family = PF_INET;
 14     addr.sin_port = htons(SERVER_PORT);
 15     addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
 16     socklen_t socklen;
 17     socklen = sizeof(struct sockaddr_in);
 18 
 19     static struct epoll_event ev, events[EPOLL_SIZE];
 20     ev.events = EPOLLIN | EPOLLET;     //對讀感興趣,邊沿觸發
 21 
 22     char message[BUF_SIZE];
 23 
 24     int epfd;  //epoll描述符
 25     clock_t tStart;  //計算程序運行時間
 26 
 27     int client, res, epoll_events_count;
 28 
 29     CHK2(listener, socket(PF_INET, SOCK_STREAM, 0));             //初始化監聽socket
 30     setnonblocking(listener);                                    //設置監聽socket爲不阻塞
 31     CHK(bind(listener, (struct sockaddr *)&addr, sizeof(addr))); //綁定監聽socket
 32     CHK(listen(listener, 1));                                    //設置監聽
 33 
 34     CHK2(epfd,epoll_create(EPOLL_SIZE));                         //建立一個epoll描述符,並將監聽socket加入epoll
 35     ev.data.fd = listener;
 36     CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &ev));
 37 
 38     while(1)
 39     {
 40         CHK2(epoll_events_count,epoll_wait(epfd, events, EPOLL_SIZE, EPOLL_RUN_TIMEOUT));
 41         tStart = clock();
 42         for(int i = 0; i < epoll_events_count ; i++)
 43         {
 44             if(events[i].data.fd == listener)                    //新的鏈接到來,將鏈接添加到epoll中,併發送歡迎消息
 45             {
 46                 CHK2(client,accept(listener, (struct sockaddr *) &their_addr, &socklen));
 47                 setnonblocking(client);
 48                 ev.data.fd = client;
 49                 CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev));
 50 
 51                 clients_list.push_back(client);                  // 添加新的客戶端到list
 52                 bzero(message, BUF_SIZE);
 53                 res = sprintf(message, STR_WELCOME, client);
 54                 CHK2(res, send(client, message, BUF_SIZE, 0));
 55 
 56             }else 
 57             {
 58                 CHK2(res,handle_message(events[i].data.fd)); //注意:這裏並無調用epoll_ctl從新設置socket的事件類型,但仍是能夠繼續收到客戶端發送過來的信息
 59             }
 60         }
 61         printf("Statistics: %d events handled at: %.2f second(s)\n", epoll_events_count, (double)(clock() - tStart)/CLOCKS_PER_SEC);
 62     }
 63 
 64     close(listener);
 65     close(epfd);
 66 
 67     return 0;
 68 }
 69 
 70 int handle_message(int client)  
 71 {
 72     char buf[BUF_SIZE], message[BUF_SIZE];
 73     bzero(buf, BUF_SIZE);
 74     bzero(message, BUF_SIZE);
 75 
 76     int len;
 77 
 78     CHK2(len,recv(client, buf, BUF_SIZE, 0));  //接受客戶端信息
 79 
 80     if(len == 0)   //客戶端關閉或出錯,關閉socket,並從list移除socket
 81     {
 82         CHK(close(client));
 83         clients_list.remove(client);
 84     }
 85     else          //向客戶端發送信息
 86     { 
 87         if(clients_list.size() == 1) 
 88         { 
 89             CHK(send(client, STR_NOONE_CONNECTED, strlen(STR_NOONE_CONNECTED), 0));
 90                 return len;
 91         }
 92         
 93         sprintf(message, STR_MESSAGE, client, buf);
 94         list<int>::iterator it;
 95         for(it = clients_list.begin(); it != clients_list.end(); it++)
 96         {
 97            if(*it != client)
 98            { 
 99                 CHK(send(*it, message, BUF_SIZE, 0));
100            }
101         }
102     }
103 
104     return len;
105 }

 

  tester.cpp文件,模擬服務器的高併發,開啓10000個客戶端去鏈接服務器,以下:

 1 #include "local.h"
 2 #include "utils.h"
 3 
 4 using namespace std;
 5 
 6 char message[BUF_SIZE];     //接受服務器信息
 7 list<int> list_of_clients;  //存放全部客戶端
 8 int res;
 9 clock_t tStart;
10 
11 int main(int argc, char *argv[])
12 {
13     int sock; 
14     struct sockaddr_in addr;
15     addr.sin_family = PF_INET;
16     addr.sin_port = htons(SERVER_PORT);
17     addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
18     
19     tStart = clock();
20 
21     for(int i=0 ; i<EPOLL_SIZE; i++)  //生成EPOLL_SIZE個客戶端,這裏是10000個,模擬高併發
22     {
23        CHK2(sock,socket(PF_INET, SOCK_STREAM, 0));
24        CHK(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0);
25        list_of_clients.push_back(sock);
26 
27        bzero(&message, BUF_SIZE);
28        CHK2(res,recv(sock, message, BUF_SIZE, 0));
29        printf("%s\n", message);
30     }
31    
32     list<int>::iterator it;          //移除全部客戶端
33     for(it = list_of_clients.begin(); it != list_of_clients.end() ; it++)
34        close(*it);
35 
36     printf("Test passed at: %.2f second(s)\n", (double)(clock() - tStart)/CLOCKS_PER_SEC); 
37     printf("Total server connections was: %d\n", EPOLL_SIZE);
38     
39     return 0;
40 }

 

  我就不給出程序的執行結果的截圖了,不過下面這張截圖是代碼做者本身測試的,能夠看出,併發10000無壓力呀 

  單個客戶端去鏈接服務器,client.cpp文件,以下:

  1 #include "local.h"
  2 #include "utils.h"
  3 
  4 using namespace std;
  5 
  6 char message[BUF_SIZE];
  7 
  8 /*
  9     流程:
 10         調用fork產生兩個進程,兩個進程經過管道進行通訊
 11         子進程:等待客戶輸入,並將客戶輸入的信息經過管道寫給父進程
 12         父進程:接受服務器的信息並顯示,將從子進程接受到的信息發送給服務器
 13 */
 14 int main(int argc, char *argv[])
 15 {
 16     int sock, pid, pipe_fd[2], epfd;
 17 
 18     struct sockaddr_in addr;
 19     addr.sin_family = PF_INET;
 20     addr.sin_port = htons(SERVER_PORT);
 21     addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
 22 
 23     static struct epoll_event ev, events[2]; 
 24     ev.events = EPOLLIN | EPOLLET;
 25 
 26     //退出標誌
 27     int continue_to_work = 1;
 28 
 29     CHK2(sock,socket(PF_INET, SOCK_STREAM, 0));
 30     CHK(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0);
 31 
 32     CHK(pipe(pipe_fd));
 33     
 34     CHK2(epfd,epoll_create(EPOLL_SIZE));
 35 
 36     ev.data.fd = sock;
 37     CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev));
 38 
 39     ev.data.fd = pipe_fd[0];
 40     CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd[0], &ev));
 41 
 42     // 調用fork產生兩個進程
 43     CHK2(pid,fork());
 44     switch(pid)
 45     {
 46         case 0:                   // 子進程
 47             close(pipe_fd[0]);    // 關閉讀端
 48             printf("Enter 'exit' to exit\n");
 49             while(continue_to_work)
 50             {
 51                 bzero(&message, BUF_SIZE);
 52                 fgets(message, BUF_SIZE, stdin);
 53 
 54                 // 當收到exit命令時,退出
 55                 if(strncasecmp(message, CMD_EXIT, strlen(CMD_EXIT)) == 0)
 56                 {
 57                     continue_to_work = 0;
 58                 }
 59                 else
 60                 {            
 61                     CHK(write(pipe_fd[1], message, strlen(message) - 1));
 62                 }
 63             }
 64             break;
 65         default:                 // 父進程
 66             close(pipe_fd[1]);   // 關閉寫端
 67             int epoll_events_count, res;
 68             while(continue_to_work) 
 69             {
 70                 CHK2(epoll_events_count,epoll_wait(epfd, events, 2, EPOLL_RUN_TIMEOUT));
 71 
 72                 for(int i = 0; i < epoll_events_count ; i++)
 73                 {
 74                     bzero(&message, BUF_SIZE);
 75                     if(events[i].data.fd == sock)   //從服務器接受信息
 76                     {
 77                         CHK2(res,recv(sock, message, BUF_SIZE, 0));
 78                         if(res == 0)               //服務器已關閉
 79                         {
 80                             CHK(close(sock));
 81                             continue_to_work = 0;
 82                         }
 83                         else 
 84                         {
 85                             printf("%s\n", message);
 86                         }
 87                     }
 88                     else        //從子進程接受信息
 89                     {
 90                         CHK2(res, read(events[i].data.fd, message, BUF_SIZE));
 91                         if(res == 0)
 92                         {
 93                             continue_to_work = 0; 
 94                         }
 95                         else
 96                         {
 97                             CHK(send(sock, message, BUF_SIZE, 0));
 98                         }
 99                     }
100                 }
101             }
102     }
103     if(pid)
104     {
105         close(pipe_fd[0]);
106         close(sock);
107     }else
108     {
109         close(pipe_fd[1]);
110     }
111 
112     return 0;
113 }
相關文章
相關標籤/搜索