Linux的I/O多路複用機制之--epoll

什麼是epoll

按照man手冊的說法:是爲處理大批量句柄而做了改進的poll。它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。linux

epoll的相關係統調用

int epoll_create(int size);

建立一個epoll的句柄。自從linux2.6.8以後,size參數是被忽略的。須要注意的是,當建立好epoll句柄後,它就是會佔用一個fd值,在linux下若是查看/proc/進程id/fd/,是可以看到這個fd的,因此在使用完epoll後,必須調用close()關閉,不然可能致使fd被耗盡。編程

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件註冊函數,它不一樣於select()是在監聽事件時告訴內核要監聽什麼類型的事件,而是在這裏先註冊要監聽的事件類型。數組

第一個參數是epoll_create()的返回值。瀏覽器

第二個參數表示動做,用三個宏來表示:服務器

EPOLL_CTL_ADD:註冊新的fd到epfd中;網絡

EPOLL_CTL_MOD:修改已經註冊的fd的監聽事件;數據結構

EPOLL_CTL_DEL:從epfd中刪除一個fd;socket

第三個參數是須要監聽的fd。ide

第四個參數是告訴內核須要監聽什麼事,struct epoll_event結構以下:函數

//保存觸發事件的某個文件描述符相關的數據(與具體使用方式有關)

typedef union epoll_data {
    void *ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
} epoll_data_t;
 //感興趣的事件和被觸發的事件
struct epoll_event {
    __uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
};

events能夠是如下幾個宏的集合:

EPOLLIN :表示對應的文件描述符能夠讀(包括對端SOCKET正常關閉);

EPOLLOUT:表示對應的文件描述符能夠寫;

EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這裏應該表示有帶外數據到來);

EPOLLERR:表示對應的文件描述符發生錯誤;

EPOLLHUP:表示對應的文件描述符被掛斷;

EPOLLET: 將EPOLL設爲邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來講的。

EPOLLONESHOT:只監聽一次事件,當監聽完此次事件以後,若是還須要繼續監聽這個socket的話,須要再次把這個socket加入到EPOLL隊列裏

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

收集在epoll監控的事件中已經發送的事件。參數events是分配好的epoll_event結構體數組,epoll將會把發生的事件賦值到events數組中(events不能夠是空指針,內核只負責把數據複製到這個events數組中,不會去幫助咱們在用戶態中分配內存)。maxevents告以內核這個events有多大,這個 maxevents的值不能大於建立epoll_create()時的size,參數timeout是超時時間(毫秒,0會當即返回,-1將不肯定,也有說法說是永久阻塞)。若是函數調用成功,返回對應I/O上已準備好的文件描述符數目,如返回0表示已超時。

epoll工做原理

epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。

另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。

epoll的2種工做方式-水平觸發(LT)和邊緣觸發(ET)

假若有這樣一個例子:

1. 咱們已經把一個用來從管道中讀取數據的文件句柄(RFD)添加到epoll描述符

2. 這個時候從管道的另外一端被寫入了2KB的數據

3. 調用epoll_wait(2),而且它會返回RFD,說明它已經準備好讀取操做

4. 而後咱們讀取了1KB的數據

5. 調用epoll_wait(2)......

Edge Triggered工做模式:

若是咱們在第1步將RFD添加到epoll描述符的時候使用了EPOLLET標誌,那麼在第5步調用epoll_wait(2)以後將有可能會掛起,由於剩餘的數據還存在於文件的輸入緩衝區內,並且數據發出端還在等待一個針對已經發出數據的反饋信息。只有在監視的文件句柄上發生了某個事件的時候 ET 工做模式纔會彙報事件。所以在第5步的時候,調用者可能會放棄等待仍在存在於文件輸入緩衝區內的剩餘數據。在上面的例子中,會有一個事件產生在RFD句柄上,由於在第2步執行了一個寫操做,而後,事件將會在第3步被銷燬。由於第4步的讀取操做沒有讀空文件輸入緩衝區內的數據,所以咱們在第5步調用  epoll_wait(2)完成後,是否掛起是不肯定的。epoll工做在ET模式的時候,必須使用非阻塞套接口,以免因爲一個文件句柄的阻塞讀/阻塞寫操做把處理多個文件描述符的任務餓死。最好如下面的方式調用ET模式的epoll接口,在後面會介紹避免可能的缺陷。

   i    基於非阻塞文件句柄

   ii   只有當read(2)或者write(2)返回EAGAIN時才須要掛起,等待。但這並非說每次read()時都須要循環讀,直到讀到產生一個EAGAIN才認爲這次事件處理完成,當read()返回的讀到的數據長度小於請求的數據長度時,就能夠肯定此時緩衝中已沒有數據了,也就能夠認爲此事讀事件已處理完成。

Level Triggered 工做模式

相反的,以LT方式調用epoll接口的時候,它就至關於一個速度比較快的poll(2),而且不管後面的數據是否被使用,所以他們具備一樣的職能。由於即便使用ET模式的epoll,在收到多個chunk的數據的時候仍然會產生多個事件。調用者能夠設定EPOLLONESHOT標誌,在 epoll_wait(2)收到事件後epoll會與事件關聯的文件句柄從epoll描述符中禁止掉。所以當EPOLLONESHOT設定後,使用帶有  EPOLL_CTL_MOD標誌的epoll_ctl(2)處理文件句柄就成爲調用者必須做的事情。

LT(level triggered)是epoll缺省的工做方式,而且同時支持block和no-block socket.在這種作法中,內核告訴你一個文件描述符是否就緒了,而後你能夠對這個就緒的fd進行IO操做。若是你不做任何操做,內核仍是會繼續通知你 的,因此,這種模式編程出錯誤可能性要小一點。傳統的select/poll都是這種模型的表明.

ET (edge-triggered)是高速工做方式,只支持no-block socket,它效率要比LT更高。ET與LT的區別在於,當一個新的事件到來時,ET模式下固然能夠從epoll_wait調用中獲取到這個事件,但是若是此次沒有把這個事件對應的套接字緩衝區處理完,在這個套接字中沒有新的事件再次到來時,在ET模式下是沒法再次從epoll_wait調用中獲取這個事件的。而LT模式正好相反,只要一個事件對應的套接字緩衝區還有數據,就總能從epoll_wait中獲取這個事件。

所以,LT模式下開發基於epoll的應用要簡單些,不太容易出錯。而在ET模式下事件發生時,若是沒有完全地將緩衝區數據處理完,則會致使緩衝區中的用戶請求得不到響應。

epoll的優勢:

1.支持一個進程打開大數目的socket描述符(FD)

    select 最不能忍受的是一個進程所打開的FD是有必定限制的,由FD_SETSIZE設置,默認值是2048。對於那些須要支持的上萬鏈接數目的IM服務器來講顯然太少了。不過 epoll則沒有這個限制,它所支持的FD上限是最大能夠打開文件的數目,這個數字通常遠大於2048。

2.IO效率不隨FD數目增長而線性降低

    傳統的select/poll另外一個致命弱點就是當你擁有一個很大的socket集合,不過因爲網絡延時,任一時間只有部分的socket是"活躍"的,可是select/poll每次調用都會線性掃描所有的集合,致使效率呈現線性降低。可是epoll不存在這個問題,它只會對"活躍"的socket進行操做---這是由於在內核實現中epoll是根據每一個fd上面的callback函數實現的。那麼,只有"活躍"的socket纔會主動的去調用 callback函數,其餘idle狀態socket則不會。

3.使用mmap加速內核與用戶空間的消息傳遞

    這點實際上涉及到epoll的具體實現了。不管是select,poll仍是epoll都須要內核把FD消息通知給用戶空間,如何避免沒必要要的內存拷貝就很重要,在這點上,epoll是經過內核於用戶空間mmap同一塊內存實現的。

4.內核微調

(不太懂!)

epoll網絡服務器實例

服務器端:

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <string.h>

#define _MAX_LISTEN_ 5
#define _MAX_SIZE_ 10
#define _BUF_SIZE_ 1024


void Usage(const char* proc)
{
    printf("%s usage: [ip] [port]\n", proc);
}

