epoll完整例子

#include <deque>
#include <map>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>
#include <sys/time.h>
#include <sys/shm.h>
#include <errno.h>
#include <sys/types.h>
#include <fcntl.h>
#include <stdio.h>

#include <string>
#include <cstdio>
#include <unistd.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>

#include <cstdlib>
#include <cctype>
#include <sstream>
#include <utility>
#include <stdexcept>

#include <sys/socket.h> 
#include <sys/epoll.h> 
#include <netinet/in.h> 
#include <arpa/inet.h> 
#include <iostream>
#include <signal.h>

using namespace std;

#pragma pack(1)

//管道消息結構
struct pipemsg {
    int op;
    int fd;
    unsigned int ip;
    unsigned short port;
};

//地址端口結構
struct ipport {
    unsigned int ip;
    unsigned short port;
    bool operator < (const ipport rhs) const {return (ip < rhs.ip || (ip == rhs.ip && port < rhs.port));}
    bool operator == (const ipport rhs) const {return (ip == rhs.ip && port == rhs.port);}
};

//對應於對方地址端口的鏈接信息
struct peerinfo {
    int fd;                    //對應鏈接句柄
    unsigned int contime;    //最後鏈接時間
    unsigned int rcvtime;    //收到數據時間
    unsigned int rcvbyte;    //收到字節個數
    unsigned int sndtime;    //發送數據時間
    unsigned int sndbyte;    //發送字節個數
};

//鏈接結構
struct conninfo {
    int rfd;                                    //管道讀端
    int wfd;                                    //管道寫端
    map<struct ipport, struct peerinfo> peer;    //對方信息
};

#pragma pack()

//全局運行標誌
bool g_bRun;

//全局鏈接信息
struct conninfo g_ConnInfo;

void setnonblocking(int sock) 
{     
    int opts;     
    opts = fcntl(sock,F_GETFL);     
    if (opts < 0)     
    {         
        perror("fcntl(sock,GETFL)");         
        exit(1);     
    }     
    opts = opts|O_NONBLOCK;     
    if (fcntl(sock, F_SETFL, opts) < 0)     
    {         
        perror("fcntl(sock,SETFL,opts)");         
        exit(1);     
    }  
}

void setreuseaddr(int sock)
{
    int opt;
    opt = 1;    
    if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(&opt)) < 0)     
    {         
        perror("setsockopt");         
        exit(1);     
    }  
}

static void sig_pro(int signum)
{
    cout << "sig_pro, recv signal:" << signum << endl;
    
    if (signum == SIGQUIT)
    {
        g_bRun = false;
    }
}

//接收鏈接線程
void * AcceptThread(void *arg)
{
    cout << "AcceptThread, enter" << endl;
    
    int ret;        //臨時變量,存放返回值
    int epfd;        //監聽用的epoll
    int listenfd;   //監聽socket
    int connfd;        //接收到的鏈接socket臨時變量
    int i;            //臨時變量,輪詢數組用
    int nfds;        //臨時變量,有多少個socket有事件
     
    struct epoll_event ev;                     //事件臨時變量
    const int MAXEVENTS = 1024;                //最大事件數
    struct epoll_event events[MAXEVENTS];    //監聽事件數組
    socklen_t clilen;                         //聲明epoll_event結構體的變量,ev用於註冊事件,數組用於回傳要處理的事件 
    struct sockaddr_in cliaddr;     
    struct sockaddr_in svraddr;
    
    unsigned short uListenPort = 5000;
    int iBacklogSize = 5;
    int iBackStoreSize = 1024;
    
    struct pipemsg msg;                        //消息隊列數據
    
    //建立epoll,對2.6.8之後的版本,其參數無效,只要大於0的數值就行,內核本身動態分配
    epfd = epoll_create(iBackStoreSize);
    if (epfd < 0)
    {
        cout << "AcceptThread, epoll_create fail:" << epfd << ",errno:" << errno << endl;
        
        return NULL;
    }
    
    //建立監聽socket
    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0)
    {
        cout << "AcceptThread, socket fail:" << epfd << ",errno:" << errno << endl;
        
        close(epfd);
        
        return NULL;
    }
       
    //把監聽socket設置爲非阻塞方式     
    setnonblocking(listenfd);
    //設置監聽socket爲端口重用 
    setreuseaddr(listenfd);
      
    //設置與要處理的事件相關的文件描述符     
    ev.data.fd = listenfd;     
    //設置要處理的事件類型     
    ev.events = EPOLLIN|EPOLLET;        
    //註冊epoll事件   
    ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
    if (ret != 0)
    {
        cout << "AcceptThread, epoll_ctl fail:" << ret << ",errno:" << errno << endl;
        
        close(listenfd);
        close(epfd);
        
        return NULL;
    }   
 
    bzero(&svraddr, sizeof(svraddr));     
    svraddr.sin_family = AF_INET;     
    svraddr.sin_addr.s_addr = htonl(INADDR_ANY);   
    svraddr.sin_port=htons(uListenPort);     
    bind(listenfd,(sockaddr *)&svraddr, sizeof(svraddr));
    //監聽,準備接收鏈接   
    ret = listen(listenfd, iBacklogSize);
    if (ret != 0)
    {
        cout << "AcceptThread, listen fail:" << ret << ",errno:" << errno << endl;
        
        close(listenfd);
        close(epfd);
        
        return NULL;
    }   
    
    while (g_bRun)
    {
        //等待epoll事件的發生,若是當前有信號的句柄數大於輸出事件數組的最大大小,超過部分會在下次epoll_wait時輸出,事件不會丟        
        nfds = epoll_wait(epfd, events, MAXEVENTS, 500);
        
        //處理所發生的全部事件             
        for (i = 0; i < nfds && g_bRun; ++i)         
        {
            if (events[i].data.fd == listenfd)        //是本監聽socket上的事件   
            {
                cout << "AcceptThread, events:" << events[i].events << ",errno:" << errno << endl;
                
                if (events[i].events&EPOLLIN)    //有鏈接到來
                {
                    do
                    {
                        clilen = sizeof(struct sockaddr);               
                        connfd = accept(listenfd,(sockaddr *)&cliaddr, &clilen);                
                        if (connfd > 0)
                        {
                            cout << "AcceptThread, accept:" << connfd << ",errno:" << errno << ",connect:" << inet_ntoa(cliaddr.sin_addr) << ":" << ntohs(cliaddr.sin_port) << endl;
                            
                            //往管道寫數據
                            msg.op = 1;
                            msg.fd = connfd;
                            msg.ip = cliaddr.sin_addr.s_addr;
                            msg.port = cliaddr.sin_port;
                            ret = write(g_ConnInfo.wfd, &msg, 14);
                            if (ret !=  14)
                            {
                                cout << "AcceptThread, write fail:" << ret << ",errno:" << errno << endl;
                                
                                close(connfd);
                            }
                        }
                        else
                        {
                            cout << "AcceptThread, accept:" << connfd << ",errno:" << errno << endl;

                            if (errno == EAGAIN)    //沒有鏈接須要接收了
                            {
                                break;
                            }
                            else if (errno == EINTR)    //可能被中斷信號打斷,,通過驗證對非阻塞socket並未收到此錯誤,應該能夠省掉該步判斷
                            {
                                ;
                            }
                            else    //其它狀況能夠認爲該描述字出現錯誤,應該關閉後從新監聽
                            {

                                //此時說明該描述字已經出錯了,須要從新建立和監聽
                                close(listenfd);                     
                                epoll_ctl(epfd, EPOLL_CTL_DEL, listenfd, &ev);
                                
                                //建立監聽socket
                                listenfd = socket(AF_INET, SOCK_STREAM, 0);
                                if (listenfd < 0)
                                {
                                    cout << "AcceptThread, socket fail:" << epfd << ",errno:" << errno << endl;
                                    
                                    close(epfd);
                                    
                                    return NULL;
                                }
                                
                                //把監聽socket設置爲非阻塞方式     
                                setnonblocking(listenfd);
                                //設置監聽socket爲端口重用 
                                setreuseaddr(listenfd);
                                  
                                //設置與要處理的事件相關的文件描述符     
                                ev.data.fd = listenfd;     
                                //設置要處理的事件類型     
                                ev.events = EPOLLIN|EPOLLET;        
                                //註冊epoll事件     
                                ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
                                if (ret != 0)
                                {
                                    cout << "AcceptThread, epoll_ctl fail:" << ret << ",errno:" << errno << endl;
                                    
                                    close(listenfd);
                                    close(epfd);
                                
                                    return NULL;
                                }   
                             
                                bzero(&svraddr, sizeof(svraddr));     
                                svraddr.sin_family = AF_INET;     
                                svraddr.sin_addr.s_addr = htonl(INADDR_ANY);   
                                svraddr.sin_port=htons(uListenPort);     
                                bind(listenfd,(sockaddr *)&svraddr, sizeof(svraddr));
                                //監聽,準備接收鏈接   
                                ret = listen(listenfd, iBacklogSize);
                                if (ret != 0)
                                {
                                    cout << "AcceptThread, listen fail:" << ret << ",errno:" << errno << endl;
                                    
                                    close(listenfd);
                                    close(epfd);
                                    
                                    return NULL;
                                }
                            }
                        }
                    } while (g_bRun);
                }
                else if (events[i].events&EPOLLERR || events[i].events&EPOLLHUP)    //有異常發生
                {
                    //此時說明該描述字已經出錯了,須要從新建立和監聽
                    close(listenfd);                     
                    epoll_ctl(epfd, EPOLL_CTL_DEL, listenfd, &ev);
                    
                    //建立監聽socket
                    listenfd = socket(AF_INET, SOCK_STREAM, 0);
                    if (listenfd < 0)
                    {
                        cout << "AcceptThread, socket fail:" << epfd << ",errno:" << errno << endl;
                        
                        close(epfd);
                        
                        return NULL;
                    }
                    
                    //把監聽socket設置爲非阻塞方式     
                    setnonblocking(listenfd);
                    //設置監聽socket爲端口重用 
                    setreuseaddr(listenfd);
                      
                    //設置與要處理的事件相關的文件描述符     
                    ev.data.fd = listenfd;     
                    //設置要處理的事件類型     
                    ev.events = EPOLLIN|EPOLLET;        
                    //註冊epoll事件     
                    ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
                    if (ret != 0)
                    {
                        cout << "AcceptThread, epoll_ctl fail:" << ret << ",errno:" << errno << endl;
                        
                        close(listenfd);
                        close(epfd);
                        
                        return NULL;
                    }   
                 
                    bzero(&svraddr, sizeof(svraddr));     
                    svraddr.sin_family = AF_INET;     
                    svraddr.sin_addr.s_addr = htonl(INADDR_ANY);   
                    svraddr.sin_port=htons(uListenPort);     
                    bind(listenfd,(sockaddr *)&svraddr, sizeof(svraddr));
                    //監聽,準備接收鏈接   
                    ret = listen(listenfd, iBacklogSize);
                    if (ret != 0)
                    {
                        cout << "AcceptThread, listen fail:" << ret << ",errno:" << errno << endl;
                        
                        close(listenfd);
                        close(epfd);
                        
                        return NULL;
                    }
                }
            }
        }
    }
    
    //關閉監聽描述字
    if (listenfd > 0)
    {
        close(listenfd);
    }
    //關閉建立的epoll
    if (epfd > 0)
    {
        close(epfd);
    }

    cout << "AcceptThread, exit" << endl;

    return NULL;
}

