IO多路複用之epoll總結

一、基本知識html

  epoll是在2.6內核中提出的(mac沒有),是以前的select和poll的加強版本。相對於select和poll來講,epoll更加靈活,沒有描述符限制。epoll使用一個文件描述符管理多個描述符,將用戶關係的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy 只需一次。linux

==>內核事件表??
服務器

二、epoll接口併發

  epoll操做過程須要三個接口,分別以下:llinux 系統 $man  epoll 查看是否支持epoll
curl

#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

(1) int epoll_create(int size);
   建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大。這個參數不一樣於select()中的第一個參數,給出最大監聽的fd+1的值。須要注意的是,當建立好epoll句柄後,它就是會佔用一個fd值,在linux下若是查看/proc/進程id/fd/,是可以看到這個fd的,所 以在使用完epoll後,必須調用close()關閉,不然可能致使fd被耗盡。socket

    ==》 cd /proc/進程id/fd         //fd 目錄存儲進程佔用的文件描述符tcp

2)int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  epoll的事件註冊函數,它不一樣與select()是在監聽事件時告訴內核要監聽什麼類型的事件epoll的事件註冊函數,它不一樣與select()是監聽事件時告訴內核要監聽什麼類型的事件,而是在這裏先註冊要監聽的事件類型。第一個參數是epoll_create()的返回值,函數

第二個參數表示動做,用三個宏來表示:
EPOLL_CTL_ADD:註冊新的fd到epfd中;
EPOLL_CTL_MOD:修改已經註冊的fd的監聽事件;
EPOLL_CTL_DEL:從epfd中刪除一個fd;
第三個參數是須要監聽的fd(文件,socket, STDIN_FILENO....)
測試

第四個參數是告訴內核須要監聽什麼事,struct epoll_event結構以下:ui

typedef union epoll_data{
  void        *ptr;
  int          fd;
  __uint32_t   u32;
  __uint64_t   u64;
} epoll_data_t;

struct epoll_event {
  __uint32_t events;  /* Epoll events */
  epoll_data_t data;  /* User data variable */
};

events能夠是如下幾個宏的集合:
EPOLLIN :表示對應的文件描述符能夠讀(包括對端SOCKET正常關閉);
EPOLLOUT:表示對應的文件描述符能夠寫;
EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這裏應該表示有帶外數據到來);
EPOLLERR:表示對應的文件描述符發生錯誤;
EPOLLHUP:表示對應的文件描述符被掛斷;
EPOLLET將EPOLL設爲邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來講的。
EPOLLONESHOT:只監聽一次事件,當監聽完此次事件以後,若是還須要繼續監聽這個socket的話,須要再次把這個socket加入到EPOLL隊列裏

struct epoll_event ev;  // init a  event
ev.events = EPOLLIN;
ev.data.fd = fd;

3) int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
   等待事件的產生,相似於select()調用。參數events用來從內核獲得事件的集合,maxevents告以內核這個events有多大,這個 maxevents的值不能大於建立epoll_create()時的size,參數timeout是超時時間(毫秒,0會當即返回,-1將不肯定,也有 說法說是永久阻塞)。該函數返回須要處理的事件數目,如返回0表示已超時。

三、工做模式

  epoll對文件描述符的操做有兩種模式:LT(level trigger)和ET(edge trigger)。LT模式是默認模式,LT模式與ET模式的區別以下:

  LT模式:當epoll_wait檢測到描述符事件發生並將此事件通知應用程序,應用程序能夠不當即處理該事件。下次調用epoll_wait時,會再次響應應用程序並通知此事件。

  ET模式:當epoll_wait檢測到描述符事件發生並將此事件通知應用程序,應用程序必須當即處理該事件。若是不處理,下次調用epoll_wait時,不會再次響應應用程序並通知此事件。

  ET模式在很大程度上減小了epoll事件被重複觸發的次數,所以效率要比LT模式高。epoll工做在ET模式的時候,必須使用非阻塞套接口,以免因爲一個文件句柄的阻塞讀/阻塞寫操做把處理多個文件描述符的任務餓死。

四、測試程序

  編寫一個服務器回射程序echo,練習epoll過程。

服務器代碼以下所示:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>

