關於
I / O複用
,能夠先查閱《UNIX網絡編程》第六章select
和poll
相關內容。html
select
、poll
、epoll
都是I / O
複用的機制,在《UNIX網絡編程》
裏重點講了select
、poll
的機制,但select
、poll
並非現代高性能服務器的最佳選擇。包括如今的Node.js
中的事件循環機制(event loop)
也是基於epoll
實現的。linux
按照《UNIX網絡編程》
中所述,poll
與select
相似,沒有解決如下的問題:git
select
,都須要把fd
集合從用戶態拷貝到內核態,這個開銷在fd
不少時會很大select
都須要在內核遍歷傳遞進來的全部fd
,這個開銷在fd
不少時也很大select
支持的文件描述符數量過小了,默認是1024
epoll
既然是對select
和poll
的改進,就應該能避免上述的三個缺點。那epoll
都是怎麼解決的呢?在此以前,咱們先看一下epoll
和select
和poll
的調用接口上的不一樣,select
和poll
都只提供了一個函數——select
或者poll
函數。而epoll
提供了三個函數,epoll_create
,epoll_ctl
和epoll_wait
,epoll_create
是建立一個epoll
句柄;epoll_ctl
是註冊要監聽的事件類型;epoll_wait
則是等待事件的產生。 對於第一個缺點,epoll
的解決方案在epoll_ctl
函數中。每次註冊新的事件到epoll
句柄中時(在epoll_ctl
中指定EPOLL_CTL_ADD
),會把全部的fd
拷貝進內核,而不是在epoll_wait
的時候重複拷貝。epoll
保證了每一個fd
在整個過程當中只會拷貝一次。 對於第二個缺點,epoll
的解決方案不像select
或poll
同樣每次都把current
輪流加入fd
對應的設備等待隊列中,而只在epoll_ctl
時把current
掛一遍(這一遍必不可少)併爲每一個fd
指定一個回調函數,當設備就緒,喚醒等待隊列上的等待者時,就會調用這個回調函數,而這個回調函數會把就緒的fd加入一個就緒鏈表)。epoll_wait
的工做實際上就是在這個就緒鏈表中查看有沒有就緒的fd
(利用schedule_timeout()
實現睡一會,判斷一會的效果,和select
實現中的第7步是相似的)。 對於第三個缺點,epoll
沒有這個限制,它所支持的FD
上限是最大能夠打開文件的數目,這個數字通常遠大於2048
,舉個例子,在1GB
內存的機器上大約是10萬
左右,通常來講這個數目和系統內存關係很大。github
epoll操做過程須要三個接口,分別以下:編程
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
複製代碼
epoll_create
方法#include <sys/epoll.h>
int epoll_create(int size);
複製代碼
建立一個epoll
的句柄,size
用來告訴內核這個監聽的數目一共有多大,這個參數不一樣於select()
中的第一個參數,給出最大監聽的fd+1
的值,參數size
並非限制了epoll
所能監聽的描述符最大個數,只是對內核初始分配內部數據結構的一個建議。 當建立好epoll
句柄後,它就會佔用一個fd
值,在linux
下若是查看/proc/
進程id/fd/
,是可以看到這個fd
的,因此在使用完epoll
後,必須調用close()
關閉,不然可能致使fd
被耗盡。segmentfault
#include <sys/epoll.h>
#define FDSIZE 1024
// ...
int main(int argc,char *argv[]) {
int epollfd = epoll_create(FDSIZE); // 這裏並非指最大文件描述符數量爲1024,而是給內核初始化數據結構的一個建議。
return 0;
}
複製代碼
epoll_ctl
方法#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
複製代碼
epoll_ctl
方法是epoll
的事件註冊函數,它不一樣與select()
是在監聽事件時告訴內核要監聽什麼類型的事件,而是在這裏先註冊要監聽的事件類型。設計模式
epfd
:是epoll_create()
的返回值。op
:表示對對應的fd
文件描述符的操做,通常狀況下表示想要監聽事件、刪除事件和修改事件處理函數,用三個宏來表示:
EPOLL_CTL_ADD
,表示對於對應的fd
文件描述符添加一組事件監聽;EPOLL_CTL_DEL
,表示對於對應的fd
文件描述符刪除該組事件監聽;EPOLL_CTL_MOD
,表示對於對應的fd
文件描述符修改該組事件監聽爲新的events
;fd
:表示須要監聽的fd
(文件描述符)event
:是告訴內核須要監聽的事件集合,傳入一個指針,指向事件集合的第一項,struct epoll_event
的結構以下:struct epoll_event {
__uint32_t events; // 表示一類epoll事件
epoll_data_t data; // 用戶傳遞的數據
}
複製代碼
由於events
表示一類epoll
事件,它能夠是如下幾個宏的集合:服務器
EPOLLIN
:表示對應的文件描述符能夠讀(包括對端SOCKET正常關閉);EPOLLOUT
:表示對應的文件描述符能夠寫;EPOLLPRI
:表示對應的文件描述符有緊急的數據可讀(這裏應該表示有帶外數據到來);EPOLLERR
:表示對應的文件描述符發生錯誤;EPOLLHUP
:表示對應的文件描述符被掛斷;EPOLLET
: 將EPOLL
設爲邊緣觸發(Edge Triggered
)模式,這是相對於水平觸發(Level Triggered
)來講的;EPOLLONESHOT
:只監聽一次事件,當監聽完此次事件以後,若是還須要繼續監聽這個socket
的話,須要再次把這個socket
加入到EPOLL
隊列裏;例如,若是想讓epoll
對於對應的文件描述符fd
添加一組事件,監聽對應的文件描述符可讀的狀況:網絡
static void add_event_epoll_in(int epollfd, int fd) {
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}
複製代碼
epoll_wait
方法#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
複製代碼
epoll_wait
方法等待事件的產生,相似於select()
調用,返回須要處理的事件數目。數據結構
epfd
:是epoll_create()
的返回值;events
:用來從內核獲得事件的集合,咱們通常把須要處理的事件對應的文件描述符fd
放到events
結構體下data
參數內,這樣咱們能夠在事件處理函數中取到對應的文件描述符fd
,執行對應操做(例如對於TCP
套接字,咱們調用read
,write
等);maxevents
:告以內核這個events
的數量,這個maxevents
的值不能大於建立epoll_create()
時的size
,不然會形成溢出的風險;timeout
:超時時間,以毫秒爲單位,若是設置爲-1
,表示一直等待,設置爲0
表示不等待;舉例:應用程序通常阻塞與epoll_wait
調用,一旦events
中任意一事件觸發,epoll_wait
執行,等待指定的timeout
超時時間,若是I / O
完成則當即返回須要處理的事件數目。
static void do_epoll(int listenfd) {
int epollfd;
struct epoll_event events[EPOLLEVENTS];
int ret;
char buf[MAXSIZE];
memset(buf,0,MAXSIZE);
//建立一個描述符
epollfd = epoll_create(FDSIZE);
//添加監聽描述符事件
add_event_epoll_in(epollfd, listenfd);
for ( ; ; )
{
// 獲取已經準備好的描述符事件數目
ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
handle_events(epollfd, events, ret, listenfd, buf);
}
close(epollfd);
}
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf) {
int i;
int fd;
//進行選好遍歷
for (i = 0;i < num;i++)
{
// 在這裏取到須要處理的文件描述符
fd = events[i].data.fd;
//根據描述符的類型和事件類型進行處理
if ((fd == listenfd) &&(events[i].events & EPOLLIN))
handle_accpet(epollfd,listenfd);
else if (events[i].events & EPOLLIN)
do_read(epollfd,fd,buf);
else if (events[i].events & EPOLLOUT)
do_write(epollfd,fd,buf);
}
}
複製代碼
epoll
重構服務器回射程序// server.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>
#define IPADDRESS "127.0.0.1"
#define PORT 8787
#define MAXSIZE 1024
#define LISTENQ 5
#define FDSIZE 1000
#define EPOLLEVENTS 100
//函數聲明
//建立套接字並進行綁定
static int socket_bind(const char* ip,int port);
//IO多路複用epoll
static void do_epoll(int listenfd);
//事件處理函數
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf);
//處理接收到的鏈接
static void handle_accpet(int epollfd,int listenfd);
//讀處理
static void do_read(int epollfd,int fd,char *buf);
//寫處理
static void do_write(int epollfd,int fd,char *buf);
//添加事件
static void add_event(int epollfd,int fd,int state);
//修改事件
static void modify_event(int epollfd,int fd,int state);
//刪除事件
static void delete_event(int epollfd,int fd,int state);
int main(int argc,char *argv[]) {
int listenfd;
listenfd = socket_bind(IPADDRESS,PORT);
listen(listenfd,LISTENQ);
do_epoll(listenfd);
return 0;
}
static int socket_bind(const char* ip,int port) {
int listenfd;
struct sockaddr_in servaddr;
listenfd = socket(AF_INET,SOCK_STREAM,0);
if (listenfd == -1)
{
perror("socket error:");
exit(1);
}
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET,ip,&servaddr.sin_addr);
servaddr.sin_port = htons(port);
if (bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1)
{
perror("bind error: ");
exit(1);
}
return listenfd;
}
static void do_epoll(int listenfd) {
int epollfd;
struct epoll_event events[EPOLLEVENTS];
int ret;
char buf[MAXSIZE];
memset(buf,0,MAXSIZE);
//建立一個描述符
epollfd = epoll_create(FDSIZE);
//添加監聽描述符事件
add_event(epollfd,listenfd,EPOLLIN);
for ( ; ; )
{
//獲取已經準備好的描述符事件
ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
handle_events(epollfd,events,ret,listenfd,buf);
}
close(epollfd);
}
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf) {
int i;
int fd;
//進行選好遍歷
for (i = 0;i < num;i++)
{
fd = events[i].data.fd;
//根據描述符的類型和事件類型進行處理
if ((fd == listenfd) &&(events[i].events & EPOLLIN))
handle_accpet(epollfd,listenfd);
else if (events[i].events & EPOLLIN)
do_read(epollfd,fd,buf);
else if (events[i].events & EPOLLOUT)
do_write(epollfd,fd,buf);
}
}
static void handle_accpet(int epollfd,int listenfd) {
int clifd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
clifd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);
if (clifd == -1)
perror("accpet error:");
else
{
printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
//添加一個客戶描述符和事件
add_event(epollfd,clifd,EPOLLIN);
}
}
static void do_read(int epollfd,int fd,char *buf) {
int nread;
nread = read(fd,buf,MAXSIZE);
if (nread == -1)
{
perror("read error:");
close(fd);
delete_event(epollfd,fd,EPOLLIN);
}
else if (nread == 0)
{
fprintf(stderr,"client close.\n");
close(fd);
delete_event(epollfd,fd,EPOLLIN);
}
else
{
printf("read message is : %s",buf);
//修改描述符對應的事件,由讀改成寫
modify_event(epollfd,fd,EPOLLOUT);
}
}
static void do_write(int epollfd,int fd,char *buf) {
int nwrite;
nwrite = write(fd,buf,strlen(buf));
if (nwrite == -1)
{
perror("write error:");
close(fd);
delete_event(epollfd,fd,EPOLLOUT);
}
else
modify_event(epollfd,fd,EPOLLIN);
memset(buf,0,MAXSIZE);
}
static void add_event(int epollfd,int fd,int state) {
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}
static void delete_event(int epollfd,int fd,int state) {
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}
static void modify_event(int epollfd,int fd,int state) {
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}
複製代碼
// client.c
#include <netinet/in.h>
#include <sys/socket.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <arpa/inet.h>
#define MAXSIZE 1024
#define IPADDRESS "127.0.0.1"
#define SERV_PORT 8787
#define FDSIZE 1024
#define EPOLLEVENTS 20
static void handle_connection(int sockfd);
static void handle_events(int epollfd,struct epoll_event *events,int num,int sockfd,char *buf);
static void do_read(int epollfd,int fd,int sockfd,char *buf);
static void do_read(int epollfd,int fd,int sockfd,char *buf);
static void do_write(int epollfd,int fd,int sockfd,char *buf);
static void add_event(int epollfd,int fd,int state);
static void delete_event(int epollfd,int fd,int state);
static void modify_event(int epollfd,int fd,int state);
int main(int argc,char *argv[]) {
int sockfd;
struct sockaddr_in servaddr;
sockfd = socket(AF_INET,SOCK_STREAM,0);
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
inet_pton(AF_INET,IPADDRESS,&servaddr.sin_addr);
connect(sockfd,(struct sockaddr*)&servaddr,sizeof(servaddr));
//處理鏈接
handle_connection(sockfd);
close(sockfd);
return 0;
}
static void handle_connection(int sockfd) {
int epollfd;
struct epoll_event events[EPOLLEVENTS];
char buf[MAXSIZE];
int ret;
epollfd = epoll_create(FDSIZE);
add_event(epollfd,STDIN_FILENO,EPOLLIN);
for ( ; ; )
{
ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
handle_events(epollfd,events,ret,sockfd,buf);
}
close(epollfd);
}
static void handle_events(int epollfd,struct epoll_event *events,int num,int sockfd,char *buf) {
int fd;
int i;
for (i = 0;i < num;i++)
{
fd = events[i].data.fd;
if (events[i].events & EPOLLIN)
do_read(epollfd,fd,sockfd,buf);
else if (events[i].events & EPOLLOUT)
do_write(epollfd,fd,sockfd,buf);
}
}
static void do_read(int epollfd,int fd,int sockfd,char *buf) {
int nread;
nread = read(fd,buf,MAXSIZE);
if (nread == -1)
{
perror("read error:");
close(fd);
}
else if (nread == 0)
{
fprintf(stderr,"server close.\n");
close(fd);
}
else
{
if (fd == STDIN_FILENO)
add_event(epollfd,sockfd,EPOLLOUT);
else
{
delete_event(epollfd,sockfd,EPOLLIN);
add_event(epollfd,STDOUT_FILENO,EPOLLOUT);
}
}
}
static void do_write(int epollfd,int fd,int sockfd,char *buf) {
int nwrite;
nwrite = write(fd,buf,strlen(buf));
if (nwrite == -1)
{
perror("write error:");
close(fd);
}
else
{
if (fd == STDOUT_FILENO)
delete_event(epollfd,fd,EPOLLOUT);
else
modify_event(epollfd,fd,EPOLLIN);
}
memset(buf,0,MAXSIZE);
}
static void add_event(int epollfd,int fd,int state) {
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}
static void delete_event(int epollfd,int fd,int state) {
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}
static void modify_event(int epollfd,int fd,int state) {
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}
複製代碼
運行結果:
衆所周知,Node.js
是單線程的,可用做高性能服務器。顯然,若僅僅限定爲1024
個描述符,對於高併發的請求顯然是不支持的。Node.js
內部的Event Demultiplexer(事件多路分解器)
藉助epoll
來實現事件循環機制。其基本步驟以下:
Event Demultiplexer(事件多路分解器)
提交請求來生成新的I / O
操做。應用程序還指定一個處理程序,當操做完成時將調用該處理程序。向Event Demultiplexer(事件多路分解器)
提交新請求是一種非阻塞調用,它當即將控制權返回給該應用程序。I / O
操做完成時,事件多路分解器將新的事件推入Event Queue(事件隊列)
。Event Loop
遍歷Event Queue
的項目。Event Loop
。可是,在處理程序執行過程當中可能會請求新的異步操做,從而致使新的操做被插入Event Demultiplexer(事件多路分解器)
Event Loop
中的全部項目被處理完時,循環將再次阻塞Event Demultiplexer(事件多路分解器)
,當有新事件可用時,Event Demultiplexer(事件多路分解器)
將觸發另外一個週期。經過epoll
解析上述步驟操做:
epoll_create()
建立的epoll
句柄的抽象,在Node.js
啓動時,事件多路分解器會阻塞於epoll_wait
調用。epoll_ctl()
方法,設定op
參數爲EPOLL_CTL_ADD
,向事件多路分解器添加一組事件。I / O
完成,又調用epoll_ctl()
方法,設定op
參數爲EPOLL_CTL_DEL
,刪除對應事件,此時把控制權返還給應用程序。epoll_wait()
。參考資料: