Linux下I/O多路轉接之epoll(絕對經典)

epollhtml

關於Linux下I/O多路轉接之epoll函數,什麼返回值,什麼參數,我不想再多的解釋,您不想移駕,我給你移來:ruby

http://blog.csdn.net/colder2008/article/details/5812487      返回值,參數說明等;
服務器

最後將一個用epoll設計的網絡服務器貼上代碼,以供借閱:網絡

下面咱們從流開始提及,再到select-->poll-->epoll:(原文來自:http://www.tuicool.com/articles/6NrMJn),文中這樣說:多線程

一個流能夠是文件,socket,pipe等等能夠進行I/O操做的內核對象。socket

不論是文件,仍是套接字,仍是管道,咱們均可以把他們看做流。函數

  以後咱們來討論I/O的操做,經過read,咱們能夠從流中讀入數據;經過write,咱們能夠往流寫入數據。如今假定一個情形,咱們須要從流中讀數據,可是流中尚未數據,(典型的例子爲,客戶端要從socket讀如數據,可是服務器尚未把數據傳回來),這時候該怎麼辦?學習

  阻塞:阻塞是個什麼概念呢?好比某個時候你在等快遞,可是你不知道快遞何時過來,並且你沒有別的事能夠幹(或者說接下來的事要等快遞來了才能作);那麼你能夠去睡覺了,由於你知道快遞把貨送來時必定會給你打個電話(假定必定能叫醒你)。ui

  非阻塞忙輪詢:接着上面等快遞的例子,若是用忙輪詢的方法,那麼你須要知道快遞員的手機號,而後每分鐘給他掛個電話:「你到了沒?」spa

很明顯通常人不會用第二種作法,不只顯很無腦,浪費話費不說,還佔用了快遞員大量的時間。

大部分程序也不會用第二種作法,由於第一種方法經濟而簡單,經濟是指消耗不多的CPU時間,若是線程睡眠了,就掉出了系統的調度隊列,暫時不會去瓜分CPU寶貴的時間片了。

爲了瞭解阻塞是如何進行的,咱們來討論緩衝區,以及內核緩衝區,最終把I/O事件解釋清楚。緩衝區的引入是爲了減小頻繁I/O操做而引發頻繁的系統調用(你知道它很慢的),當你操做一個流時,更多的是以緩衝區爲單位進行操做,這是相對於用戶空間而言。對於內核來講,也須要緩衝區。

假設有一個管道,進程A爲管道的寫入方,B爲管道的讀出方。

假設一開始內核緩衝區是空的,B做爲讀出方,被阻塞着。而後首先A往管道寫入,這時候內核緩衝區由空的狀態變到非空狀態,內核就會產生一個事件告訴B該醒來了,這個事件姑且稱之爲「緩衝區非空」。

可是「緩衝區非空」事件通知B後,B卻尚未讀出數據;且內核許諾了不能把寫入管道中的數據丟掉這個時候,A寫入的數據會滯留在內核緩衝區中,若是內核也緩衝區滿了,B仍未開始讀數據,最終內核緩衝區會被填滿,這個時候會產生一個I/O事件,告訴進程A,你該等等(阻塞)了,咱們把這個事件定義爲「緩衝區滿」。

假設後來B終於開始讀數據了,因而內核的緩衝區空了出來,這時候內核會告訴A,內核緩衝區有空位了,你能夠從長眠中醒來了,繼續寫數據了,咱們把這個事件叫作「緩衝區非滿」

也許事件Y1已經通知了A,可是A也沒有數據寫入了,而B繼續讀出數據,知道內核緩衝區空了。這個時候內核就告訴B,你須要阻塞了!,咱們把這個時間定爲「緩衝區空」。

這四個情形涵蓋了四個I/O事件,緩衝區滿,緩衝區空,緩衝區非空,緩衝區非滿(注都是說的內核緩衝區,且這四個術語都是我生造的,僅爲解釋其原理而造)。這四個I/O事件是進行阻塞同步的根本。(若是不能理解「同步」是什麼概念,請學習操做系統的鎖,信號量,條件變量等任務同步方面的相關知識)。

而後咱們來講說阻塞I/O的缺點。可是阻塞I/O模式下,一個線程只能處理一個流的I/O事件。若是想要同時處理多個流,要麼多進程(fork),要麼多線程(pthread_create),很不幸這兩種方法效率都不高。

因而再來考慮非阻塞忙輪詢的I/O方式,咱們發現咱們能夠同時處理多個流了(把一個流從阻塞模式切換到非阻塞模式再此不予討論):

while true {
    for i in stream[]; {
        if i has data
        read until unavailable
    }
}

 咱們只要不停的把全部流從頭至尾問一遍,又從頭開始。這樣就能夠處理多個流了,但這樣的作法顯然很差,由於若是全部的流都沒有數據,那麼只會白白浪費CPU。這裏要補充一點,阻塞模式下,內核對於I/O事件的處理是阻塞或者喚醒,而非阻塞模式下則把I/O事件交給其餘對象(後文介紹的select以及epoll)處理甚至直接忽略。

爲了不CPU空轉,能夠引進了一個代理(一開始有一位叫作select的代理,後來又有一位叫作poll的代理,不過二者的本質是同樣的)。這個代理比較厲害,能夠同時觀察許多流的I/O事件,在空閒的時候,會把當前線程阻塞掉,當有一個或多個流有I/O事件時,就從阻塞態中醒來,因而咱們的程序就會輪詢一遍全部的流(因而咱們能夠把「忙」字去掉了)。代碼長這樣:

while true {
    select(streams[])
    for i in streams[] {
        if i has data
        read until unavailable
    }
}

因而,若是沒有I/O事件產生,咱們的程序就會阻塞在select處。可是依然有個問題,咱們從select那裏僅僅知道了,有I/O事件發生了,但卻並不知道是那幾個流(可能有一個,多個,甚至所有),咱們只能無差異輪詢全部流,找出能讀出數據,或者寫入數據的流,對他們進行操做。

可是使用select,咱們有O(n)的無差異輪詢複雜度,同時處理的流越多,沒一次無差異輪詢時間就越長。再次

說了這麼多,終於能好好解釋epoll了

epoll能夠理解爲event poll,不一樣於忙輪詢和無差異輪詢,epoll之會把哪一個流發生了怎樣的I/O事件通知咱們。此時咱們對這些流的操做都是有意義的。(複雜度下降到了O(1))

在討論epoll的實現細節以前,先把epoll的相關操做列出:

epoll_create 建立一個epoll對象,通常epollfd = epoll_create()

epoll_ctl (epoll_add/epoll_del的合體),往epoll對象中增長/刪除某一個流的某一個事件
好比
epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//註冊緩衝區非空事件,即有數據流入
epoll_ctl(epollfd, EPOLL_CTL_DEL, socket, EPOLLOUT);//註冊緩衝區非滿事件,即流能夠被寫入
epoll_wait(epollfd,...)等待直到註冊的事件發生
(注:當對一個非阻塞流的讀寫發生緩衝區滿或緩衝區空,write/read會返回-1,並設置errno=EAGAIN。而epoll只關心緩衝區非滿和緩衝區非空事件)。

一個epoll模式的代碼大概的樣子是:

while true {
    active_stream[] = epoll_wait(epollfd)
    for i in active_stream[] {
        read or write till
    }
 }
以上不會有太多的實例,基本就是原理,而且略帶的說了select、poll和epoll的比較,得出爲何要用epoll:下面是服務器代碼:

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/types.h>
#include<sys/epoll.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<assert.h>
#include<fcntl.h>
#include<unistd.h>


void usage(const char* argv)
{ 
  printf("%s:[ip][port]\n",argv);
}

void set_nonblock(int fd)
{
    int fl = fcntl(fd,F_GETFL);
    fcntl(fd,F_SETFL,fl | O_NONBLOCK);
}

int startup(char* _ip,int _port)  //建立一個套接字,綁定,檢測服務器
{
  //sock
  //1.建立套接字
  int sock=socket(AF_INET,SOCK_STREAM,0);   
  if(sock<0)
  {
      perror("sock");
      exit(2);
  }
  
  int opt = 1;
  setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));

  //2.填充本地 sockaddr_in 結構體(設置本地的IP地址和端口)
  struct sockaddr_in local;       
  local.sin_port=htons(_port);
  local.sin_family=AF_INET;
  local.sin_addr.s_addr=inet_addr(_ip);

  //3.bind()綁定
  if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0) 
  {
      perror("bind");
      exit(3);
  }
  //4.listen()監聽 檢測服務器
  if(listen(sock,5)<0)
  {
      perror("listen");
      exit(4);
  }
  return sock;    //這樣的套接字返回
}