//讀數據線程
void * ReadThread(void *arg)
{
    cout << "ReadThread, enter" << endl;
    
    int ret;        //臨時變量,存放返回值
    int epfd;        //鏈接用的epoll
    int i;            //臨時變量,輪詢數組用
    int nfds;        //臨時變量,有多少個socket有事件
   
    struct epoll_event ev;                     //事件臨時變量
    const int MAXEVENTS = 1024;                //最大事件數
    struct epoll_event events[MAXEVENTS];    //監聽事件數組

    int iBackStoreSize = 1024;
    
    const int MAXBUFSIZE = 8192;                    //讀數據緩衝區大小
    char buf[MAXBUFSIZE];
    int nread;                                        //讀到的字節數
    struct ipport tIpPort;                            //地址端口信息
    struct peerinfo tPeerInfo;                        //對方鏈接信息
    map<int, struct ipport> mIpPort;                //socket對應的對方地址端口信息
    map<int, struct ipport>::iterator itIpPort;                    //臨時迭代子
    map<struct ipport, struct peerinfo>::iterator itPeerInfo;    //臨時迭代子
    
    struct pipemsg msg;                        //消息隊列數據

    //建立epoll,對2.6.8之後的版本,其參數無效,只要大於0的數值就行,內核本身動態分配
    epfd = epoll_create(iBackStoreSize);
    if (epfd < 0)
    {
        cout << "ReadThread, epoll_create fail:" << epfd << ",errno:" << errno << endl;
        
        return NULL;
    }

    while (g_bRun)
    {
        //從管道讀數據
        do
        {
            ret = read(g_ConnInfo.rfd, &msg, 14);
            if (ret > 0)
            {
                //隊列中的fd必須是有效的
                if (ret == 14 && msg.fd > 0)
                {
                    if (msg.op == 1)    //收到新的鏈接
                    {
                        cout << "ReadThread, recv connect:" << msg.fd << ",errno:" << errno << endl;
                        
                        //把socket設置爲非阻塞方式
                        setnonblocking(msg.fd);  
                        //設置描述符信息和數組下標信息
                        ev.data.fd = msg.fd;                
                        //設置用於注測的讀操做事件                 
                        ev.events = EPOLLIN|EPOLLET;               
                        //註冊ev                 
                        ret = epoll_ctl(epfd, EPOLL_CTL_ADD, msg.fd, &ev);
                        if (ret != 0)
                        {
                            cout << "ReadThread, epoll_ctl fail:" << ret << ",errno:" << errno << endl;
            
                            close(msg.fd);
                        }
                        else
                        {
                            mIpPort[msg.fd] = tIpPort;
                            
                            tPeerInfo.fd = msg.fd;
                            tPeerInfo.contime = time(NULL);
                            tPeerInfo.rcvtime = 0;
                            tPeerInfo.rcvbyte = 0;
                            tPeerInfo.sndtime = 0;
                            tPeerInfo.sndbyte = 0;
                            g_ConnInfo.peer[tIpPort] = tPeerInfo;
                        }
                    }
                    else if (msg.op == 2)    //斷開某個鏈接
                    {
                        cout << "ReadThread, recv close:" << msg.fd << ",errno:" << errno << endl;
                        
                        close(msg.fd);
                        epoll_ctl(epfd, EPOLL_CTL_DEL, msg.fd, &ev);
                        
                        itIpPort = mIpPort.find(msg.fd);
                        if (itIpPort != mIpPort.end())
                        {
                            mIpPort.erase(itIpPort);
                            
                            itPeerInfo = g_ConnInfo.peer.find(itIpPort->second);
                            if (itPeerInfo != g_ConnInfo.peer.end())
                            {
                                g_ConnInfo.peer.erase(itPeerInfo);
                            }
                        }
                    }
                }
            }
            else
            {
                break;
            }
        } while(g_bRun);
        
        //等待epoll事件的發生,若是當前有信號的句柄數大於輸出事件數組的最大大小,超過部分會在下次epoll_wait時輸出,事件不會丟        
        nfds = epoll_wait(epfd, events, MAXEVENTS, 500);
        
        //處理所發生的全部事件             
        for (i = 0; i < nfds && g_bRun; ++i)         
        {
            cout << "ReadThread, events:" << events[i].events << ",errno:" << errno << endl;
                
            if (events[i].events&EPOLLIN)   //有數據可讀          
            {        
                do
                {
                    bzero(buf, MAXBUFSIZE);
                    nread = read(events[i].data.fd, buf, MAXBUFSIZE);
                    if (nread > 0)    //讀到數據
                    {
                        cout << "ReadThread, read:" << nread << ",errno:" << errno << endl;
                        
                        itIpPort = mIpPort.find(events[i].data.fd);
                        if (itIpPort != mIpPort.end())
                        {
                            itPeerInfo = g_ConnInfo.peer.find(itIpPort->second);
                            if (itPeerInfo != g_ConnInfo.peer.end())
                            {
                                itPeerInfo->second.rcvtime = time(NULL);
                                itPeerInfo->second.rcvbyte += nread;
                            }
                        }
                    }
                    else if (nread < 0) //讀取失敗
                    {
                        if (errno == EAGAIN)    //沒有數據了
                        {
                            cout << "ReadThread, read:" << nread << ",errno:" << errno << ",no data" << endl;
                            
                            break;
                        }
                        else if(errno == EINTR)        //可能被內部中斷信號打斷,通過驗證對非阻塞socket並未收到此錯誤,應該能夠省掉該步判斷
                        {
                            cout << "ReadThread, read:" << nread << ",errno:" << errno << ",interrupt" << endl;
                        }
                        else    //客戶端主動關閉
                        {
                            cout << "ReadThread, read:" << nread << ",errno:" << errno << ",peer error" << endl;
                            
                            close(events[i].data.fd);                     
                            epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);                
                            itIpPort = mIpPort.find(events[i].data.fd);
                            if (itIpPort != mIpPort.end())
                            {
                                mIpPort.erase(itIpPort);
                                
                                itPeerInfo = g_ConnInfo.peer.find(itIpPort->second);
                                if (itPeerInfo != g_ConnInfo.peer.end())
                                {
                                    g_ConnInfo.peer.erase(itPeerInfo);
                                }
                            }
                            
                            break;
                        }
                    }
                    else if (nread == 0) //客戶端主動關閉
                    {
                        cout << "ReadThread, read:" << nread << ",errno:" << errno << ",peer close" << endl;
                            
                        close(events[i].data.fd);                     
                        epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);
                        itIpPort = mIpPort.find(events[i].data.fd);
                        if (itIpPort != mIpPort.end())
                        {
                            mIpPort.erase(itIpPort);
                            
                            itPeerInfo = g_ConnInfo.peer.find(itIpPort->second);
                            if (itPeerInfo != g_ConnInfo.peer.end())
                            {
                                g_ConnInfo.peer.erase(itPeerInfo);
                            }
                        }

                        break;      
                    }
                } while (g_bRun);
            }
            else if (events[i].events&EPOLLERR || events[i].events&EPOLLHUP)    //有異常發生
            {
                cout << "ReadThread, read:" << nread << ",errno:" << errno << ",err or hup" << endl;

                close(events[i].data.fd);                     
                epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);
                itIpPort = mIpPort.find(events[i].data.fd);
                if (itIpPort != mIpPort.end())
                {
                    mIpPort.erase(itIpPort);
                    
                    itPeerInfo = g_ConnInfo.peer.find(itIpPort->second);
                    if (itPeerInfo != g_ConnInfo.peer.end())
                    {
                        g_ConnInfo.peer.erase(itPeerInfo);
                    }
                }
            }
        }
    }
    
    //關閉全部鏈接
    for (itIpPort = mIpPort.begin(); itIpPort != mIpPort.end(); itIpPort++)
    {
        if (itIpPort->first > 0)
        {
            close(itIpPort->first);
        }
    }
    //關閉建立的epoll
    if (epfd > 0)
    {
        close(epfd);
    }
    
    cout << "ReadThread, exit" << endl;

    return NULL;
}