int startup(const char* _ip, const char* _port)
{
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if(sock < 0)
    {
        perror("socket");
        exit(1);
    }

    int opt = 1;
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) 
    {
        perror("setsockopt");
        exit(2);
    }  

    struct sockaddr_in local;
    local.sin_family = AF_INET;
    local.sin_port = htons(atoi(_port));
    local.sin_addr.s_addr = inet_addr(_ip);
    if(bind(sock, (struct sockaddr*)&local, sizeof(local)) < 0)
    {
        perror("bind");
        exit(3);
    }

    if(listen(sock, _MAX_LISTEN_) < 0)
    {
        perror("listen");
        exit(4);
    }

    return sock;
}

int main(int argc, char* argv[])
{
    if(argc != 3)
    {
        Usage(argv[0]);
        return 1;
    }

    int listen_sock = startup(argv[1], argv[2]);

    int epoll_fd = epoll_create(128); 
    if(epoll_fd < 0)
    {
        perror("epoll_create");
        close(listen_sock);
        exit(5);
    }

    struct epoll_event ev, revent[_MAX_SIZE_];
    ev.data.fd = listen_sock;
    ev.events = EPOLLIN;

    if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_sock, &ev) < 0)
    {
        perror("epoll_ctl add error");
        exit(6);
    }

    int timeout = -1;
    while(1)
    {
        int revent_len = sizeof(revent)/sizeof(revent[0]);
        int epoll_n = epoll_wait(epoll_fd, revent, revent_len, timeout); 
        switch(epoll_n)
        {
            case -1:
                perror("epoll_wait");
                exit(7);
                break;
            case 0:
                printf("time out\n");
                break;
            default:
                {
                    int index = 0;
                    int new_fd = -1;
                    for(; index < epoll_n; ++index)
                    {
                        new_fd = revent[index].data.fd;
                        if(new_fd == listen_sock) //new accpet
                        {
                            struct sockaddr_in peer;
                            socklen_t len = sizeof(peer);
                            new_fd = accept(listen_sock, (struct sockaddr* )&peer, &len);
                            if(new_fd < 0)
                            {
                                perror("accept");
                                exit(8);
                            }
                            printf("get a new client %d -> ip: %s port: %d\n", new_fd, inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));

                            ev.data.fd = new_fd;
                            ev.events = EPOLLIN;
                            if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_fd, &ev) < 0)
                            {
                                perror("epoll_ctl add error");
                                close(new_fd);
                                exit(9);
                            }

                            continue;
                        }
                        if(revent[index].events & EPOLLIN) //new read
                        {
                            char buf[_BUF_SIZE_];
                            int _s = read(new_fd, buf, sizeof(buf)-1);
                            if(_s > 0)
                            {
                                buf[_s] = '\0';
                                printf("client %d # %s\n",new_fd, buf);
                            }
                            else if(_s == 0)
                            {
                                printf("client %d is closed\n", new_fd);
                                close(new_fd);
                                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, new_fd, NULL);
                            }
                            else
                            {
                                perror("read");
                            }
                        }

                        ev.data.fd = new_fd;
                        ev.events = EPOLLOUT;
                        if(epoll_ctl(epoll_fd, EPOLL_CTL_MOD, new_fd, &ev) < 0)
                        {
                            perror("epoll_ctl mod error");
                            close(new_fd);
                            exit(10);
                        }

                        if(revent[index].events & EPOLLOUT)
                        {
                            const char* msg = "Hello World ^_^";
                            write(new_fd, msg, strlen(msg));
                            close(new_fd);
                            epoll_ctl(epoll_fd, EPOLL_CTL_DEL, new_fd, NULL);
                        }

                    }
                }
        }
    }
    return 0;
}

客戶端:

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>

void Usage(const char* proc)
{
    printf("usage: %s [ip] [port]\n", proc);
}

