4. 事件驅動模型:epoll編程
4.1 epoll簡介服務器
(1)epoll是Linux內核爲處理大批量的socket而改進的poll,相對於select/poll來講,epoll更加靈活。它使用一個文件描述符來管理多個socket。網絡
(2)epoll之因此高效,是由於它將用戶關心的socket事件存放到內核的一個事件表中。而不是像select/poll每次調用都須要重複傳入fd_set。好比當一個事件發生(如讀事件),epoll無須遍歷整個被監聽的描述符集,只要遍歷那些被內核IO事件異步喚醒而加入就緒隊列的描述符集就好了。數據結構
4.2 epoll的工做機制異步
(1)調用epoll_create,內核會建立一個eventpoll結構體。該結構體中的rbr成員是一顆紅黑樹,存儲着全部添加到epoll中須要監控的事件。而rdlist成員是一個雙向鏈表,存放着將要經過epoll_wait返回給用戶的知足條件的事件。socket
(2)調用epoll_ctl(),會向epoll對象中添加、刪除或修改感興趣的事件。tcp
(3)當調用epoll_wait檢查是否有事件發生時,內核會從eventpoll對象中的rdlist雙向鏈表中檢查是否有元素。若是有,則會把事件複製到用戶態,同時將事件數量返回給用戶。函數
4.3 epoll接口ui
(1)建立和關閉epoll對象spa
頭文件 |
#include <sys/epoll.h> |
函數 |
int epoll_create(int size); int close(int epollfd); //關閉epoll對象 |
參數 |
size:用於告訴內核要監聽的數目。注意,size並非限制了epoll所能監聽的描述符最大個數,只是對內核初始分配內部數據結構的一個建議。 |
返回值 |
成功時返回epoll對象的文件描述符。失敗返回-1 |
功能 |
建立一個epoll對象 |
(2)操做epoll對象
頭文件 |
#include <sys/epoll.h> |
函數 |
int epoll_ctl (int epfd, int op, int fd, struct epoll_event* event); |
參數 |
epfd:由epoll_create()的返回值 op:表示要將fd添加到epoll對象或從epoll對象刪除/修改fd等。 添加:EPOLL_CTL_ADD,刪除:EPOLL_CTL_DEL,修改:EPOLL_CTL_MOD fd:須要監聽的fd(文件描述符) struct epoll_event{ //告訴內核須要監聽什麼事件。 __uint32_t events; //如EPOLLIN,EPOLLOUT等。 epoll_data_t data; //用戶自定義的數據 }; |
返回值 |
成功時返回epoll對象的文件描述符。失敗返回-1 |
備註 |
struct epoll_event中的events成員變量的取值: (1)EPOLLIN:表示對應的文件描述符能夠讀(包括對端socket正常關閉) (2)EPOLLOUT:表示對應的文件描述符可寫 (3)EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(帶外數據的到來) (4)EPOLLERR:表示對應的文件描述符發生錯誤 (5)EPOLLHUP:表示對應的文件描述符被掛斷 (6)EPOLLET:將epoll設置爲邊緣觸發 (7)EPOLLONESHOT:只監聽一次,當監聽完此次事件以後,若是還須要繼續監聽這個socket的話,須要再次把這個socket加入到epoll隊列裏。 |
(3)等待IO事件
頭文件 |
#include <sys/epoll.h> |
函數 |
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout); |
參數 |
epfd:由epoll_create()的返回值 events:用來接收從內核獲得事件的集合。 maxevents:最多返回多少個事件 timeout:超時時間(毫秒):0會當即返回。 |
返回值 |
成功時返回須要處理的事件數目,0表示超時 |
功能 |
等待IO事件 |
4.4 epoll的工做模式
(1)LT模式:默認模式,同時支持block和non-block socket。當epoll_wait檢測到socket事件發生並將此事件通知應用程序,用戶能夠對這個就緒的socket進行IO操做,也能夠,能夠不當即處理該事件。若是你不做任何操做,在下次調用epoll_wait時,內核會繼續通知應用程序。
(2)ET模式:邊緣模式,只支持non-block socket。在這種模式下,只有當socket從未就結果變爲就緒時,內核纔會通知應用程序,之後就再也不通知,直到應用程序對該socket作了某些操做,使得該socket再也不處理就緒狀態。值得注意的是,若是不對這個socket做IO操做,內核不會發送更多的通知(only once)
【編程實驗】echo服務器
(1)主線程建立epoll對象,並註冊listent socket的「讀就緒」事件到epoll請求隊列中。
(2)主線程調用epoll_wait等待事件發生。若是此時有用戶鏈接進來,則會調知主線程,並調用咱們設置的handle_accept函數。
(3)handle_accept函數中調用accept系統函數來接受請求,並將新的socket的「讀就緒」事件插入epoll對象的請求隊列中,等待客戶端發送數據過來。
(4)若是服務器收到客戶端發送的數據,主線程會從epoll_wait中返回,並將數據分派給handle_event函數去作(這裏能夠開啓一個工做線程來完成!)
(5)handle_event接收並處理數據,而後準備好發送緩衝區,再註冊「寫就緒」事件。當系統檢測到能夠寫數據後,就會調用sendData去發送數據。
(6)數據發送完後,從新註冊「讀就緒」事件,主線程調用epoll_wait等待客戶端發送新的數據過來。
【注意】在Reactor模式中,不必區分所謂的「讀工做線程」和「寫工做線程」。
//myevent.h
#ifndef __MYEVENT_H__ #define __MYEVENT_H__ typedef void (ev_callback)(int fd, int events, void* arg); typedef struct _tag_myevent { int fd; ev_callback* callback; int events; //事件類型(如EPOLLIN、EPOLLOUT等,也可按位或) void* arg; int status; //1:in epoll wait list, 0 not in; long last_active; //last active time char buff[512]; int len, s_offset; //標示發送和接收緩衝區當前的大小 }myevent_s; void copyData(myevent_s* src, myevent_s* obj); void event_set(myevent_s* ev, int fd, int events, ev_callback* callback); void event_add(int epollFd, myevent_s* ev); void event_del(int epollFd, myevent_s* ev); #endif
//myevent.c
#include "myevent.h" #include <memory.h> #include <time.h> #include <sys/epoll.h> //拷貝數據 void copyData(myevent_s* src, myevent_s* obj) { memcpy(obj->buff, src->buff, sizeof(obj->buff)); obj->len = src->len; obj->s_offset = src->s_offset; } //自定義事件的封裝(包含事件發生時的回調函數、事件活躍時間等信息) void event_set(myevent_s* ev, int fd, int events, ev_callback* callback) { ev->fd = fd; ev->callback = callback; ev->events = events; ev->arg = ev; ev->status = 0; ev->last_active = time(NULL); memset(ev->buff, 0, sizeof(ev->buff)); ev->len = 0; ev->s_offset = 0; } //向epoll對象中添加或修改事件 void event_add(int epollFd, myevent_s* ev) { struct epoll_event epv={0, {0}}; int op; epv.data.ptr = ev; //用戶數據,將自定義ev綁定到內核epoll_event上 epv.events = ev->events; op = (ev->status == 1) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD; ev->status = 1; if(epoll_ctl(epollFd, op, ev->fd, &epv) < 0){ perror("event_add error"); } } //將指定的事件從epoll對象中刪除 void event_del(int epollFd, myevent_s* ev) { struct epoll_event epv ={0, {0}}; if(ev->status != 1) return; //1: in epoll, 0: not in epv.data.ptr = ev; ev->status = 0; if(epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &epv) < 0){ perror("event_del error"); } }
//epoll.c
#include "myevent.h" #include <sys/socket.h> #include <sys/epoll.h> #include <netdb.h> #include <fcntl.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <memory.h> #include <time.h> #define IPADDRESS "127.0.0.1" #define PORT 8888 #define MAX_EVENTS 100 //全局變量 int g_epollFd; myevent_s g_Events[MAX_EVENTS + 1]; //g_Events[MAX_EVENTS]用於保存listen fd /*函數聲明*/ //設置爲非阻塞模式 static int set_nonblock(int sockfd); //建立並綁定套接字 static int socket_bind(const char* ip, int port); //IO多路複用epoll static void do_epoll(int listenfd); //事件處理函數 static void handle_events(myevent_s* ev); //accept回調函數 static void handle_accept(int fd, int events, void* arg); //接收數據 static void recvData(int fd, int events, void* arg); //發送數據 static void sendData(int fd, int events, void* arg); int main(int argc, char* argv[]) { int listenfd; listenfd = socket_bind(IPADDRESS, PORT); //監聽鏈接 listen(listenfd, 5); do_epoll(listenfd); return 0; } //設置爲非阻塞模式 int set_nonblock(int sockfd) { int iret = -1; int opts; opts = fcntl(sockfd, F_GETFL); if(opts < 0){ perror("fcntl error"); return -1; } opts |= O_NONBLOCK; if((iret = fcntl(sockfd, F_SETFL, opts)) < 0){ perror("fcntl error"); } return iret; } //建立套接字並進行綁定 int socket_bind(const char* ip, int port) { int listenfd = -1; struct sockaddr_in servaddr; listenfd = socket(AF_INET, SOCK_STREAM, 0); if(listen < 0){ perror("socket error"); exit(1); } memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; inet_pton(AF_INET, ip, &servaddr.sin_addr); servaddr.sin_port = htons(port); if(bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0){ perror("bind error"); exit(1); } // set_nonblock(listenfd); //設置爲非阻塞模式 return listenfd; } //IO多路複用epoll void do_epoll(int listenfd) { //建立epoll對象 g_epollFd = epoll_create(MAX_EVENTS); //將listen socket加入到epoll對象中 event_set(&g_Events[MAX_EVENTS], listenfd, EPOLLIN, handle_accept); event_add(g_epollFd, &g_Events[MAX_EVENTS]); struct epoll_event events[MAX_EVENTS]; printf("server is running:%s(%d)\n", IPADDRESS, PORT); int checkPos = 0; while(1){ //1.檢查是否超時(只檢查前面的100個鏈接) long now = time(NULL); int i = 0; for(; i<100; i++, checkPos++){ //不檢測查listen fd if(checkPos == MAX_EVENTS) checkPos = 0; if(g_Events[checkPos].status != 1) continue; long duration = now - g_Events[checkPos].last_active; if(duration >=60){ //設置不活動超過60秒爲超時 close(g_Events[checkPos].fd); event_del(g_epollFd, &g_Events[checkPos]); printf("[fd=%d] timeout.\n", g_Events[checkPos].fd); } } //2.獲取己經準備好的socket事件 int fds = epoll_wait(g_epollFd, events, MAX_EVENTS, 1000); //返回值爲發生的事件數量 if(fds < 0){ perror("epoll_wait error, exit\n"); exit(1); } //處理事件 for(i=0; i<fds; i++){ myevent_s* ev = (myevent_s*)events[i].data.ptr; if((events[i].events & EPOLLIN) || (events[i].events & EPOLLOUT)) handle_events(ev); //能夠將任務分派到新線程中處理。本例爲簡單起見,直接在主線程處理 } } //關閉epoll對象 close(g_epollFd); } //事件處理函數 void handle_events(myevent_s* ev) { ev->callback(ev->fd, ev->events, ev); } //接受客戶端鏈接 void handle_accept(int fd, int events, void* arg) { struct sockaddr_in cliaddr; socklen_t len = sizeof(cliaddr); int nfd, i; //accept系統調用 if((nfd = accept(fd, (struct sockaddr*)&cliaddr, &len)) < 0){ perror("accept error"); return; } //檢查鏈接數是否己經達到上限 do{ //使用do...while(0)是個技巧,能夠代替goto //查找是否仍有可用鏈接數 for(i=0; i<MAX_EVENTS; i++){ if(g_Events[i].status == 0) break; } if(i == MAX_EVENTS){ //最後一個爲listen fd printf("max connection limit[%d]\n", MAX_EVENTS); break; //跳出do...while } //找到可用鏈接,設置非阻塞模式 //if(set_nonblock(nfd) < 0) // break; //跳出do...while //添加一個與客戶通訊的socket描述符事件 event_set(&g_Events[i], nfd, EPOLLIN, recvData); event_add(g_epollFd, &g_Events[i]); }while(0); } //接收數據 void recvData(int fd, int events, void* arg) { myevent_s* ev = (myevent_s*)arg; int len; //接收數據 len = recv(fd, ev->buff + ev->len, sizeof(ev->buff) - ev->len, 0); event_del(g_epollFd, ev); if(len > 0){ ev->len += len; ev->buff[len] = '\0'; printf("Client[%d]:%s\n", fd, ev->buff); //設置寫事件 myevent_s tmp; copyData(ev, &tmp); event_set(ev, fd, EPOLLOUT, sendData); copyData(&tmp, ev); event_add(g_epollFd, ev); }else if(len == 0){ close(ev->fd); printf("[fd=%d], closed gracefully.\n", fd); }else{ close(ev->fd); printf("recv [fd=%d] error.\n",fd); } } //發送數據 void sendData(int fd, int events, void* arg){ myevent_s* ev = (myevent_s*)arg; int len; //發送數據 len = send(fd, ev->buff + ev->s_offset, ev->len - ev->s_offset, 0); if(len > 0){ printf("send[fd=%d], [%d<->%d]%s\n", fd, len, ev->len, ev->buff); ev->s_offset += len; if(ev->s_offset == ev->len){ //設置讀事件 event_del(g_epollFd, ev); event_set(ev, fd, EPOLLIN, recvData); event_add(g_epollFd, ev); } }else{ close(ev->fd); event_del(g_epollFd, ev); printf("send[fd=%d] error.\n", fd); } } /*輸出結果 [root@localhost 15.AdvNet]# bin/epoll2 server is running:127.0.0.1(8888) Client[5]:abcdef send[fd=5], [512<->512]abcdef Client[5]:1234567 send[fd=5], [512<->512]1234567 ^C */
//echo_tcp_client.c(與上一例相同)
#include <netdb.h> #include <sys/socket.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <memory.h> int main(int argc, char* argv[]) { if(argc < 3){ printf("usage: %s ip port\n", argv[0]); exit(1); } /*步驟1: 建立socket(套接字)*/ int sockfd = socket(AF_INET, SOCK_STREAM, 0); if(sockfd < 0){ perror("socket error"); } //往servAddr中填入ip、port和地址族類型 struct sockaddr_in servAddr; memset(&servAddr, 0, sizeof(servAddr)); servAddr.sin_family = AF_INET; servAddr.sin_port = htons(atoi(argv[2])); //將ip地址轉換成網絡字節序後填入servAdd中 inet_pton(AF_INET, argv[1], &servAddr.sin_addr.s_addr); /*步驟2: 客戶端調用connect函數鏈接到服務器端*/ if(connect(sockfd, (struct sockaddr*)&servAddr, sizeof(servAddr)) < 0){ perror("connect error"); exit(1); } /*步驟3: 調用自定義的協議處理函數和服務端進行雙向通訊*/ char buff[512]; size_t size; char* prompt = ">"; while(1){ memset(buff, 0, sizeof(buff)); write(STDOUT_FILENO, prompt, 1); size = read(STDIN_FILENO, buff, sizeof(buff)); if(size < 0) continue; buff[size-1] = '\0'; //將鍵盤輸入的內容發送到服務端 if(write(sockfd, buff, sizeof(buff)) < 0){ perror("write error"); continue; }else{ memset(buff, 0, sizeof(buff)); //讀取來自服務端的消息 if(read(sockfd, buff, sizeof(buff)) < 0){ perror("read error"); continue; }else{ printf("%s\n", buff); } } } /*關閉套接字*/ close(sockfd); }