#define IPADDRESS   "127.0.0.1"
#define PORT        8787

#define MAXSIZE     1024   //read  maxsize
#define LISTENQ     5      //

#define FDSIZE      1000   
#define EPOLLEVENTS 100    

//綁定socket
static int socket_bind(const char* ip, int port);
//IO多路服用處理epoll
static void do_epoll(int listenfd);

//處理響應的事件
static void handle_events(int epollfd,struct epoll_event* events, int num,int listenfd, char* buf);

//讀、寫、監聽事件
static void  handle_accept(int epollfd, int listenfd);
static void do_read(int epollfd, int listenfd, char* buf);
static void do_write(int epollfd, int listenfd, char* buf);

//註冊監聽fd
static void add_event(int epollfd, int fd, int state);
static void modify_event(int epollfd, int fd, int state);
static void delete_event(int epollfd, int fd, int state);


// change epoll contains  differerents listenfd
int main(int argc, char** argv)
{
    int listenfd;
    listenfd = socket_bind(IPADDRESS, PORT);
    listen(listenfd, LISTENQ);

    do_epoll(listenfd);

    return 0;
}

static int socket_bind(const char* ip, int port)
{
    int listenfd;
    struct sockaddr_in servaddr;
    listenfd = socket(AF_INET, SOCK_STREAM,0);
    if(listenfd == -1)
    {
       perror("socket error.\n");
       exit(0);
    }
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    inet_pton(AF_INET, ip,&servaddr.sin_addr);
    servaddr.sin_port = htons(port);
    
    if(bind(listenfd, (struct sockaddr*)&servaddr,sizeof(servaddr)) == -1)
    {
        perror("bind error.\n");
        exit(1);
    }
    return listenfd;
}

static void do_epoll(int listenfd)
{
     int epollfd;
     struct epoll_event events[EPOLLEVENTS];
     int ret;
     char buf[MAXSIZE];
     memset(buf,0,MAXSIZE);

     epollfd = epoll_create(FDSIZE);
     add_event(epollfd, listenfd, EPOLLIN);  //本地端口監聽時間,可讀

     for(;;)
     {
         ret = epoll_wait(epollfd, events,EPOLLEVENTS,-1);
         handle_events(epollfd, events,ret, listenfd,buf);
     }
     close(epollfd);    //關閉epollfd,
}
static void 
handle_events(int epollfd, struct epoll_event* events, int num, int listenfd, char* buf)
{
     int i;
     int fd;
     for( i=0; i<num; i++)
     {
        fd = events[i].data.fd;
        if( (fd == listenfd) && (events[i].events & EPOLLIN))  //處理接受連接事件
              handle_accept(epollfd, listenfd);
        else if (events[i].events & EPOLLIN)
             do_read(epollfd, fd, buf);
        else if (events[i].events & EPOLLOUT)
             do_write(epollfd, fd, buf);        //echo
     }
    
}
static void handle_accept(int epollfd, int listenfd)
{
   int clifd;
   struct sockaddr_in cliaddr;
   socklen_t cliaddrlen;


      //若是不初始化,可能出現arguent error
     bzero(&cliaddr,sizeof(cliaddr));
     cliaddrlen = sizeof(cliaddr);

   clifd = accept(listenfd, (struct sockaddr*)&cliaddr, &cliaddrlen);
   if(clifd == -1)
       perror("accept error.\n");
   else
   {
       printf("accept a new client:%s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
       add_event(epollfd, clifd, EPOLLIN);   //從cliaddr端口, 可讀
   }
}
static void do_read(int epollfd, int fd, char* buf)
{
    int nread;
    nread = read(fd, buf, MAXSIZE);
    if(nread == -1)
    {
       perror("read error.\n");
       close(fd);
       delete_event(epollfd,fd, EPOLLIN);
    }
    else if (nread == 0)
    {
       fprintf(stderr,"client close.\n");
       close(fd);
       delete_event(epollfd, fd, EPOLLIN);
    }
    else
    {
        printf("read message is: %s",buf);
        modify_event(epollfd, fd, EPOLLOUT);   //實現echo處理
    }
}
static void do_write(int epollfd, int fd, char* buf)
{
    int nwrite;
    nwrite = write(fd, buf, strlen(buf));
    if(nwrite == -1)
    {
       perror("write error:");
       close(fd);
       delete_event(epollfd, fd, EPOLLOUT);
    }
    else
    {
        modify_event(epollfd, fd, EPOLLIN);
        memset(buf, 0, MAXSIZE);  //寫完清空
    }
}

static void add_event(int epollfd, int fd, int state)
{
     struct epoll_event ev;
     ev.events = state;
     ev.data.fd = fd; 
     epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}
static void modify_event(int epollfd, int fd, int state)
{
     struct epoll_event ev;
     ev.events = state;
     ev.data.fd = fd;
     epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev);
}
static void delete_event(int epollfd, int fd, int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
}

客戶端也用epoll實現,控制STDIN_FILENO、STDOUT_FILENO、和sockfd三個描述符,程序以下所示:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

#include <netinet/in.h>
#include <sys/epoll.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <arpa/inet.h>


#define MAXSIZE      1024
#define IPADDRESS    "127.0.0.1"
#define SERV_PORT    8787
#define FDSIZE       1024
#define EPOLLEVENTS  20


static void handle_connection(int sockfd);
static void 
handle_events(int epollfd, struct epoll_event* event, int num, int sockfd, char* buf);


static void do_read(int epollfd, int fd, int sockfd, char* buf);
static void do_write(int epollfd, int fd, int sockfd, char* buf);

static void add_event(int epollfd, int fd, int state);
static void modify_event(int epollfd, int fd, int state);
static void delete_event(int epollfd, int fd, int state);


int main(int argc, char**argv)
{
    int  sockfd;
    struct sockaddr_in   servaddr;
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    bzero(&servaddr, sizeof(servaddr));
    
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(SERV_PORT);
    inet_pton(AF_INET, IPADDRESS, &servaddr.sin_addr);
    
    connect(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr));   //連接服務器
   
    handle_connection(sockfd);  //處理連接
    
    close(sockfd);
    return 0;
}

static void handle_connection(int sockfd)
{
    int epollfd;
    struct epoll_event events[EPOLLEVENTS];
    char buf[MAXSIZE];
    int ret;
   
    epollfd = epoll_create(FDSIZE);
    add_event(epollfd, STDIN_FILENO, EPOLLIN);   //從標準輸入讀取
  
    for(;;)
    {
       ret = epoll_wait(epollfd, events, EPOLLEVENTS, -1);
       handle_events(epollfd, events, ret, sockfd, buf);
    }

    close(epollfd);  //關閉epollfd
}

static void 
handle_events(int epollfd, struct epoll_event* events, int num, int sockfd, char* buf)
{
    int fd;
    int i;
    for(i=0; i<num; i++)
    {
        fd = events[i].data.fd;
        if(events[i].events & EPOLLIN)
             do_read(epollfd, fd, sockfd, buf);
        else if(events[i].events & EPOLLOUT)
             do_write(epollfd, fd, sockfd, buf);     
    }
}

static void do_read(int epollfd, int fd, int sockfd, char* buf)
{
    int nread;
    nread = read(fd, buf, MAXSIZE);
    if(nread == -1)
    {
        perror("read error.\n");
        close(fd);
    }
    else if(nread == 0)
    {
        fprintf(stderr,"server close.\n");
        close(fd);
    }
    else
    {
         if(fd == STDIN_FILENO)
               add_event(epollfd, sockfd, EPOLLOUT); //從STDIN_FILENO 讀取結束後,準備向sockfd寫入
         else
         {
               delete_event(epollfd, sockfd, EPOLLIN); //從sockfd讀取結束,
               add_event(epollfd, STDOUT_FILENO, EPOLLOUT);  //添加STDOUT_FILENO輸出
         }
    }
}
static void do_write(int epollfd, int fd, int sockfd, char* buf)
{
    int nwrite;
    nwrite = write(fd, buf, strlen(buf));  //strlen() 字符串未滿
    
    if(nwrite == -1)
    {
        perror("write error.\n");
        close(fd);
    }
    else
    {
       if(fd == STDOUT_FILENO)
            delete_event(epollfd, fd, EPOLLOUT);
       else
            modify_event(epollfd, fd, EPOLLIN);  //向sockfd寫完,添加等待讀取事件
    }
    memset(buf,0,MAXSIZE);  //清空
}


static void add_event(int epollfd, int fd, int state)
{
   struct epoll_event ev;
   ev.events = state;
   ev.data.fd = fd;
   epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}
static void modify_event(int epollfd, int fd, int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev);
}
static void delete_event(int epollfd, int fd, int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
}

參考資料:

http://www.cnblogs.com/OnlyXP/archive/2007/08/10/851222.html

http://www.cnblogs.com/lexus/archive/2011/11/19/2254798.html

https://banu.com/blog/2/how-to-use-epoll-a-complete-example-in-c/



epoll 實現的同時處理 監聽TCP,UDP的 併發 服務器代碼:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

#include <unistd.h>
#include <sys/types.h>
#include <errno.h>

#define IPADDRESS   "127.0.0.1"
#define TCP_PORT    8787
#define UDP_PORT    8989

#define MAXSIZE     1024
#define LISTENQ     5

#define FDSIZE      1000
#define EPOLLEVENTS 100


static void 
handle_events(int epollfd,struct epoll_event* events, int num, int tcpfd, int udpfd, char* buf);

static void  handle_accept(int epollfd, int tcpfd);

static void do_read(int epollfd, int listenfd, char* buf);
static void do_write(int epollfd, int listenfd, char* buf);

//處理數據轉發
static void tcp_read(int epollfd, int fd, char* buf);
static void udp_read(int epollfd, int fd, char* buf);

static void add_event(int epollfd, int fd, int state);
static void modify_event(int epollfd, int fd, int state);
static void delete_event(int epollfd, int fd, int state);


// change epoll contains  differerents listenfd
int main(int argc, char** argv)
{
    int tcpfd,udpfd;
    struct sockaddr_in servaddr,addr;

    int epollfd;
    struct epoll_event events[EPOLLEVENTS];
    
    int ret;
    char buf[MAXSIZE];
    memset(buf,0,MAXSIZE);
    
    //tcp socket 
    tcpfd = socket(AF_INET, SOCK_STREAM,0);
    if(tcpfd == -1)
    {
       perror("tcp socket error.\n");
       exit(0);
    }
    
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    inet_pton(AF_INET, IPADDRESS,&servaddr.sin_addr);
    servaddr.sin_port = htons(TCP_PORT);
    
    if(bind(tcpfd, (struct sockaddr*)&servaddr,sizeof(servaddr)) == -1)
    {
        perror("bind tcpfd error.\n");
        exit(1);
    }

    //udp socket
    udpfd = socket(AF_INET, SOCK_DGRAM,0);
    if(udpfd == -1)
    {
       perror("udp socket error.\n");
       exit(1);
    }
  
    bzero(&addr, sizeof(addr));
    addr.sin_family = AF_INET;
    inet_pton(AF_INET, IPADDRESS, &addr.sin_addr);
    addr.sin_port = htons(UDP_PORT);
    
    if(bind(udpfd, (struct sockaddr*)&addr, sizeof(addr)) == -1)
    {
        perror("bind udpfd error.\n");
        exit(1);
    }

   /*
     while(1)
    {
           memset(buf,0,MAXSIZE);
           if(recvfrom(udpfd, buf, MAXSIZE, 0, NULL, NULL) == -1)
           {
                printf("recv error.\n");
                exit(1);
           }
           printf("recv:%s\n",buf);
    }
   */
   
    listen(tcpfd, LISTENQ);
    
    epollfd = epoll_create(FDSIZE);
    add_event(epollfd, tcpfd, EPOLLIN); 
    add_event(epollfd, udpfd, EPOLLIN);

    for(;;)
    {
        ret = epoll_wait(epollfd, events,EPOLLEVENTS,-1);
        handle_events(epollfd, events, ret, tcpfd, udpfd,buf);
    }    
    close(epollfd);
    return 0;
}


