UNIX epoll 與 Node.js 事件循環多路分解器

關於I / O複用,能夠先查閱《UNIX網絡編程》第六章selectpoll相關內容。html

selectpollepoll都是I / O複用的機制,在《UNIX網絡編程》裏重點講了selectpoll的機制,但selectpoll並非現代高性能服務器的最佳選擇。包括如今的Node.js中的事件循環機制(event loop)也是基於epoll實現的。linux

select和poll的缺點

按照《UNIX網絡編程》中所述,pollselect相似,沒有解決如下的問題:git

  • 每次調用select,都須要把fd集合從用戶態拷貝到內核態,這個開銷在fd不少時會很大
  • 同時每次調用select都須要在內核遍歷傳遞進來的全部fd,這個開銷在fd不少時也很大
  • select支持的文件描述符數量過小了,默認是1024

epoll對於上述缺點的改進

epoll既然是對selectpoll的改進,就應該能避免上述的三個缺點。那epoll都是怎麼解決的呢?在此以前,咱們先看一下epollselectpoll的調用接口上的不一樣,selectpoll都只提供了一個函數——select或者poll函數。而epoll提供了三個函數,epoll_create,epoll_ctlepoll_waitepoll_create是建立一個epoll句柄;epoll_ctl是註冊要監聽的事件類型;epoll_wait則是等待事件的產生。   對於第一個缺點,epoll的解決方案在epoll_ctl函數中。每次註冊新的事件到epoll句柄中時(在epoll_ctl中指定EPOLL_CTL_ADD),會把全部的fd拷貝進內核,而不是在epoll_wait的時候重複拷貝。epoll保證了每一個fd在整個過程當中只會拷貝一次。   對於第二個缺點,epoll的解決方案不像selectpoll同樣每次都把current輪流加入fd對應的設備等待隊列中,而只在epoll_ctl時把current掛一遍(這一遍必不可少)併爲每一個fd指定一個回調函數,當設備就緒,喚醒等待隊列上的等待者時,就會調用這個回調函數,而這個回調函數會把就緒的fd加入一個就緒鏈表)。epoll_wait的工做實際上就是在這個就緒鏈表中查看有沒有就緒的fd(利用schedule_timeout()實現睡一會,判斷一會的效果,和select實現中的第7步是相似的)。   對於第三個缺點,epoll沒有這個限制,它所支持的FD上限是最大能夠打開文件的數目,這個數字通常遠大於2048,舉個例子,在1GB內存的機器上大約是10萬左右,通常來講這個數目和系統內存關係很大。github

epoll接口

epoll操做過程須要三個接口,分別以下:編程

#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
複製代碼

epoll_create方法

#include <sys/epoll.h>
int epoll_create(int size);
複製代碼

建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大,這個參數不一樣於select()中的第一個參數,給出最大監聽的fd+1的值,參數size並非限制了epoll所能監聽的描述符最大個數,只是對內核初始分配內部數據結構的一個建議。 當建立好epoll句柄後,它就會佔用一個fd值,在linux下若是查看/proc/進程id/fd/,是可以看到這個fd的,因此在使用完epoll後,必須調用close()關閉,不然可能致使fd被耗盡。segmentfault

#include <sys/epoll.h>
#define FDSIZE 1024
// ...
int main(int argc,char *argv[]) {
  int epollfd = epoll_create(FDSIZE); // 這裏並非指最大文件描述符數量爲1024,而是給內核初始化數據結構的一個建議。
  return 0;
}
複製代碼

epoll_ctl方法

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
複製代碼

epoll_ctl方法是epoll的事件註冊函數,它不一樣與select()是在監聽事件時告訴內核要監聽什麼類型的事件,而是在這裏先註冊要監聽的事件類型。設計模式

  • epfd:是epoll_create()的返回值。
  • op:表示對對應的fd文件描述符的操做,通常狀況下表示想要監聽事件、刪除事件和修改事件處理函數,用三個宏來表示:
    • EPOLL_CTL_ADD,表示對於對應的fd文件描述符添加一組事件監聽;
    • EPOLL_CTL_DEL,表示對於對應的fd文件描述符刪除該組事件監聽;
    • EPOLL_CTL_MOD,表示對於對應的fd文件描述符修改該組事件監聽爲新的events
  • fd:表示須要監聽的fd(文件描述符)
  • event:是告訴內核須要監聽的事件集合,傳入一個指針,指向事件集合的第一項,struct epoll_event的結構以下:
struct epoll_event {
  __uint32_t events;  // 表示一類epoll事件
  epoll_data_t data;  // 用戶傳遞的數據
}

複製代碼

由於events表示一類epoll事件,它能夠是如下幾個宏的集合:服務器

  • EPOLLIN:表示對應的文件描述符能夠讀(包括對端SOCKET正常關閉);
  • EPOLLOUT:表示對應的文件描述符能夠寫;
  • EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這裏應該表示有帶外數據到來);
  • EPOLLERR:表示對應的文件描述符發生錯誤;
  • EPOLLHUP:表示對應的文件描述符被掛斷;
  • EPOLLET: 將EPOLL設爲邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來講的;
  • EPOLLONESHOT:只監聽一次事件,當監聽完此次事件以後,若是還須要繼續監聽這個socket的話,須要再次把這個socket加入到EPOLL隊列裏;

例如,若是想讓epoll對於對應的文件描述符fd添加一組事件,監聽對應的文件描述符可讀的狀況:網絡

static void add_event_epoll_in(int epollfd, int fd) {
  struct epoll_event ev;
  ev.events = EPOLLIN;
  ev.data.fd = fd;
  epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}
複製代碼

epoll_wait方法

#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
複製代碼

epoll_wait方法等待事件的產生,相似於select()調用,返回須要處理的事件數目。數據結構

  • epfd:是epoll_create()的返回值;
  • events:用來從內核獲得事件的集合,咱們通常把須要處理的事件對應的文件描述符fd放到events結構體下data參數內,這樣咱們能夠在事件處理函數中取到對應的文件描述符fd,執行對應操做(例如對於TCP套接字,咱們調用readwrite等);
  • maxevents:告以內核這個events的數量,這個maxevents的值不能大於建立epoll_create()時的size,不然會形成溢出的風險;
  • timeout:超時時間,以毫秒爲單位,若是設置爲-1,表示一直等待,設置爲0表示不等待;

舉例:應用程序通常阻塞與epoll_wait調用,一旦events中任意一事件觸發,epoll_wait執行,等待指定的timeout超時時間,若是I / O完成則當即返回須要處理的事件數目。

static void do_epoll(int listenfd) {
    int epollfd;
    struct epoll_event events[EPOLLEVENTS];
    int ret;
    char buf[MAXSIZE];
    memset(buf,0,MAXSIZE);
    //建立一個描述符
    epollfd = epoll_create(FDSIZE);
    //添加監聽描述符事件
    add_event_epoll_in(epollfd, listenfd);
    for ( ; ; )
    {
        // 獲取已經準備好的描述符事件數目
        ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
        handle_events(epollfd, events, ret, listenfd, buf);
    }
    close(epollfd);
}

static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf) {
    int i;
    int fd;
    //進行選好遍歷
    for (i = 0;i < num;i++)
    {
        // 在這裏取到須要處理的文件描述符
        fd = events[i].data.fd;
        //根據描述符的類型和事件類型進行處理
        if ((fd == listenfd) &&(events[i].events & EPOLLIN))
            handle_accpet(epollfd,listenfd);
        else if (events[i].events & EPOLLIN)
            do_read(epollfd,fd,buf);
        else if (events[i].events & EPOLLOUT)
            do_write(epollfd,fd,buf);
    }
}
複製代碼

使用epoll重構服務器回射程序

服務端

// server.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>

#define IPADDRESS "127.0.0.1"
#define PORT 8787
#define MAXSIZE 1024
#define LISTENQ 5
#define FDSIZE 1000
#define EPOLLEVENTS 100

//函數聲明
//建立套接字並進行綁定
static int socket_bind(const char* ip,int port);
//IO多路複用epoll
static void do_epoll(int listenfd);
//事件處理函數
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf);
//處理接收到的鏈接
static void handle_accpet(int epollfd,int listenfd);
//讀處理
static void do_read(int epollfd,int fd,char *buf);
//寫處理
static void do_write(int epollfd,int fd,char *buf);
//添加事件
static void add_event(int epollfd,int fd,int state);
//修改事件
static void modify_event(int epollfd,int fd,int state);
//刪除事件
static void delete_event(int epollfd,int fd,int state);

int main(int argc,char *argv[]) {
    int  listenfd;
    listenfd = socket_bind(IPADDRESS,PORT);
    listen(listenfd,LISTENQ);
    do_epoll(listenfd);
    return 0;
}