int main(int argc, char* argv[])
{
    int ret;
    int fd[2];                    //讀寫管道
    pthread_t iAcceptThreadId;    //接收鏈接線程ID
    pthread_t iReadThreadId;    //讀數據線程ID
    
    //爲讓應用程序沒必要對慢速系統調用的errno作EINTR檢查,能夠採起兩種方式:1.屏蔽中斷信號,2.處理中斷信號
    //1.由signal()函數安裝的信號處理程序,系統默認會自動重啓動被中斷的系統調用,而不是讓它出錯返回,
    //  因此應用程序沒必要對慢速系統調用的errno作EINTR檢查,這就是自動重啓動機制.
    //2.對sigaction()的默認動做是不自動重啓動被中斷的系統調用,
    //  所以若是咱們在使用sigaction()時須要自動重啓動被中斷的系統調用,就須要使用sigaction的SA_RESTART選項

    //忽略信號    
    //sigset_t newmask;
    //sigemptyset(&newmask);
    //sigaddset(&newmask, SIGINT);
    //sigaddset(&newmask, SIGUSR1);
    //sigaddset(&newmask, SIGUSR2);
    //sigaddset(&newmask, SIGQUIT);
    //pthread_sigmask(SIG_BLOCK, &newmask, NULL);
    
    //處理信號
    //默認自動重啓動被中斷的系統調用,而不是讓它出錯返回,應用程序沒必要對慢速系統調用的errno作EINTR檢查
    //signal(SIGINT, sig_pro);
    //signal(SIGUSR1, sig_pro);
    //signal(SIGUSR2, sig_pro);
    //signal(SIGQUIT, sig_pro);

    struct sigaction sa;
    sa.sa_flags = SA_RESTART;
    sa.sa_handler = sig_pro;
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGUSR1, &sa, NULL);
    sigaction(SIGUSR2, &sa, NULL);
    sigaction(SIGQUIT, &sa, NULL);
    
    //設置爲運行狀態
    g_bRun = true;
    
    //建立管道
    ret = pipe(fd);
    if (ret < 0)
    {
        cout << "main, pipe fail:" << ret << ",errno:" << errno << endl;
        
        g_bRun = false;
        
        return 0;
    }
    g_ConnInfo.rfd = fd[0];
    g_ConnInfo.wfd = fd[1];
    
    //讀端設置爲非阻塞方式
    setnonblocking(g_ConnInfo.rfd);

    //建立線程時採用的參數
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);                 //設置綁定的線程,以獲取較高的響應速度
    //pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);        //設置分離的線程
    
    //建立接收鏈接線程
    ret = pthread_create(&iAcceptThreadId, &attr, AcceptThread, NULL);
    if( ret != 0)
    {
        cout << "main, pthread_create AcceptThread fail:" << ret << ",errno:" << errno << endl;
        
        g_bRun = false;
        close(g_ConnInfo.rfd);
        close(g_ConnInfo.wfd);
        
        return 0;
    }
    
    //建立接收鏈接線程
    ret = pthread_create(&iReadThreadId, &attr, ReadThread, NULL);
    if( ret != 0)
    {
        cout << "main, pthread_create ReadThread fail:" << ret << ",errno:" << errno << endl;
        
        g_bRun = false;
        pthread_join(iAcceptThreadId, NULL);
        close(g_ConnInfo.rfd);
        close(g_ConnInfo.wfd);
        
        return 0;
    }
    
    //主循環什麼事情也不作
    while (g_bRun)
    {
        sleep(1);
    }
    
    //等待子線程終止
    pthread_join(iAcceptThreadId, NULL);
    pthread_join(iReadThreadId, NULL);
    close(g_ConnInfo.rfd);
    close(g_ConnInfo.wfd);

    return 0;
}