static void 
handle_events(int epollfd, struct epoll_event* events, int num, int tcpfd, int udpfd, char* buf)
{
     int i;
     int fd;
     for( i=0; i<num; i++)
     {
        
        fd = events[i].data.fd;
        printf("num:%d,fd:%d,tcpfd:%d,udp:%d\n",num,fd,tcpfd,udpfd);
        if((fd == tcpfd) && (events[i].events & EPOLLIN))
        {     //獲取新的TCP連接
              handle_accept(epollfd, tcpfd);
        }
        else if((fd == udpfd) && (events[i].events & EPOLLIN))
        {
              //接收UDP數據
              udp_read(epollfd, fd, buf);
        }
        else if((fd == udpfd) && (events[i].events & EPOLLOUT))
        {
              //udp端口echo
        }   
        else if ((fd != udpfd) && (events[i].events & EPOLLIN))
        {
             //接受普通端口數據 
             tcp_read(epollfd, fd, buf);
        }
        else if ((fd != udpfd) && (events[i].events & EPOLLOUT))
        {
            //do_write(epollfd, fd, buf);
        } 
        else
        {
            //處理其餘事件
        }
     }  
}

static void handle_accept(int epollfd, int listenfd)
{
   int clifd;
   struct sockaddr_in cliaddr;
   socklen_t cliaddrlen;

   //若是不初始化,可能出現arguent error
   bzero(&cliaddr,sizeof(cliaddr));
   cliaddrlen = sizeof(cliaddr);
 
   clifd = accept(listenfd, (struct sockaddr*)&cliaddr, &cliaddrlen);
   if(clifd == -1)
       perror("accept error.\n");
   else
   {
       printf("accept a new tcp:%s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
       add_event(epollfd, clifd, EPOLLIN);   
   }
}

static void tcp_read(int epollfd, int fd, char* buf)
{
     int nread;
     nread = read(fd, buf, MAXSIZE);
     if(nread <= 0)
     {
           fprintf(stderr,"tcp error.\n");
           close(fd);
           delete_event(epollfd, fd, EPOLLIN);
     }
     else
     {
           //to-do curl
           printf("recv tcp: %s\n",buf);       
     }
    // close(fd);
    // delete_event(epollfd, fd, EPOLLIN);
    // add_event(epollfd, fd, EPOLLIN);
    memset(buf,0,MAXSIZE);
}
static void udp_read(int epollfd, int fd, char* buf)
{
     int nread;
     struct sockaddr_in cliaddr;
     socklen_t     cliaddrlen;
     
   //若是不初始化,可能出現arguent error
     bzero(&cliaddr,sizeof(cliaddr));
     cliaddrlen = sizeof(cliaddr);
     
     nread=recvfrom(fd, buf, MAXSIZE,0,(struct sockaddr*)&cliaddr, &cliaddrlen);
     if(nread <= 0)
     {
         fprintf(stderr,"udp error.\n");
     }
     else
     {
         //to-do curl
         printf("recv udp: %s\n",buf);
     }
     memset(buf,0,MAXSIZE);    
}


static void do_read(int epollfd, int fd, char* buf)
{
    int nread;
    nread = read(fd, buf, MAXSIZE);
    if(nread == -1)
    {
       perror("read error.\n");
       close(fd);
       delete_event(epollfd,fd, EPOLLIN);
    }
    else if (nread == 0)
    {
       fprintf(stderr,"client close.\n");
       close(fd);
       delete_event(epollfd, fd, EPOLLIN);
    }
    else
    {
        printf("read message is: %s",buf);
        modify_event(epollfd, fd, EPOLLOUT);
    }
}
static void do_write(int epollfd, int fd, char* buf)
{
    int nwrite;
    nwrite = write(fd, buf, strlen(buf));
    if(nwrite == -1)
    {
       perror("write error:");
       close(fd);
       delete_event(epollfd, fd, EPOLLOUT);
    }
    else
    {
        modify_event(epollfd, fd, EPOLLIN);
    }
    memset(buf, 0, MAXSIZE);
}

static void add_event(int epollfd, int fd, int state)
{
     struct epoll_event ev;
     ev.events = state;
     ev.data.fd = fd; 
     epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}
static void modify_event(int epollfd, int fd, int state)
{
     struct epoll_event ev;
     ev.events = state;
     ev.data.fd = fd;
     epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev);
}
static void delete_event(int epollfd, int fd, int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
}
相關文章
相關標籤/搜索