多路複用——epoll

一、基本知識html

  epoll是在2.6內核中提出的,是以前的select和poll的加強版本。相對於select和poll來講,epoll更加靈活,沒有描述符限制。epoll使用一個文件描述符管理多個描述符,將用戶關係的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy只需一次。linux


二、epoll函數
android

  epoll操做過程須要三個接口,分別以下:windows

#include <sys/epoll.h>int epoll_create(int size);int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

(1) int epoll_create(int size);
  建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大。這個參數不一樣於select()中的第一個參數,給出最大監聽的fd+1的值。須要注意的是,當建立好epoll句柄後,它就是會佔用一個fd值,在linux下若是查看/proc/進程id/fd/,是可以看到這個fd的,因此在使用完epoll後,必須調用close()關閉,不然可能致使fd被耗盡。多線程

(2)int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  epoll的事件註冊函數,它不一樣與select()是在監聽事件時告訴內核要監聽什麼類型的事件epoll的事件註冊函數,它不一樣與select()是在監聽事件時告訴內核要監聽什麼類型的事件,而是在這裏先註冊要監聽的事件類型。第一個參數是epoll_create()的返回值,第二個參數表示動做,用三個宏來表示:

EPOLL_CTL_ADD:註冊新的fd到epfd中;
EPOLL_CTL_MOD:修改已經註冊的fd的監聽事件;
EPOLL_CTL_DEL:從epfd中刪除一個fd;
第三個參數是須要監聽的fd,第四個參數是告訴內核須要監聽什麼事,struct epoll_event結構以下:
併發

struct epoll_event {
  __uint32_t events;  /* Epoll events */
  epoll_data_t data;  /* User data variable */};

events能夠是如下幾個宏的集合:
EPOLLIN :表示對應的文件描述符能夠讀(包括對端SOCKET正常關閉);
EPOLLOUT:表示對應的文件描述符能夠寫;
EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這裏應該表示有帶外數據到來);
EPOLLERR:表示對應的文件描述符發生錯誤;
EPOLLHUP:表示對應的文件描述符被掛斷;
EPOLLET: 將EPOLL設爲邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來講的。
EPOLLONESHOT:只監聽一次事件,當監聽完此次事件以後,若是還須要繼續監聽這個socket的話,須要再次把這個socket加入到EPOLL隊列裏
app

(3) int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
  等待事件的產生,相似於select()調用。參數events用來從內核獲得事件的集合,maxevents告以內核這個events有多大,這個maxevents的值不能大於建立epoll_create()時的size,參數timeout是超時時間(毫秒,0會當即返回,-1將不肯定,也有說法說是永久阻塞)。該函數返回須要處理的事件數目,如返回0表示已超時。
socket


三、工做模式
ide

  epoll對文件描述符的操做有兩種模式:LT(level trigger)和ET(edge trigger)。LT模式是默認模式,LT模式與ET模式的區別以下:函數

  LT模式:當epoll_wait檢測到描述符事件發生並將此事件通知應用程序,應用程序能夠不當即處理該事件。下次調用epoll_wait時,會再次響應應用程序並通知此事件。

  ET模式:當epoll_wait檢測到描述符事件發生並將此事件通知應用程序,應用程序必須當即處理該事件。若是不處理,下次調用epoll_wait時,不會再次響應應用程序並通知此事件。

  ET模式在很大程度上減小了epoll事件被重複觸發的次數,所以效率要比LT模式高。epoll工做在ET模式的時候,必須使用非阻塞套接口,以免因爲一個文件句柄的阻塞讀/阻塞寫操做把處理多個文件描述符的任務餓死。


四、原理

1)調用epoll_create時,作了如下事情:

    內核幫咱們在epoll文件系統裏建了個file結點;

    在內核cache裏建了個紅黑樹用於存儲之後epoll_ctl傳來的socket;

    創建一個list鏈表,用於存儲準備就緒的事件。

2)調用epoll_ctl時,作了如下事情:

     把socket放到epoll文件系統裏file對象對應的紅黑樹上;

     給內核中斷處理程序註冊一個回調函數,告訴內核,若是這個句柄的中斷到了,就把它放到準備就緒list鏈表裏。