在一個非阻塞的socket上調用read/write函數, 返回EAGAIN或者EWOULDBLOCK(注: EAGAIN就是EWOULDBLOCK)html

從字面上看, 意思是:ios

 

* EAGAIN: 再試一次面試

* EWOULDBLOCK: 若是這是一個阻塞socket, 操做將被block數組

* perror輸出: Resource temporarily unavailable服務器

 

總結:網絡

這個錯誤表示資源暫時不夠, 可能read時, 讀緩衝區沒有數據, 或者, write時,socket

寫緩衝區滿了.tcp

遇到這種狀況, 若是是阻塞socket, read/write就要阻塞掉.ide

而若是是非阻塞socket, read/write當即返回-1, 同 時errno設置爲EAGAIN.函數

因此, 對於阻塞socket, read/write返回-1表明網絡出錯了.

但對於非阻塞socket, read/write返回-1不必定網絡真的出錯了.

多是Resource temporarily unavailable. 這時你應該再試, 直到Resource available.

 

綜上, 對於non-blocking的socket, 正確的讀寫操做爲:

讀: 忽略掉errno = EAGAIN的錯誤, 下次繼續讀 

寫: 忽略掉errno = EAGAIN的錯誤, 下次繼續寫 

 

對於select和epoll的LT模式, 這種讀寫方式是沒有問題的. 但對於epoll的ET模式, 這種方式還有漏洞.

 