static int socket_bind(const char* ip,int port) {
    int  listenfd;
    struct sockaddr_in servaddr;
    listenfd = socket(AF_INET,SOCK_STREAM,0);
    if (listenfd == -1)
    {
        perror("socket error:");
        exit(1);
    }
    bzero(&servaddr,sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    inet_pton(AF_INET,ip,&servaddr.sin_addr);
    servaddr.sin_port = htons(port);
    if (bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1)
    {
        perror("bind error: ");
        exit(1);
    }
    return listenfd;
}

static void do_epoll(int listenfd) {
    int epollfd;
    struct epoll_event events[EPOLLEVENTS];
    int ret;
    char buf[MAXSIZE];
    memset(buf,0,MAXSIZE);
    //建立一個描述符
    epollfd = epoll_create(FDSIZE);
    //添加監聽描述符事件
    add_event(epollfd,listenfd,EPOLLIN);
    for ( ; ; )
    {
        //獲取已經準備好的描述符事件
        ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
        handle_events(epollfd,events,ret,listenfd,buf);
    }
    close(epollfd);
}

static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf) {
    int i;
    int fd;
    //進行選好遍歷
    for (i = 0;i < num;i++)
    {
        fd = events[i].data.fd;
        //根據描述符的類型和事件類型進行處理
        if ((fd == listenfd) &&(events[i].events & EPOLLIN))
            handle_accpet(epollfd,listenfd);
        else if (events[i].events & EPOLLIN)
            do_read(epollfd,fd,buf);
        else if (events[i].events & EPOLLOUT)
            do_write(epollfd,fd,buf);
    }
}
static void handle_accpet(int epollfd,int listenfd) {
    int clifd;
    struct sockaddr_in cliaddr;
    socklen_t  cliaddrlen;
    clifd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);
    if (clifd == -1)
        perror("accpet error:");
    else
    {
        printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
        //添加一個客戶描述符和事件
        add_event(epollfd,clifd,EPOLLIN);
    }
}

static void do_read(int epollfd,int fd,char *buf) {
    int nread;
    nread = read(fd,buf,MAXSIZE);
    if (nread == -1)
    {
        perror("read error:");
        close(fd);
        delete_event(epollfd,fd,EPOLLIN);
    }
    else if (nread == 0)
    {
        fprintf(stderr,"client close.\n");
        close(fd);
        delete_event(epollfd,fd,EPOLLIN);
    }
    else
    {
        printf("read message is : %s",buf);
        //修改描述符對應的事件,由讀改成寫
        modify_event(epollfd,fd,EPOLLOUT);
    }
}

static void do_write(int epollfd,int fd,char *buf) {
    int nwrite;
    nwrite = write(fd,buf,strlen(buf));
    if (nwrite == -1)
    {
        perror("write error:");
        close(fd);
        delete_event(epollfd,fd,EPOLLOUT);
    }
    else
        modify_event(epollfd,fd,EPOLLIN);
    memset(buf,0,MAXSIZE);
}

static void add_event(int epollfd,int fd,int state) {
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}

static void delete_event(int epollfd,int fd,int state) {
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}

static void modify_event(int epollfd,int fd,int state) {
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}
複製代碼

客戶端

// client.c
#include <netinet/in.h>
#include <sys/socket.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <arpa/inet.h>

#define MAXSIZE 1024
#define IPADDRESS "127.0.0.1"
#define SERV_PORT 8787
#define FDSIZE 1024
#define EPOLLEVENTS 20

static void handle_connection(int sockfd);
static void handle_events(int epollfd,struct epoll_event *events,int num,int sockfd,char *buf);
static void do_read(int epollfd,int fd,int sockfd,char *buf);
static void do_read(int epollfd,int fd,int sockfd,char *buf);
static void do_write(int epollfd,int fd,int sockfd,char *buf);
static void add_event(int epollfd,int fd,int state);
static void delete_event(int epollfd,int fd,int state);
static void modify_event(int epollfd,int fd,int state);

