利用epoll寫一個"迷你"的網絡事件庫

  epoll是linux下高性能的IO複用技術,是Linux下多路複用IO接口select/poll的加強版本,它能顯著提升程序在大量併發鏈接中只有少許活躍的狀況下的系統CPU利用率。另外一點緣由就是獲取事件的時候,它無須遍歷整個被偵聽的描述符集,只要遍歷那些被內核IO事件異步喚醒而加入Ready隊列的描述符集合就好了。epoll除了提供select/poll那種IO事件的水平觸發(Level Triggered)外,還提供了邊緣觸發(Edge Triggered),這就使得用戶空間程序有可能緩存IO狀態,減小epoll_wait/epoll_pwait的調用,提升應用程序效率。html

  爲何會出現IO複用技術呢,好比在Web應用中,大量的請求鏈接事件,若是採用多進程方式處理,也就是一個鏈接對應一個fork來處理,這樣開銷太大了,畢竟建立進程仍是很耗資源的;若是採用多線程方式處理,也就是一個鏈接對應一個線程來處理,當請求併發量上去的話,系統中就會充斥着不少處理線程,畢竟一個系統建立線程是有必定上限的。這時,就須要咱們的IO複用技術了。常見的網絡模型中,有多進程+IO複用編程模型,也有多線程+IO複用編程模型,好比大名鼎鼎的nginx默認採用的就是多進程+IO複用技術來處理網絡請求的;開源網絡庫libevent也是基於IO複用技術來完成網絡數據處理的。linux

 

epoll系列函數

  epoll是Linux特有的IO複用函數,它在實現和使用上與select和poll有很大差別,首先,epoll使用一組函數來完成操做,而不是單個函數。其次,epoll把用戶關心的文件描述符上的事件放在內核上的一個事件表中,從而無須像select和poll那樣每次調用都要重複傳入文件描述符集合事件表。但epoll須要使用一個額外的文件描述符,來惟一標識內核中這個事件表,這個文件描述符使用以下epoll_create函數建立:nginx

#include <sys/epoll.h>  
int epoll_create(int size);  // 返回:成功返回建立的內核事件表對應的描述符,出錯-1  

  size參數如今並不起做用,只是給內核一個提示,告訴它內核表須要多大,該函數返回的文件描述符將用做其餘全部epoll函數的第一個參數,以指定要訪問的內核事件表。用epoll_ctl函數操做內核事件表c++

#include <sys/epoll.h>  
int epoll_ctl(int opfd, int op, int fd, struct epoll_event *event);  // 返回:成功返回建立的內核事件表對應的描述符,出錯-1  

  fd參數是要操做的文件描述符,op指定操做類型,操做類型有3種編程

  • EPOLL_CTL_ADD:往事件表中註冊fd上的事件
  • EPOLL_CTL_MOD:修改fd上的註冊事件
  • EPOLL_CTL_DEL:刪除fd上的註冊事件

  event指定事件類型,它是epoll_event結構指針類型:數組

struct epoll_event  
{  
    __uint32_t events;  /* epoll事件 */  
    epoll_data_t data;  /* 用戶數據 */  
};  

  其中events描述事件類型,epoll支持的事件類型和poll基本相同,表示epoll事件類型的宏是在poll對應的宏加上」E」,好比epoll的數據可讀事件是EPOLLIN,但epoll有兩個額外的事件類型-EPOLLET和EPOLLONESHOT,它們對於高效運做很是關鍵,data用於存儲用戶數據,其類型epoll_data_t定義以下:緩存

typedef union epoll_data  
{  
    void *ptr;  
    int fd;  
    uint32_t u32;  
    uint64_t u64;  
}epoll_data_t;  

  epoll_data_t是一個聯合體,其4個成員最多使用的是fd,它指定事件所從屬的目標文件描述符,ptr成員可用來指定fd相關的用戶數據,但因爲opoll_data_t是一個聯合體,咱們不能同時使用fd和ptr,若是要將文件描述符嗯哼用戶數據關聯起來,以實現快速的數據訪問,則只能使用其餘手段,好比放棄使用fd成員,而在ptr指針指向的用戶數據中包含fd。網絡

#include <sys/epoll.h>  
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);  // 返回:成功返回就緒的文件描述符個數,出錯-1  

  timeout參數的含義與poll接口的timeout參數相同,maxevents參數指定最多監聽多少個事件,它必須大於0。數據結構

  epoll_wait若是檢測到事件,就將全部就緒的事件從內核事件表(由epfd指定)中複製到events指定的數組中,這個數組只用來輸epoll_wait檢測到的就緒事件,而不像select和poll的參數數組既傳遞用於用戶註冊的事件,有用於輸出內核檢測到就緒事件,這樣極大提升了應用程序索引就緒文件描述符的效率。多線程

 

