I/O多路複用——epoll函數

1 select的低效率

  select/poll函數效率比較低,主要有如下兩個緣由:數組

  (1)調用select函數後須要對全部文件描述符進行循環查找服務器

  (2)每次調用select函數時都須要向該函數傳遞監視對象信息併發

  在這兩個緣由中,第二個緣由是主要緣由:每次調用select函數時,應用程序都要將全部文件描述符傳遞給操做系統,這給程序帶來很大的負擔。在高併發的環境下,不管怎樣優化應用程序的代碼,都沒法完成應用的服務。  socket

  因此,select與poll並不適合以Web服務器端開發爲主流的現代開發環境,只在要求知足如下兩個條件是適用:函數

  (1)服務器端接入者少高併發

  (2)程序要求兼容性性能

2 Linux的epoll機制

  由上一節,咱們須要一種相似於select的機制來完成高併發的服務器。須要有如下兩個特色(epoll和select的區別)測試

  (1)應用程序僅向操做系統傳遞1次監視對象優化

  (2)監視範圍或內容發生變化是,操做系統只通知發生變化的事項給應用程序ui

  幸運的是,的確存在這樣的機制。Linux的支持方式是epoll,Windows的支持方式是IOCP。

3 epoll函數原型  

  epoll操做由三個函數組成:  

#include <sys/epoll.h>
int epoll_create(int size);
            //成功時返回epoll文件描述符,失敗時返回-1
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
            //成功時返回0,失敗時返回-1
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
            //成功時返回發生事件的文件描述數,失敗時返回-1

  (1)epoll_create:建立保存epoll文件描述符的空間。

  調用epoll_create函數時建立的文件描述符保存空間稱爲「epoll例程」。但要注意:size參數只是應用程序向操做系統提的建議,操做系統並不必定會生成一個大小爲size的epoll例程。

  (2)epoll_ctl:向空間註冊並註銷文件描述符。

  參數epfd指定註冊監視對象的epoll例程的文件描述符,op指定監視對象的添加、刪除或更改等操做,有如下兩種常量:

    1)EPOLL_CTL_ADD:將文件描述符註冊到epoll例程

    2)EPOLL_CTL_DEL:從epoll例程中刪除文件描述符

    3)EPOLL_CTL_MOD:更改註冊的文件描述符的關注事件發生狀況

  fd指定須要註冊的監視對象文件描述符,event指定監視對象的事件類型。epoll_event結構體以下:  

struct epoll_event
{
      __uint32_t events;
      epoll_data_t data;            
}
typedef union epoll_data
{
      void *ptr;
      int fd;
      __uint32_t u32;
      __uint64_t u64;    
}epoll_data_t;

  epoll_event的成員events中能夠保存的常量及所指的事件類型有如下:

    1)EPOLLIN:須要讀取數據的狀況

    2) EPOLLOUT:輸出緩衝爲空,能夠當即發送數據的狀況  

    3) EPOLLPRI:收到OOBO數據的狀況

    4) EPOLLRDHUP:斷開鏈接或半關閉的狀況,這在邊緣觸發方式下很是有用

    5) EPOLLERR:發生錯誤的狀況

    6) EPOLLET:以邊緣觸發的方式獲得事件通知

    7) EPOLLONESHOT:發生一次事件後,相應文件描述符再也不收到事件通知。所以須要向epoll_ctl函數的第二個參數EPOLL_CTL_MOD,再次設置事件。

  (3)epoll_wait:與select函數相似,等待文件描述符發生變化。操做系統返回epoll_event類型的結構體通知監視對象的變化。timeout函數是爲毫秒爲單位的等待時間,傳遞-1時,一直等待直到事件發生。聲明足夠大的epoll_event結構體數組後,傳遞給epoll_wait函數時,發生變化的文件符信息將被填入該數組。所以,不須要像select函數那樣針對全部文件符進行循環。

4 基於epoll的echo服務器代碼:

#define BUF_SIZE 1024
#define EPOLL_SIZE 50
void error_handling(char *buf);

int main(int argc, char *argv[])
{
    int listenfd, connfd;
    struct sockaddr_in serv_addr;
    socklen_t socklen;
    char buf[BUF_SIZE];

    int epfd, event_cnt;
    struct epoll_event *ep_events;
    struct epoll_event event;

    if (argc != 2)
    {
        printf("Usage: echo <port>\n");
        exit(1);
    }

    listenfd = socket(PF_INET, SOCK_STREAM, 0);
    memset(&serv_addr, 0, sizeof(serv_addr);
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port = htons(atoi(argv[1]));

    if (bind(listenfd, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) == -1)
        error_handling("bind() error\n");
    if (listen(serv_addr, 5) == -1)
        error_handling("listen() error\n");

    epfd = epoll_create(EPOLL_SIZE);
    ep_events = malloc(sizeof(epoll_event)*EPOLL_SIZE);

    event.event = EPOLLIN;
    event.data.fd = listenfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &event);

    for (;;)
    {
        event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
        if (event_cnt == -1)
            error_handling("epoll_wait() error\n");
        for (int i = 0; i < event_cnt; ++i)
        {
            if (ep_events[i].data.fd == listenfd)
            {
                connfd = accept(listenfd, NULL, NULL);
                event.events = EPOLLIN;
                event.data.fd = connfd;
                epoll_ctl(pefd, EPOLL_CTL_ADD, connfd, &event);
                printf("connect another client\n");
            }
            else
            {
                int nread = read(ep_events[i].dada.fd, buf, BUF_SIZE);
                if (nread == 0)
                {
                    close(ep_events.data.fd);
                    epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events.data.fd, NULL);
                    printf("disconnect with a client\n");
                }
                else
                {
                    write(ep_events[i].data.fd, buf, nread);
                }
            }
        }
    }
    close(listenfd);
    close(epfd);
    return 0;
}

