第15章 高併發服務器編程(3)_事件驅動模型

4. 事件驅動模型:epoll編程

4.1 epoll簡介服務器

(1)epoll是Linux內核爲處理大批量的socket而改進的poll,相對於select/poll來講,epoll更加靈活。它使用一個文件描述符來管理多個socket。網絡

(2)epoll之因此高效,是由於它將用戶關心的socket事件存放到內核的一個事件表中。而不是像select/poll每次調用都須要重複傳入fd_set。好比當一個事件發生(如讀事件),epoll無須遍歷整個被監聽的描述符集,只要遍歷那些被內核IO事件異步喚醒而加入就緒隊列的描述符集就好了。數據結構

4.2 epoll的工做機制異步

 

(1)調用epoll_create,內核會建立一個eventpoll結構體。該結構體中的rbr成員是一顆紅黑樹存儲着全部添加到epoll中須要監控的事件。而rdlist成員是一個雙向鏈表存放着將要經過epoll_wait返回給用戶的知足條件的事件socket

(2)調用epoll_ctl(),會向epoll對象中添加、刪除或修改感興趣的事件。tcp

(3)當調用epoll_wait檢查是否有事件發生時,內核會從eventpoll對象中的rdlist雙向鏈表中檢查是否有元素。若是有,則會把事件複製到用戶態,同時將事件數量返回給用戶。函數

4.3 epoll接口ui

(1)建立和關閉epoll對象spa

頭文件

#include <sys/epoll.h>

函數

int epoll_create(int size);

int close(int epollfd); //關閉epoll對象

參數

size:用於告訴內核要監聽的數目。注意,size並非限制了epoll所能監聽的描述符最大個數,只是對內核初始分配內部數據結構的一個建議。

返回值

成功時返回epoll對象的文件描述符。失敗返回-1

功能

建立一個epoll對象

(2)操做epoll對象

頭文件

#include <sys/epoll.h>

函數

int epoll_ctl (int epfd, int op, int fd, struct epoll_event* event);

參數

epfd:由epoll_create()的返回值

op:表示要將fd添加到epoll對象或從epoll對象刪除/修改fd等。

添加:EPOLL_CTL_ADD,刪除:EPOLL_CTL_DEL,修改:EPOLL_CTL_MOD

fd:須要監聽的fd(文件描述符)

struct epoll_event{  //告訴內核須要監聽什麼事件。
    __uint32_t events;  //如EPOLLIN,EPOLLOUT等。
    epoll_data_t data;  //用戶自定義的數據
};

返回值

成功時返回epoll對象的文件描述符。失敗返回-1

備註

struct epoll_event中的events成員變量的取值:

(1)EPOLLIN:表示對應的文件描述符能夠讀(包括對端socket正常關閉)

(2)EPOLLOUT:表示對應的文件描述符可寫

(3)EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(帶外數據的到來)

(4)EPOLLERR:表示對應的文件描述符發生錯誤

(5)EPOLLHUP:表示對應的文件描述符被掛斷

(6)EPOLLET:將epoll設置爲邊緣觸發

(7)EPOLLONESHOT:只監聽一次,當監聽完此次事件以後,若是還須要繼續監聽這個socket的話,須要再次把這個socket加入到epoll隊列裏。

(3)等待IO事件

頭文件

#include <sys/epoll.h>

函數

int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

參數

epfd:由epoll_create()的返回值

events:用來接收從內核獲得事件的集合。

maxevents:最多返回多少個事件

timeout:超時時間(毫秒):0會當即返回。

返回值

成功時返回須要處理的事件數目,0表示超時

功能

等待IO事件

4.4 epoll的工做模式

(1)LT模式默認模式,同時支持block和non-block socket。當epoll_wait檢測到socket事件發生並將此事件通知應用程序,用戶能夠對這個就緒的socket進行IO操做,也能夠,能夠不當即處理該事件。若是你不做任何操做,在下次調用epoll_wait時,內核會繼續通知應用程序

(2)ET模式:邊緣模式只支持non-block socket。在這種模式下,只有當socket從未就結果變爲就緒時,內核纔會通知應用程序之後就再也不通知,直到應用程序對該socket作了某些操做,使得該socket再也不處理就緒狀態。值得注意的是,若是不對這個socket做IO操做,內核不會發送更多的通知(only once)