epoll原理與實現

  epoll是怎麼實現的呢?其實很簡單,從這3個方法就能夠看出,它比select聰明的避免了每次頻繁調用「哪些鏈接已經處在消息準備好階段」的 epoll_wait時,是不須要把全部待監控鏈接傳入的。這意味着,它在內核態維護了一個數據結構保存着全部待監控的鏈接。這個數據結構就是一棵紅黑樹,它的結點的增長、減小是經過epoll_ctrl來完成的。

 

圖片來源於CSDN:高性能網絡編程5--IO複用與併發編程

  圖中左下方的紅黑樹由全部待監控的鏈接構成。左上方的鏈表,同是目前全部活躍的鏈接。因而,epoll_wait執行時只是檢查左上方的鏈表,並返回左上方鏈表中的鏈接給用戶。這樣,epoll_wait的執行效率能不高嗎?

  最後,再看看epoll提供的2種玩法ET和LT,即翻譯過來的邊緣觸發和水平觸發。其實這兩個中文名字倒也有些貼切。這2種使用方式針對的仍然是效率問題,只不過變成了epoll_wait返回的鏈接如何可以更準確些。
例如,咱們須要監控一個鏈接的寫緩衝區是否空閒,知足「可寫」時咱們就能夠從用戶態將響應調用write發送給客戶端 。可是,或者鏈接可寫時,咱們的「響應」內容還在磁盤上呢,此時如果磁盤讀取還未完成呢?確定不能使線程阻塞的,那麼就不發送響應了。可是,下一次epoll_wait時可能又把這個鏈接返回給你了,你還得檢查下是否要處理。可能,咱們的程序有另外一個模塊專門處理磁盤IO,它會在磁盤IO完成時再發送響應。那麼,每次epoll_wait都返回這個「可寫」的、卻沒法馬上處理的鏈接,是否符合用戶預期呢?
  因而,ET和LT模式就應運而生了。LT是每次知足期待狀態的鏈接,都得在epoll_wait中返回,因此它一視同仁,都在一條水平線上。ET則否則,它傾向更精確的返回鏈接。在上面的例子中,鏈接第一次變爲可寫後,如果程序未向鏈接上寫入任何數據,那麼下一次epoll_wait是不會返回這個鏈接的。ET叫作 邊緣觸發,就是指,只有鏈接從一個狀態轉到另外一個狀態時,纔會觸發epoll_wait返回它。可見,ET的編程要複雜很多,至少應用程序要當心的防止epoll_wait的返回的鏈接出現:可寫時未寫數據後卻期待下一次「可寫」、可讀時未讀盡數據卻期待下一次「可讀」。 
  固然,從通常應用場景上它們性能是不會有什麼大的差距的,ET可能的優勢是,epoll_wait的調用次數會減小一些,某些場景下鏈接在沒必要要喚醒時不會被喚醒(此喚醒指epoll_wait返回)。但若是像我上面舉例所說的,有時它不單純是一個網絡問題,跟應用場景相關。固然,大部分開源框架都是基於ET寫的,框架嘛,它追求的是純技術問題,固然力求盡善盡美。

 

基於epoll的"迷你"網絡事件庫

  網絡事件庫封裝了底層IO複用函數,同時提供給外部使用的接口,提供的接口能夠多種多樣,可是通常有添加事件、刪除事件、開始事件循環等接口。爲了展現下網絡事件庫的是如何封裝IO複用函數,同時學習epoll的使用,"迷你"網絡事件庫-tomevent今天誕生了 :) (ps:tomevent採用C++語言實現)。

  既然是網絡事件庫,那首先須要定義一個事件的結構,LZ這裏就使用Event結構體了,事件結構體中包含監聽的文件描述符、事件類型、回調函數、傳遞給回調函數的參數,固然,這只是一個簡單的事件結構,若是還須要其餘信息可另外添加。

/**
 * event struct.
 */
struct Event {
    int fd;                                 /* the fd want to monitor */
    short event;                            /* the event you want to monitor */
    void *(*callback)(int fd, void *arg);   /* the callback function */
    void *arg;                               /* the parameter of callback function */
};

  定義一個事件處理接口IEvent,該接口定義了3個基本的事件操做函數,也就是添加事件、刪除事件、開始事件循環。定義IEvent接口,與具體的底層IO技術解耦,使用具體的IO複用類來實現該接口,好比對應select的SelectEvent,或者是對應poll的PollEvent,固然,這裏就用epoll對應的EpollEvent來實現IEvent接口(ps:c++中接口貌似應該稱爲抽象類,不過這裏稱爲接口更合適一點)。

/**
 * the interface of event.
 */
class IEvent {
public:
    virtual int addEvent(const Event &event) = 0;
    virtual int delEvent(const Event &event) = 0;
    virtual int dispatcher() = 0;