epoll的兩種模式 LT 和 ET

兩者的差別在於 level-trigger 模式下只要某個 socket 處於 readable/writable 狀態,不管何時

進行 epoll_wait 都會返回該 socket;而 edge-trigger 模式下只有某個 socket 從 unreadable 變爲 readable 或從

unwritable 變爲 writable 時,epoll_wait 纔會返回該 socket。以下兩個示意圖:

從socket讀數據:

 

 

 

往socket寫數據

因此, 在epoll的ET模式下, 正確的讀寫方式爲:

讀: 只要可讀, 就一直讀, 直到返回0, 或者 errno = EAGAIN

寫: 只要可寫, 就一直寫, 直到數據發送完, 或者 errno = EAGAIN

 

正確的讀:

n = 0;
while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) {
n += nread;
}
if (nread == -1 && errno != EAGAIN) {
perror("read error");
}

正在寫:

int nwrite, data_size = strlen(buf);
n = data_size;
while (n > 0) {
nwrite = write(fd, buf + data_size - n, n);
if (nwrite < n) {
if (nwrite == -1 && errno != EAGAIN) {
perror("write error");
}
break;
}
n -= nwrite;
}

正確的accept,accept 要考慮 2 個問題

(1) 阻塞模式 accept 存在的問題

考慮這種狀況: TCP 鏈接被客戶端夭折,即在服務器調用 accept 以前,客戶端主動發送 RST 終止