int main(int argc,char *argv[])                         
{
    if(argc!=3)     //檢測參數個數是否正確
    {
        usage(argv[0]);
        exit(1);
    }

    int listen_sock=startup(argv[1],atoi(argv[2]));      //建立一個綁定了本地 ip 和端口號的套接字描述符


    //1.建立epoll    
    int epfd = epoll_create(256);    //可處理的最大句柄數256個
    if(epfd < 0)
    {
	perror("epoll_create");
	exit(5);
    }

    struct epoll_event _ev;       //epoll結構填充 
    _ev.events = EPOLLIN;         //初始關心事件爲讀
    _ev.data.fd = listen_sock;   

    //2.託管
    epoll_ctl(epfd,EPOLL_CTL_ADD,listen_sock,&_ev);  //將listen sock添加到epfd中,關心讀事件

    struct epoll_event revs[64];

    int timeout = -1;
    int num = 0;
    int done = 0;

    while(!done)
    {
	//epoll_wait()至關於在檢測事件
        switch((num = epoll_wait(epfd,revs,64,timeout)))  //返回須要處理的事件數目  64表示 事件有多大
        {
            case 0:                  //返回0 ,表示監聽超時
                printf("timeout\n");
                break;
            case -1:                 //出錯
                perror("epoll_wait");
                break;
            default:                 //大於零 即就是返回了須要處理事件的數目
              {
		struct sockaddr_in peer;
		socklen_t len = sizeof(peer);

		int i;
                for(i=0;i < num;i++)
                {
		    int rsock = revs[i].data.fd; //準確獲取哪一個事件的描述符
                    if(rsock == listen_sock && (revs[i].events) && EPOLLIN) //若是是初始的 就接受,創建連接
                    {
                        int new_fd = accept(listen_sock,(struct sockaddr*)&peer,&len);  

                    	if(new_fd > 0)
                    	{
                             printf("get a new client:%s:%d\n",inet_ntoa(peer.sin_addr),ntohs(peer.sin_port));

			     set_nonblock(new_fd);
			     _ev.events = EPOLLIN | EPOLLET;
			     _ev.data.fd = new_fd;
			     epoll_ctl(epfd,EPOLL_CTL_ADD,new_fd,&_ev);    //二次託管

                        }
		    }
		    else // 接下來對num - 1 個事件處理
		    {
			if(revs[i].events & EPOLLIN)
			{
			    char buf[1024];
			    ssize_t _s = read(rsock,buf,sizeof(buf)-1);
			    if(_s > 0)
			    {
				buf[_s] = '\0';
				printf("client:%s\n",buf);

			        _ev.events = EPOLLOUT | EPOLLET;
			        _ev.data.fd = rsock;
			        epoll_ctl(epfd,EPOLL_CTL_DEL,rsock,&_ev);    //二次託管
			    }
			    else if(_s == 0)  //client:close
			    {
				printf("client:%d close\n",rsock);
				epoll_ctl(epfd,EPOLL_CTL_DEL,rsock,NULL);

				close(rsock);
			    }
			    else
			    {
				perror("read");
			    }
			}
			else if(revs[i].events & EPOLLOUT)
			{
			    const char *msg = "HTTP/1.0.200 OK\r\n\r\n<html><h2>李寧愛張寧!</h2></html>\r\n";
			    write(rsock,msg,strlen(msg));
			    epoll_ctl(epfd,EPOLL_CTL_DEL,rsock,NULL);
			    close(rsock);
			}
			else
			{}
		    }
		}
	      }
	      break;
	}
    }
    return 0;
}
賜教!
相關文章
相關標籤/搜索