    virtual ~IEvent() { }
};

  IEvent的實現類EpollEvent,其中封裝了epoll相關的函數。EpollEvent有3個成員,分別是pollCreateSize、epollFd、events,pollCreateSize表示調用epoll_create時傳遞的參數值,epollFd表示epoll_create的返回值,events是記錄事件的map,events中記錄了監聽事件的信息,當事件來臨時被用到。

class EpollEvent : public IEvent {
public:
    EpollEvent() : EpollEvent(16) {

    }
    EpollEvent(int createSize) {
        if (createSize < 16) {
            createSize = 16;
        }

        epollCreateSize = createSize;
        initEvent();
    }

    virtual int addEvent(const Event &event);
    virtual int delEvent(const Event &event);
    virtual int dispatcher();

private:
    int initEvent() {
        int epollFd = epoll_create(this->epollCreateSize);
        if (epollFd <= 0) {
            perror("create_create error:");
            return epollFd; /* here epollFd is -1 */
        }

        this->epollFd = epollFd;return 0;
    }

    int epollCreateSize;
    int epollFd;

    //Event event;
    map<int, Event> events;
};
int EpollEvent::addEvent(const Event &event) {
    struct epoll_event epollEvent;

    epollEvent.data.fd = event.fd;
    epollEvent.events = event.event;
    int retCode = epoll_ctl(this->epollFd, EPOLL_CTL_ADD, event.fd, &epollEvent);
    if (retCode < 0) {
        perror("epoll_ctl error:");
        return retCode;
    }

    /* add event to this->events */
    this->events[event.fd] = event;return 0;
}

int EpollEvent::delEvent(const Event &event) {
    struct epoll_event epollEvent;

    epollEvent.data.fd = event.fd;
    epollEvent.events = event.event;
    int retCode = epoll_ctl(this->epollFd, EPOLL_CTL_DEL, event.fd, &epollEvent);
    if (retCode < 0) {
        perror("epoll_ctl error:");
        return retCode;
    }

    this->events.erase(event.fd);
    return 0;
}

int EpollEvent::dispatcher() {
    struct epoll_event epollEvents[32];

    //cout << "epoll_wait before" << endl;
    int nEvents = epoll_wait(epollFd, epollEvents, 32, -1);
    if (nEvents <= 0) {
        perror("epoll_wait error:");
        return -1;
    }
    //cout << "epoll_wait after nEvent" << endl;
    for (int i = 0; i < nEvents; i++) {
        int fd = epollEvents[i].data.fd;
        Event event = this->events[fd];
        if (event.callback) {
            event.callback(fd, event.arg);
        }
    }
    return 0;
}

  到這裏整個tomevent的框架代碼就結束了,那麼該如何使用呢,如下是一個測試用例。使用tomevent來同時監聽2個文件描述符,一個是標準輸入(fd爲0),另外一個是提供UDP服務的一個文件描述符。

void *test(int fd, void *arg) {
    cout << "****************test(): fd=" << fd << endl;

    char buff[256];
    int len = recvfrom(fd, buff, sizeof(buff), 0, NULL, NULL);
    if (len > 0) {
        buff[len] = '\0';
        cout << buff << endl;
    }
    else {
        perror("recvfrom error:");
    }
    cout << "****************test()**********" << endl;
}

void *inTest(int fd, void *arg) {
    cout << "****************inTest(): fd=" << fd << endl;

    char buff[256];
    int len = read(fd, buff, sizeof(buff));
    if (len > 0) {
        buff[len] = '\0';
        cout << buff << endl;
    }
    else {
        perror("read stdin error:");
    }
    cout << "****************inTest()**********" << endl;
}

int main(int argc, char **argv) {
    int listenFd = -1;
    int connFd = -1;
    struct sockaddr_in servAddr;

    listenFd = socket(AF_INET, SOCK_DGRAM, 0);

    memset(&servAddr, 0, sizeof(servAddr));
    servAddr.sin_family = AF_INET;
    servAddr.sin_port = htons(8080);
    servAddr.sin_addr.s_addr = INADDR_ANY;
    bind(listenFd, (struct sockaddr *)&servAddr, sizeof(servAddr));

    listen(listenFd, 5);

    Event event, inEvent;
    EpollEvent eventBase;

    event.fd = listenFd;
    event.event = EPOLLIN;
    event.arg = NULL;
    event.callback = test;

    inEvent.fd = 0;
    inEvent.event = EPOLLIN;
    inEvent.arg = NULL;
    inEvent.callback = inTest;

    eventBase.addEvent(event);
    eventBase.addEvent(inEvent);

    for (; ;) {
        eventBase.dispatcher();
    }

    return 0;
}

  如下是測試結果 ,同時提供UDP服務和響應鍵盤輸入。

 

參考

  一、epoll-百度百科

  二、高性能網絡編程5--IO複用與併發編程

  三、Libevent初探

相關文章
相關標籤/搜索