鏈接,致使剛剛創建的鏈接從就緒隊列中移出,若是套接口被設置成阻塞模式,服務器就會一直阻塞

在 accept 調用上,直到其餘某個客戶創建一個新的鏈接爲止。可是在此期間,服務器單純地阻塞在

accept 調用上,就緒隊列中的其餘描述符都得不處處理.

 

解決辦法是把監聽套接口設置爲非阻塞,當客戶在服務器調用 accept 以前停止某個鏈接時,accept 調用

能夠當即返回 -1, 這時源自 Berkeley 的實現會在內核中處理該事件,並不會將該事件通知給 epool,

而其餘實現把 errno 設置爲 ECONNABORTED 或者 EPROTO 錯誤,咱們應該忽略這兩個錯誤。

 

(2) ET 模式下 accept 存在的問題

考慮這種狀況:多個鏈接同時到達,服務器的 TCP 就緒隊列瞬間積累多個就緒鏈接,因爲是邊緣觸發模式,

epoll 只會通知一次,accept 只處理一個鏈接,致使 TCP 就緒隊列中剩下的鏈接都得不處處理。

 

解決辦法是用 while 循環抱住 accept 調用,處理完 TCP 就緒隊列中的全部鏈接後再退出循環。如何知道

是否處理完就緒隊列中的全部鏈接呢? accept 返回 -1 而且 errno 設置爲 EAGAIN 就表示全部鏈接都處理完。

 

綜合以上兩種狀況,服務器應該使用非阻塞地 accept, accept 在 ET 模式下 的正確使用方式爲:

while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote,
(size_t *)&addrlen)) > 0) {
handle_client(conn_sock);
}
if (conn_sock == -1) {
if (errno != EAGAIN && errno != ECONNABORTED
&& errno != EPROTO && errno != EINTR)
perror("accept");
}

一道騰訊後臺開發的面試題

使用Linux epoll模型,水平觸發模式;當socket可寫時,會不停的觸發 socket 可寫的事件,如何處理?

 

第一種最廣泛的方式:

須要向 socket 寫數據的時候才把 socket 加入 epoll ,等待可寫事件。

接受到可寫事件後,調用 write 或者 send 發送數據。。。

當全部數據都寫完後,把 socket 移出 epoll。 

這種方式的缺點是,即便發送不多的數據,也要把 socket 加入 epoll,寫完後在移出 epoll,有必定操做代價。

一種改進的方式:

開始不把 socket 加入 epoll,須要向 socket 寫數據的時候,直接調用 write 或者 send 發送數據。