int main(int argc, char* argv[])
{
    if(argc != 3)
    {
        Usage(argv[0]);
        exit(1);
    }
    
    int conn_sock = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in conn;
    conn.sin_family = AF_INET;
    conn.sin_port = htons(atoi(argv[2]));
    conn.sin_addr.s_addr = inet_addr(argv[1]);

    if(connect(conn_sock, (const struct sockaddr*)&conn, sizeof(conn)) < 0)
    {
        perror("connect");
        exit(2);
    }

    char buf[1024];
    memset(buf, '\0', sizeof(buf));
    while(1)
    {
        printf("please enter # ");
        fflush(stdout);

        ssize_t _s = read(0, buf, sizeof(buf)-1);
        if(_s > 0)
        {
            buf[_s-1] = '\0';
            write(conn_sock, buf, strlen(buf));
        }

        _s = read(conn_sock, buf, sizeof(buf)-1);
        if(_s > 0)
        {
            buf[_s] = '\0';
            printf("sever # %s\n", buf);
        }

    }

    return 0;
}

程序演示:

瀏覽器

wKiom1erBILgFddhAAFcUQ03eMQ050.png

客戶端

wKiom1eq85OCdnfRAABINBAz_nE028.png


select/poll/epoll優缺點分析

select


select本質是經過設置或檢查存放fd標誌位的數據結構來進行下一步的處理。會阻塞,直到有一個或多個I/O就緒。

監視的文件描述符分爲三類set,每一種對應不一樣的事件。readfds、writefds和exceptfds是指向描述符集的指針。

readfds列出的文件描述符被監視是否有數據可供讀取。(可讀)

writefds列出的文件描述符被監視是否有寫入操做完成。(可寫)

exceptfds列出的文件描述符被監視是否發生異常,或沒法控制的數據是否可用。(僅僅用於socket)

這三類set爲NULL時,select()不監視其對應的該類事件。

select()成功返回時,每組set都被修改以使它只包含準備好的I/O描述符。

特色:

(a)單個進程可監視的fd數量被限制;

(b)須要維護一個用來存放大量fd的數據結構,這樣會使用戶空間和內核空間在傳遞該結構時複製開銷大;

(c)對fd進行掃描是線性的,fd劇增後,IO效率較低,由於每次調用都對fd進行線性掃描遍歷,因此隨着fd的增長會形成遍歷速度慢的性能問題;

(d)內核須要將消息傳遞用戶空間,須要內核拷貝動做;

(e)最大支持1024個fd。

poll

和select基本同樣,除了poll沒有使用低效的三個基於位的文件描述符set,而是採用了一個單獨的結構體pollfd數組,由fds指針指向這個組。

特色

(a)它將用戶傳入的數組拷貝到內核空間,而後查詢每一個fd對應的設備狀態,若是設備就緒則在設備等待隊列中加入一項並繼續遍歷。若是遍歷完全部fd後沒有發現就緒設備,則掛起當前進程,直到設備就緒或主動超時,被喚醒後它又要再次遍歷fd;


(b)沒有最大鏈接數的限制,緣由是它是基於鏈表來存儲的;

(c)大量的fd的數組被總體複製於用戶態和內核地址空間;

(d)對fd的掃描是線性的;

(e)水平觸發:若是報告了fd後,沒有被處理,那麼下次poll時會再次報告該fd。

epoll

介紹如上

特色:

(a)支持一個進程打開最大數目的socket描述符(FD)。所支持的FD上限是最大能夠打開文件的數組,在1GB機器上,大約爲10萬左右;

(b)IO效率不隨fd數目增長而線性降低;(select/poll每次調用都會線性掃描所有的集合;epoll中只有活躍的socket纔會主動調用callback函數,其餘idle狀態的socket則不會)

(c)使用mmap減小複製開銷,加速內核與用戶空間的消息傳遞;(epoll是經過內核和用戶空間共享同一塊內存實現的)

(d)支持邊緣觸發,只告訴進程中哪些fd剛剛變爲就緒態,而且只通知一次。(epoll使用事件的就緒通知方式,經過epoll_ctl函數註冊fd。一旦該fd就緒,內核就會採用相似callback的回調機制激活該fd,epoll_wait即可以收到通知。)


j_0033.gifj_0033.gifj_0033.gifj_0033.gifj_0033.gifj_0033.gifj_0033.gifj_0033.gifj_0033.gifj_0033.gifj_0033.gifj_0033.gifj_0033.gif

相關文章
相關標籤/搜索