【編程實驗】echo服務器

 

(1)主線程建立epoll對象,並註冊listent socket的「讀就緒」事件到epoll請求隊列中。

(2)主線程調用epoll_wait等待事件發生。若是此時有用戶鏈接進來,則會調知主線程,並調用咱們設置的handle_accept函數。

(3)handle_accept函數中調用accept系統函數來接受請求,並將新的socket的「讀就緒」事件插入epoll對象的請求隊列中,等待客戶端發送數據過來。

(4)若是服務器收到客戶端發送的數據,主線程會從epoll_wait中返回,並將數據分派給handle_event函數去作(這裏能夠開啓一個工做線程來完成!)

(5)handle_event接收並處理數據,而後準備好發送緩衝區,再註冊「寫就緒」事件。當系統檢測到能夠寫數據後,就會調用sendData去發送數據。

(6)數據發送完後,從新註冊「讀就緒」事件,主線程調用epoll_wait等待客戶端發送新的數據過來。

【注意】在Reactor模式中,不必區分所謂的「讀工做線程」和「寫工做線程」。

//myevent.h

#ifndef __MYEVENT_H__
#define __MYEVENT_H__

typedef void (ev_callback)(int fd, int events, void* arg);
typedef struct _tag_myevent
{
    int fd;
    ev_callback*  callback;
    int events;  //事件類型(如EPOLLIN、EPOLLOUT等,也可按位或)
    void* arg;
    int status; //1:in epoll wait list, 0 not in;
    long last_active;   //last active time

    char buff[512];
    int len, s_offset; //標示發送和接收緩衝區當前的大小
}myevent_s;

void copyData(myevent_s* src, myevent_s* obj);
void event_set(myevent_s* ev, int fd, int events, ev_callback* callback);
void event_add(int epollFd, myevent_s* ev);
void event_del(int epollFd, myevent_s* ev);

#endif

//myevent.c

#include "myevent.h"
#include <memory.h>
#include <time.h>
#include <sys/epoll.h>

//拷貝數據
void copyData(myevent_s* src, myevent_s* obj)
{
    memcpy(obj->buff, src->buff, sizeof(obj->buff));
    obj->len = src->len;
       
    obj->s_offset = src->s_offset;
}

//自定義事件的封裝(包含事件發生時的回調函數、事件活躍時間等信息)
void event_set(myevent_s* ev, int fd, int events, ev_callback* callback)
{
    ev->fd = fd;
    ev->callback = callback;
    ev->events = events;
    ev->arg = ev;
    ev->status = 0;
    ev->last_active = time(NULL);

    memset(ev->buff, 0, sizeof(ev->buff));
    ev->len = 0;
    ev->s_offset = 0;
}

//向epoll對象中添加或修改事件
void event_add(int epollFd, myevent_s* ev)
{
    struct epoll_event epv={0, {0}};
    int op;
    epv.data.ptr = ev; //用戶數據,將自定義ev綁定到內核epoll_event上
    epv.events = ev->events;
   
    op = (ev->status == 1) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
    ev->status = 1;

    if(epoll_ctl(epollFd, op, ev->fd, &epv) < 0){
        perror("event_add error");
    }
}

//將指定的事件從epoll對象中刪除
void event_del(int epollFd, myevent_s* ev)
{
    struct epoll_event epv ={0, {0}};
    if(ev->status != 1) return; //1: in epoll, 0: not in

    epv.data.ptr = ev;
    ev->status = 0;
    
    if(epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &epv) < 0){
        perror("event_del error");
    }
}

//epoll.c

#include "myevent.h"
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <time.h>

#define IPADDRESS   "127.0.0.1"
#define PORT         8888
#define MAX_EVENTS   100

//全局變量
int g_epollFd;
myevent_s g_Events[MAX_EVENTS + 1]; //g_Events[MAX_EVENTS]用於保存listen fd