若是返回 EAGAIN,把 socket 加入 epoll,在 epoll 的驅動下寫數據,所有數據發送完畢後,再出 epoll。

這種方式的優勢是:數據很少的時候能夠避免 epoll 的事件處理,提升效率。

最後貼一個使用epoll, ET模式的簡單HTTP服務器代碼:

#include <sys/socket.h> 
#include <sys/wait.h> 
#include <netinet/in.h> 
#include <netinet/tcp.h> 
#include <sys/epoll.h> 
#include <sys/sendfile.h> 
#include <sys/stat.h> 
#include <unistd.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <strings.h> 
#include <fcntl.h> 
#include <errno.h> 

#define MAX_EVENTS 10 
#define PORT 8080 

//設置socket鏈接爲非阻塞模式 
void setnonblocking(int sockfd) { 
int opts; 

opts = fcntl(sockfd, F_GETFL); 
if(opts < 0) { 
perror("fcntl(F_GETFL)\n"); 
exit(1); 
} 
opts = (opts | O_NONBLOCK); 
if(fcntl(sockfd, F_SETFL, opts) < 0) { 
perror("fcntl(F_SETFL)\n"); 
exit(1); 
} 
} 

int main(){ 
struct epoll_event ev, events[MAX_EVENTS]; 
int addrlen, listenfd, conn_sock, nfds, epfd, fd, i, nread, n; 
struct sockaddr_in local, remote; 
char buf[BUFSIZ]; 

//建立listen socket 
if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { 
perror("sockfd\n"); 
exit(1); 
} 
setnonblocking(listenfd); 
bzero(&local, sizeof(local)); 
local.sin_family = AF_INET; 
local.sin_addr.s_addr = htonl(INADDR_ANY);; 
local.sin_port = htons(PORT); 
if( bind(listenfd, (struct sockaddr *) &local, sizeof(local)) < 0) { 
perror("bind\n"); 
exit(1); 
} 
listen(listenfd, 20); 

epfd = epoll_create(MAX_EVENTS); 
if (epfd == -1) { 
perror("epoll_create"); 
exit(EXIT_FAILURE); 
} 

ev.events = EPOLLIN; 
ev.data.fd = listenfd; 
if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) == -1) { 
perror("epoll_ctl: listen_sock"); 
exit(EXIT_FAILURE); 
} 

for (;;) { 
nfds = epoll_wait(epfd, events, MAX_EVENTS, -1); 
if (nfds == -1) { 
perror("epoll_pwait"); 
exit(EXIT_FAILURE); 
} 

for (i = 0; i < nfds; ++i) { 
fd = events[i].data.fd; 
if (fd == listenfd) { 
while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote, 
(size_t *)&addrlen)) > 0) { 
setnonblocking(conn_sock); 
ev.events = EPOLLIN | EPOLLET; 
ev.data.fd = conn_sock; 
if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock, 
&ev) == -1) { 
perror("epoll_ctl: add"); 
exit(EXIT_FAILURE); 
} 
} 
if (conn_sock == -1) { 
if (errno != EAGAIN && errno != ECONNABORTED 
&& errno != EPROTO && errno != EINTR) 
perror("accept"); 
} 
continue; 
} 
if (events[i].events & EPOLLIN) { 
n = 0; 
while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) { 
n += nread; 
} 
if (nread == -1 && errno != EAGAIN) { 
perror("read error"); 
} 
ev.data.fd = fd; 
ev.events = events[i].events | EPOLLOUT; 
if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == -1) { 
perror("epoll_ctl: mod"); 
} 
} 
if (events[i].events & EPOLLOUT) { 
sprintf(buf, "HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\nHello World", 11); 
int nwrite, data_size = strlen(buf); 
n = data_size; 
while (n > 0) { 
nwrite = write(fd, buf + data_size - n, n); 
if (nwrite < n) { 
if (nwrite == -1 && errno != EAGAIN) { 
perror("write error"); 
} 
break; 
} 
n -= nwrite; 
} 
close(fd); 
} 
} 
} 

return 0; 
} 
View Code

轉自:http://www.cppblog.com/API/archive/2013/07/01/201424.html

相關文章
相關標籤/搜索