3)調用epoll_wait時,作了如下事情:

      觀察list鏈表裏有沒有數據。有數據就返回,沒有數據就sleep,等到timeout時間到後即便鏈表沒數據也返回。並且,一般狀況下即便咱們要監控百萬計的句柄,大多一次也只返回不多量的準備就緒句柄而已,因此,epoll_wait僅須要從內核態copy少許的句柄到用戶態而已。

總結以下:

         一顆紅黑樹,一張準備就緒句柄鏈表,少許的內核cache,解決了大併發下的socket處理問題。

         執行epoll_create時,建立了紅黑樹和就緒鏈表; 

          執行epoll_ctl時,若是增長socket句柄,則檢查在紅黑樹中是否存在,存在當即返回,不存在則添加到樹幹上,而後向內核註冊回調函數,用於當中斷事件來臨時向

          準備就緒鏈表中插入數據; 

           執行epoll_wait時馬上返回準備就緒鏈表裏的數據便可。

  兩種模式的實現:

           當一個socket句柄上有事件時,內核會把該句柄插入上面所說的準備就緒list鏈表,這時咱們調用epoll_wait,會把準備就緒的socket拷貝到用戶態內存,而後清空準備就緒list鏈表,最後,epoll_wait檢查這些socket,若是是LT模式,而且這些socket上確實有未處理的事件時,又把該句柄放回到剛剛清空的準備就緒鏈表。因此,LT模式的句柄,只要它上面還有事件,epoll_wait每次都會返回。


五、epoll的優勢

    1. 自己沒有最大併發鏈接的限制,僅受系統中進程能打開的最大文件數目限制;

    2. 效率提高:只有活躍的socket纔會主動的去調用callback函數;

    3. 省去沒必要要的內存拷貝:epoll經過內核與用戶空間mmap同一塊內存實現。

固然,以上的優缺點僅僅是特定場景下的狀況:高併發,且任一時間只有少數socket是活躍的。若是在併發量低,socket都比較活躍的狀況下,select就不見得比epoll慢了(就像咱們經常說快排比插入排序快,可是在特定狀況下這並不成立)。


六、epoll的缺點

1. 相對select來講, epoll的跨平臺性不夠用 只能工做在linux下, 而select能夠在windows linux apple上使用, 還有手機端android iOS之類的均可以. android雖然是linux的 內核 但早期版本一樣不支持epoll

 2. 相對select來講 仍是用起來仍是複雜了一些, 不過和IOCP比起來 增長了一點點的複雜度卻基本上達到了IOCP的併發量和性能, 而複雜度遠遠小於IOCP.

3. 相對IOCP來講 對多核/多線程的支持不夠好, 性能也所以在性能要求比較苛刻的狀況下不如IOCP.


七、epoll實現的TCP server