void error_handling(char* buf)
{
    printf("%s\n", buf);
    exit(1);
}

 5 條件觸發與邊緣觸發

  條件觸發:只要引發epoll_wait返回的事件還存在,再次調用epoll_wait時,該事件還會被註冊

  邊緣觸發:每一個事件在剛發生的時候被註冊一次,以後就不會被註冊,除非又有新的事件發生。

  好比,一個已鏈接的socket套接字收到了數據,而讀取緩衝區小於接收到的數據,這時,兩種觸發方式有如下區別:(1)條件觸發:一次讀取以後,套接字緩衝區裏還有數據,再調用epoll_wait,該套接字的EPOLL_IN事件仍是會被註冊;(2)邊緣觸發:一次讀取以後,套接字緩衝區裏還有數據,再調用epoll_wait,該套接字的EPOLL_IN事件不會被註冊,除非在這期間,該套接字收到了新的數據。

  epoll默認採用條件觸發,上一節的代碼採用的就是條件觸發。

  仍是不太清楚?用代碼來砸!邊緣觸發實現echo服務器:  

//設置較小的讀取緩衝區,以測試邊緣觸發特性
#define BUF_SIZE 4
#define EPOLL_SIZE 50
void error_handling(char *buf);

int main(int argc, char *argv[])
{
    int listenfd, connfd;
    struct sockaddr_in serv_addr;
    socklen_t socklen;
    char buf[BUF_SIZE];

    int epfd, event_cnt;
    struct epoll_event *ep_events;
    struct epoll_event event;

    if (argc != 2)
    {
        printf("Usage: echo <port>\n");
        exit(1);
    }

    listenfd = socket(PF_INET, SOCK_STREAM, 0);
    memset(&serv_addr, 0, sizeof(serv_addr);
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port = htons(atoi(argv[1]));

    if (bind(listenfd, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) == -1)
        error_handling("bind() error\n");
    if (listen(serv_addr, 5) == -1)
        error_handling("listen() error\n");

    epfd = epoll_create(EPOLL_SIZE);
    ep_events = malloc(sizeof(epoll_event)*EPOLL_SIZE);

    event.event = EPOLLIN;
    event.data.fd = listenfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &event);

    for (;;)
    {
        event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
        if (event_cnt == -1)
            error_handling("epoll_wait() error\n");
        printf("event_cnt() return\n");            //指示一次返回
        for (int i = 0; i < event_cnt; ++i)
        {
            if (ep_events[i].data.fd == listenfd)
            {
                connfd = accept(listenfd, NULL, NULL);
                //設置爲非阻塞I/O
                int flag = fcntl(fd, F_GETFL, 0);
                fcntl(fd, F_SETFL, flag | O_NONBLOCK);

                event.events = EPOLLIN|EPOLLET;       //邊緣觸發
                event.data.fd = connfd;
                epoll_ctl(pefd, EPOLL_CTL_ADD, connfd, &event);
                printf("connect another client\n");
            }
            else
            {
                //讀完每一個已鏈接socket的緩衝區裏的數據
                while (1)
                {
                    int nread = read(ep_events[i].dada.fd, buf, BUF_SIZE);
                    if (nread == 0)
                    {
                        close(ep_events.data.fd);
                        epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events.data.fd, NULL);
                        printf("disconnect with a client\n");
                    }
                    else if (nread < 0)
                    {
                        //errno爲EAGAIN,則緩衝區內已沒有數據
                        if (errno == EAGAIN)
                            break;
                    }
                    else
                    {
                        write(ep_events[i].data.fd, buf, nread);
                    }
                }
                
            }
        }
    }
    close(listenfd);
    close(epfd);
    return 0;
}

void error_handling(char* buf)
{
    printf("%s\n", buf);
    exit(1);
}

  幾個說明:

  (1)在使用epoll_ctl註冊事件的時候,選擇邊緣觸發,|EPOLLET

  (2)處理已發生的邊緣觸發的事件時,要處理完全部的數據再返回。例中,使用了循環的方式讀取了套接字中的全部數據

  (3)讀/寫套接字的時候採用非阻塞式I/O。爲什麼?邊緣觸發方式下,以阻塞方式工做的read&write函數有可能引發服務器端的長時間停頓。

  那麼邊緣觸發好很差?有什麼優勢呢?書上說,邊緣觸發能夠分離接收數據和處理數據的時間點。也就是說,在事件發生的時候,咱們只記錄事件已經發生,而不去處理數據,等到之後的某段時間纔去處理數據,即分離接收數據和處理數據的時間點。好奇的我必定會問:條件觸發沒辦法分離接收數據和處理數據的時間點嗎?答案是能夠的。但存在問題:在數據被處理以前,每次調用epoll_wait都會產生相應的事件,在一個具備大量這樣的事件的繁忙服務器上,這是不現實的。

  但是。尚未說邊緣觸發和條件觸發哪一個更好呀?馬克思說,要辯證地看問題。so,邊緣觸發更有可能帶來高性能,但不能簡單地認爲「只要使用邊緣觸發就必定能提升速度」,要具體問題具體分析。好吧,馬克思的這一個「具體問題具體分析」適用於回答絕大部分比較類問題,已和「多喝水」,「重啓一下試試看」,「不行就分」並列成爲最簡單粗暴的4個通用回答。

相關文章
相關標籤/搜索