/*函數聲明*/
//設置爲非阻塞模式
static int set_nonblock(int sockfd);
//建立並綁定套接字
static int socket_bind(const char* ip, int port);
//IO多路複用epoll
static void do_epoll(int listenfd);
//事件處理函數
static void handle_events(myevent_s* ev);
//accept回調函數
static void handle_accept(int fd, int events, void* arg);
//接收數據
static void recvData(int fd, int events, void* arg);
//發送數據
static void sendData(int fd, int events, void* arg);

int main(int argc, char* argv[])
{
    int listenfd;
    listenfd = socket_bind(IPADDRESS, PORT);
       
    //監聽鏈接
    listen(listenfd, 5);

    do_epoll(listenfd);

    return 0;
}

//設置爲非阻塞模式
int set_nonblock(int sockfd)
{
    int iret = -1;
    int opts;
    opts = fcntl(sockfd, F_GETFL);
    if(opts < 0){
        perror("fcntl error");
        return -1;
    }

    opts |= O_NONBLOCK;
    if((iret = fcntl(sockfd, F_SETFL, opts)) < 0){
        perror("fcntl error");
    }

    return iret;
}

//建立套接字並進行綁定
int socket_bind(const char* ip, int port)
{
    int listenfd = -1;
    struct sockaddr_in servaddr;
    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    
    if(listen < 0){
        perror("socket error");
        exit(1);
    }
    
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &servaddr.sin_addr);
    servaddr.sin_port = htons(port);

    if(bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0){
        perror("bind error");
        exit(1);
    }
    
   // set_nonblock(listenfd); //設置爲非阻塞模式

    return listenfd;
}

//IO多路複用epoll
void do_epoll(int listenfd)
{
    //建立epoll對象
    g_epollFd = epoll_create(MAX_EVENTS);
    
    //將listen socket加入到epoll對象中
    event_set(&g_Events[MAX_EVENTS], listenfd, EPOLLIN, handle_accept);
    event_add(g_epollFd, &g_Events[MAX_EVENTS]);

    struct epoll_event events[MAX_EVENTS];
    printf("server is running:%s(%d)\n", IPADDRESS, PORT);

    int checkPos = 0;
    while(1){
        //1.檢查是否超時(只檢查前面的100個鏈接)
        long now = time(NULL);
        int i = 0;
        for(; i<100; i++, checkPos++){ //不檢測查listen fd
            if(checkPos == MAX_EVENTS)
                checkPos = 0;

            if(g_Events[checkPos].status != 1) continue;

            long duration = now - g_Events[checkPos].last_active;
            if(duration >=60){ //設置不活動超過60秒爲超時
                close(g_Events[checkPos].fd);
                event_del(g_epollFd, &g_Events[checkPos]);
                printf("[fd=%d] timeout.\n", g_Events[checkPos].fd);   
            }
        }

        //2.獲取己經準備好的socket事件
        int fds = epoll_wait(g_epollFd, events, MAX_EVENTS, 1000); //返回值爲發生的事件數量
        if(fds < 0){
            perror("epoll_wait error, exit\n");
            exit(1);
        }
        
        //處理事件
        for(i=0; i<fds; i++){
            myevent_s* ev = (myevent_s*)events[i].data.ptr;
            if((events[i].events & EPOLLIN) || (events[i].events & EPOLLOUT))
                handle_events(ev); //能夠將任務分派到新線程中處理。本例爲簡單起見,直接在主線程處理
        }
    }

    //關閉epoll對象
    close(g_epollFd);
}

//事件處理函數
void handle_events(myevent_s* ev)
{
     ev->callback(ev->fd, ev->events, ev);  
}

//接受客戶端鏈接
void handle_accept(int fd, int events, void* arg)
{
    struct sockaddr_in cliaddr;
    socklen_t len = sizeof(cliaddr);
    int nfd, i;
    
    //accept系統調用
    if((nfd = accept(fd, (struct sockaddr*)&cliaddr, &len)) < 0){
        perror("accept error");
        return;
    }

    //檢查鏈接數是否己經達到上限
    do{ //使用do...while(0)是個技巧,能夠代替goto
        //查找是否仍有可用鏈接數
        for(i=0; i<MAX_EVENTS; i++){
            if(g_Events[i].status == 0)
            break;
        }

        if(i == MAX_EVENTS){ //最後一個爲listen fd
            printf("max connection limit[%d]\n", MAX_EVENTS);
            break;  //跳出do...while
        }

        //找到可用鏈接,設置非阻塞模式
        //if(set_nonblock(nfd) < 0)
        //    break;  //跳出do...while
        
        //添加一個與客戶通訊的socket描述符事件
        event_set(&g_Events[i], nfd, EPOLLIN, recvData);
        event_add(g_epollFd, &g_Events[i]);
        
    }while(0);
}