代碼:

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <string.h>
static void my_read(int epfd,int fd,char *buf,int len)
{
  int rs=1;
  while(rs)
  {
    ssize_t size=recv(fd,buf,len,0);
    if(size<0)
    {
      if(errno=EAGAIN)
      {
        break;
      }
      else
      {
        perror("recv");
        return 9;
      }
    }
    else if(size==0)
    {
      //表示對端的sock已經正常關閉
      printf("client close ...\n");
      struct epoll_event ev;
      epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);
      close(fd);
      break;
    }
    else{
      buf[size-1]='\0';
      printf("client # %s\n",buf);
      if(size==len)
      {
        rs=1;
      }
      else
        rs=0;
    }
  }
  
}
static void my_write(int fd,char *buf,int len) 
{
  int ws=1;
  while(ws)
  {
    ssize_t size=send(fd,buf,len,0);
    if(size<0)
    {
      //緩衝區已經滿了,延時重試
      if(errno==EAGAIN)
      {
        usleep(1000);
        continue;
      }
      if(errno==EINTR)
      {
        return -1;
      }
    }
    if(size==len)
    {
      continue;
    }
    len-=size;
    buf+=size;
  }
}
static void set_no_block(int fd)
{
  //先獲得以前的狀態,在以前狀態上在加入新狀態
  int before=fcntl(fd,F_GETFL);
  fcntl(fd,F_SETFL,before|O_NONBLOCK);
}
static int startup(const char *ip,int port)
{
  int sock=socket(AF_INET,SOCK_STREAM,0);
  if(sock<0)
  {
    perror("sock");
    return 2;
  }
  int opt=1;
  setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
  struct sockaddr_in local;
  local.sin_family=AF_INET;
  local.sin_port=htons(port);
  local.sin_addr.s_addr=inet_addr(ip);
  if(bind(sock,(struct sockaddr *)&local,sizeof(local))<0)
  {
    perror("bind");
    return 3;
  }
  if(listen(sock,5)<0)
  {
    perror("listen");
    return 4;
  }
  return sock;
}
static void usage(const char *proc)
{
  printf("%s [ip] [port]",proc);
}
int main(int argc,char *argv[])
{
  if(argc!=3)
  {
    usage(argv[0]);
    return 1;
  }
  int listen_sock=startup(argv[1],atoi(argv[2]));
  int epfd=epoll_create(128);
  if(epfd<0)
  {
    perror("epoll_create");
    return 5;
  }
  struct epoll_event ev;
  ev.events=EPOLLIN;
  ev.data.fd=listen_sock;
  if(epoll_ctl(epfd,EPOLL_CTL_ADD,listen_sock,&ev)<0)
  {
    perror("epoll_ctl");
    return 6;
  }
  struct epoll_event evs[128];
  int len=sizeof(evs)/sizeof(evs[0]);
  int ready=0;
  int timeout= -1;
  while(1)
  {
    switch(ready=epoll_wait(epfd,evs,len,timeout))
    {
      case 0:
        printf("timeout..\n");
        break;
      case -1:
        perror("epoll_wait");
        return 7;
        break;
      default:
        {
          int i=0;
          for(i;i<ready;i++)
          {
            //LISTEN  SOCKET
            int fd=evs[i].data.fd;
            if(i==0&&fd==listen_sock&&evs[i].events&EPOLLIN)
            {
              struct sockaddr_in peer;
              socklen_t len=sizeof(peer);
              int new_sock=accept(listen_sock,(struct sockaddr*)&peer,&len);
              if(new_sock<0)
              {
                perror("accept");
                return 8;
              }
              else
              {
                //在epoll_wait以後改變其文件狀態,爲非阻塞,ET工做
                set_no_block(new_sock);
                printf("get new socket:ip %s:port %d\n",inet_ntoa(peer.sin_addr),ntohs(peer.sin_port));
                ev.events=EPOLLIN|EPOLLET;
                ev.data.fd=new_sock;
                if(epoll_ctl(epfd,EPOLL_CTL_ADD,new_sock,&ev)<0)
                {
                  perror("epoll_ctl");
                  return 9;
                }
              }
	          }	
            else
            {
              //read
                if(evs[i].events&EPOLLIN)
                {
                  char buf[1024];
                  //my_read(fd,buf,len);
                  ssize_t _s=recv(fd,buf,sizeof(buf)-1,0);
                  if(_s>0)
                  {
                    buf[_s-1]='\0';
                    printf("client # %s\n",buf);
                    //read finish change to write
                    ev.data.fd=fd;
                    ev.events=EPOLLOUT;
                    epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev);
                  }
                  else if(_s==0)
                  {
                    printf("client close ....\n");
                    epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);
                    close(fd);
                  }
                  else
                  {
                    perror("recv");
                    return 10;
                  }
                }
                //write
                else if(evs[i].events&EPOLLOUT)
                {
                    char *msg="HTTP/1.1 200 OK\r\n\r\n<html><h1>hello momo</h1></html>\r\n";
                    //my_send(fd,msg,sizeof(msg)-1);
                    send(fd,msg,strlen(msg),0);
                    epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);
                    close(fd);
                }
                else
	              {
                    continue;
                }
                  
              }
            }
        }
        break;
    }
  }
  return 0;
  
}
相關文章
相關標籤/搜索