int main(int argc,char *argv[]) {
    int                 sockfd;
    struct sockaddr_in servaddr;
    sockfd = socket(AF_INET,SOCK_STREAM,0);
    bzero(&servaddr,sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(SERV_PORT);
    inet_pton(AF_INET,IPADDRESS,&servaddr.sin_addr);
    connect(sockfd,(struct sockaddr*)&servaddr,sizeof(servaddr));
    //處理鏈接
    handle_connection(sockfd);
    close(sockfd);
    return 0;
}


static void handle_connection(int sockfd) {
    int epollfd;
    struct epoll_event events[EPOLLEVENTS];
    char buf[MAXSIZE];
    int ret;
    epollfd = epoll_create(FDSIZE);
    add_event(epollfd,STDIN_FILENO,EPOLLIN);
    for ( ; ; )
    {
        ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
        handle_events(epollfd,events,ret,sockfd,buf);
    }
    close(epollfd);
}

static void handle_events(int epollfd,struct epoll_event *events,int num,int sockfd,char *buf) {
    int fd;
    int i;
    for (i = 0;i < num;i++)
    {
        fd = events[i].data.fd;
        if (events[i].events & EPOLLIN)
            do_read(epollfd,fd,sockfd,buf);
        else if (events[i].events & EPOLLOUT)
            do_write(epollfd,fd,sockfd,buf);
    }
}

static void do_read(int epollfd,int fd,int sockfd,char *buf) {
    int nread;
    nread = read(fd,buf,MAXSIZE);
        if (nread == -1)
    {
        perror("read error:");
        close(fd);
    }
    else if (nread == 0)
    {
        fprintf(stderr,"server close.\n");
        close(fd);
    }
    else
    {
        if (fd == STDIN_FILENO)
            add_event(epollfd,sockfd,EPOLLOUT);
        else
        {
            delete_event(epollfd,sockfd,EPOLLIN);
            add_event(epollfd,STDOUT_FILENO,EPOLLOUT);
        }
    }
}

static void do_write(int epollfd,int fd,int sockfd,char *buf) {
    int nwrite;
    nwrite = write(fd,buf,strlen(buf));
    if (nwrite == -1)
    {
        perror("write error:");
        close(fd);
    }
    else
    {
        if (fd == STDOUT_FILENO)
            delete_event(epollfd,fd,EPOLLOUT);
        else
            modify_event(epollfd,fd,EPOLLIN);
    }
    memset(buf,0,MAXSIZE);
}

static void add_event(int epollfd,int fd,int state) {
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}

static void delete_event(int epollfd,int fd,int state) {
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}

static void modify_event(int epollfd,int fd,int state) {
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}
複製代碼

運行結果:

Node.js的Event Loop

衆所周知,Node.js是單線程的,可用做高性能服務器。顯然,若僅僅限定爲1024個描述符,對於高併發的請求顯然是不支持的。Node.js內部的Event Demultiplexer(事件多路分解器)藉助epoll來實現事件循環機制。其基本步驟以下:

  1. 應用程序經過向Event Demultiplexer(事件多路分解器)提交請求來生成新的I / O操做。應用程序還指定一個處理程序,當操做完成時將調用該處理程序。向Event Demultiplexer(事件多路分解器)提交新請求是一種非阻塞調用,它當即將控制權返回給該應用程序。
  2. 當一組I / O操做完成時,事件多路分解器將新的事件推入Event Queue(事件隊列)
  3. 此時Event Loop遍歷Event Queue的項目。
  4. 對於每一個事件,調用關聯的處理程序。
  5. 處理程序是應用程序代碼的一部分,當它執行完成時將控制權返回給Event Loop。可是,在處理程序執行過程當中可能會請求新的異步操做,從而致使新的操做被插入Event Demultiplexer(事件多路分解器)
  6. Event Loop中的全部項目被處理完時,循環將再次阻塞Event Demultiplexer(事件多路分解器),當有新事件可用時,Event Demultiplexer(事件多路分解器)將觸發另外一個週期。

經過epoll解析上述步驟操做:

  • 事件多路分解器即爲經過epoll_create()建立的epoll句柄的抽象,在Node.js啓動時,事件多路分解器會阻塞於epoll_wait調用。
  • 對於第一步,註冊事件處理程序時,調用epoll_ctl()方法,設定op參數爲EPOLL_CTL_ADD,向事件多路分解器添加一組事件。
  • 對於第二步,一旦I / O完成,又調用epoll_ctl()方法,設定op參數爲EPOLL_CTL_DEL,刪除對應事件,此時把控制權返還給應用程序。
  • 事件循環遍歷事件隊列,只要沒有事件,就阻塞於epoll_wait()
  • 不斷重複上述步驟,實現Node.js`的事件循環機制。

參考資料:

相關文章
相關標籤/搜索