//接收數據
void recvData(int fd, int events, void* arg)
{
    myevent_s* ev = (myevent_s*)arg;
    int len;

    //接收數據
    len = recv(fd, ev->buff + ev->len, sizeof(ev->buff) - ev->len, 0);
    event_del(g_epollFd, ev);
    
    if(len > 0){
        ev->len += len;
        ev->buff[len] = '\0';
        printf("Client[%d]:%s\n", fd, ev->buff);

        //設置寫事件
        myevent_s tmp;
        copyData(ev, &tmp);
        event_set(ev, fd, EPOLLOUT, sendData);
        copyData(&tmp, ev);
        event_add(g_epollFd, ev);
    }else if(len == 0){
         close(ev->fd);
         printf("[fd=%d], closed gracefully.\n", fd);
    }else{
        close(ev->fd);
        printf("recv [fd=%d] error.\n",fd);
    }
}

//發送數據
void sendData(int fd, int events, void* arg){
    myevent_s* ev = (myevent_s*)arg;
    int len;
    //發送數據
    len = send(fd, ev->buff + ev->s_offset, ev->len - ev->s_offset, 0);
    if(len > 0){
        printf("send[fd=%d], [%d<->%d]%s\n", fd, len, ev->len, ev->buff);
        ev->s_offset += len;
        if(ev->s_offset == ev->len){
            //設置讀事件
            event_del(g_epollFd, ev);
            event_set(ev, fd, EPOLLIN, recvData);
            event_add(g_epollFd, ev);
        }
    }else{
        close(ev->fd);
        event_del(g_epollFd, ev);
        printf("send[fd=%d] error.\n", fd);
    }
}
/*輸出結果
 [root@localhost 15.AdvNet]# bin/epoll2  
 server is running:127.0.0.1(8888)
 Client[5]:abcdef
 send[fd=5], [512<->512]abcdef
 Client[5]:1234567
 send[fd=5], [512<->512]1234567
 ^C
 */

//echo_tcp_client.c(與上一例相同)

#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>

int main(int argc, char* argv[])
{
    if(argc < 3){
        printf("usage: %s ip port\n", argv[0]);
        exit(1);
    }

    /*步驟1: 建立socket(套接字)*/
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd < 0){
        perror("socket error");
    }

    //往servAddr中填入ip、port和地址族類型
    struct sockaddr_in servAddr;
    memset(&servAddr, 0, sizeof(servAddr));
    servAddr.sin_family = AF_INET;
    servAddr.sin_port = htons(atoi(argv[2]));
    //將ip地址轉換成網絡字節序後填入servAdd中
    inet_pton(AF_INET, argv[1], &servAddr.sin_addr.s_addr);

    /*步驟2: 客戶端調用connect函數鏈接到服務器端*/
    if(connect(sockfd, (struct sockaddr*)&servAddr, sizeof(servAddr)) < 0){
        perror("connect error");
        exit(1);
    }

    /*步驟3: 調用自定義的協議處理函數和服務端進行雙向通訊*/
    char buff[512];
    size_t size;
    char* prompt = ">";

    while(1){
        memset(buff, 0, sizeof(buff));
        write(STDOUT_FILENO, prompt, 1);
        size = read(STDIN_FILENO, buff, sizeof(buff));
        if(size < 0) continue;

        buff[size-1] = '\0';
        //將鍵盤輸入的內容發送到服務端
        if(write(sockfd, buff, sizeof(buff)) < 0){
            perror("write error");
            continue;
        }else{
            memset(buff, 0, sizeof(buff));
            //讀取來自服務端的消息
            if(read(sockfd, buff, sizeof(buff)) < 0){
                perror("read error");
                continue;
            }else{
                printf("%s\n", buff);
            }
        }
    }

    /*關閉套接字*/
    close(sockfd);
}
相關文章
相關標籤/搜索