#include <deque> #include <map> #include <vector> #include <pthread.h> #include <semaphore.h> #include <time.h> #include <sys/time.h> #include <sys/shm.h> #include <errno.h> #include <sys/types.h> #include <fcntl.h> #include <stdio.h> #include <string> #include <cstdio> #include <unistd.h> #include <signal.h> #include <sys/types.h> #include <sys/stat.h> #include <cstdlib> #include <cctype> #include <sstream> #include <utility> #include <stdexcept> #include <sys/socket.h> #include <sys/epoll.h> #include <netinet/in.h> #include <arpa/inet.h> #include <iostream> #include <signal.h> using namespace std; #pragma pack(1) //管道消息結構 struct pipemsg { int op; int fd; unsigned int ip; unsigned short port; }; //地址端口結構 struct ipport { unsigned int ip; unsigned short port; bool operator < (const ipport rhs) const {return (ip < rhs.ip || (ip == rhs.ip && port < rhs.port));} bool operator == (const ipport rhs) const {return (ip == rhs.ip && port == rhs.port);} }; //對應於對方地址端口的鏈接信息 struct peerinfo { int fd; //對應鏈接句柄 unsigned int contime; //最後鏈接時間 unsigned int rcvtime; //收到數據時間 unsigned int rcvbyte; //收到字節個數 unsigned int sndtime; //發送數據時間 unsigned int sndbyte; //發送字節個數 }; //鏈接結構 struct conninfo { int rfd; //管道讀端 int wfd; //管道寫端 map<struct ipport, struct peerinfo> peer; //對方信息 }; #pragma pack() //全局運行標誌 bool g_bRun; //全局鏈接信息 struct conninfo g_ConnInfo; void setnonblocking(int sock) { int opts; opts = fcntl(sock,F_GETFL); if (opts < 0) { perror("fcntl(sock,GETFL)"); exit(1); } opts = opts|O_NONBLOCK; if (fcntl(sock, F_SETFL, opts) < 0) { perror("fcntl(sock,SETFL,opts)"); exit(1); } } void setreuseaddr(int sock) { int opt; opt = 1; if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(&opt)) < 0) { perror("setsockopt"); exit(1); } } static void sig_pro(int signum) { cout << "sig_pro, recv signal:" << signum << endl; if (signum == SIGQUIT) { g_bRun = false; } } //接收鏈接線程 void * AcceptThread(void *arg) { cout << "AcceptThread, enter" << endl; int ret; //臨時變量,存放返回值 int epfd; //監聽用的epoll int listenfd; //監聽socket int connfd; //接收到的鏈接socket臨時變量 int i; //臨時變量,輪詢數組用 int nfds; //臨時變量,有多少個socket有事件 struct epoll_event ev; //事件臨時變量 const int MAXEVENTS = 1024; //最大事件數 struct epoll_event events[MAXEVENTS]; //監聽事件數組 socklen_t clilen; //聲明epoll_event結構體的變量,ev用於註冊事件,數組用於回傳要處理的事件 struct sockaddr_in cliaddr; struct sockaddr_in svraddr; unsigned short uListenPort = 5000; int iBacklogSize = 5; int iBackStoreSize = 1024; struct pipemsg msg; //消息隊列數據 //建立epoll,對2.6.8之後的版本,其參數無效,只要大於0的數值就行,內核本身動態分配 epfd = epoll_create(iBackStoreSize); if (epfd < 0) { cout << "AcceptThread, epoll_create fail:" << epfd << ",errno:" << errno << endl; return NULL; } //建立監聽socket listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd < 0) { cout << "AcceptThread, socket fail:" << epfd << ",errno:" << errno << endl; close(epfd); return NULL; } //把監聽socket設置爲非阻塞方式 setnonblocking(listenfd); //設置監聽socket爲端口重用 setreuseaddr(listenfd); //設置與要處理的事件相關的文件描述符 ev.data.fd = listenfd; //設置要處理的事件類型 ev.events = EPOLLIN|EPOLLET; //註冊epoll事件 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); if (ret != 0) { cout << "AcceptThread, epoll_ctl fail:" << ret << ",errno:" << errno << endl; close(listenfd); close(epfd); return NULL; } bzero(&svraddr, sizeof(svraddr)); svraddr.sin_family = AF_INET; svraddr.sin_addr.s_addr = htonl(INADDR_ANY); svraddr.sin_port=htons(uListenPort); bind(listenfd,(sockaddr *)&svraddr, sizeof(svraddr)); //監聽,準備接收鏈接 ret = listen(listenfd, iBacklogSize); if (ret != 0) { cout << "AcceptThread, listen fail:" << ret << ",errno:" << errno << endl; close(listenfd); close(epfd); return NULL; } while (g_bRun) { //等待epoll事件的發生,若是當前有信號的句柄數大於輸出事件數組的最大大小,超過部分會在下次epoll_wait時輸出,事件不會丟 nfds = epoll_wait(epfd, events, MAXEVENTS, 500); //處理所發生的全部事件 for (i = 0; i < nfds && g_bRun; ++i) { if (events[i].data.fd == listenfd) //是本監聽socket上的事件 { cout << "AcceptThread, events:" << events[i].events << ",errno:" << errno << endl; if (events[i].events&EPOLLIN) //有鏈接到來 { do { clilen = sizeof(struct sockaddr); connfd = accept(listenfd,(sockaddr *)&cliaddr, &clilen); if (connfd > 0) { cout << "AcceptThread, accept:" << connfd << ",errno:" << errno << ",connect:" << inet_ntoa(cliaddr.sin_addr) << ":" << ntohs(cliaddr.sin_port) << endl; //往管道寫數據 msg.op = 1; msg.fd = connfd; msg.ip = cliaddr.sin_addr.s_addr; msg.port = cliaddr.sin_port; ret = write(g_ConnInfo.wfd, &msg, 14); if (ret != 14) { cout << "AcceptThread, write fail:" << ret << ",errno:" << errno << endl; close(connfd); } } else { cout << "AcceptThread, accept:" << connfd << ",errno:" << errno << endl; if (errno == EAGAIN) //沒有鏈接須要接收了 { break; } else if (errno == EINTR) //可能被中斷信號打斷,,通過驗證對非阻塞socket並未收到此錯誤,應該能夠省掉該步判斷 { ; } else //其它狀況能夠認爲該描述字出現錯誤,應該關閉後從新監聽 { //此時說明該描述字已經出錯了,須要從新建立和監聽 close(listenfd); epoll_ctl(epfd, EPOLL_CTL_DEL, listenfd, &ev); //建立監聽socket listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd < 0) { cout << "AcceptThread, socket fail:" << epfd << ",errno:" << errno << endl; close(epfd); return NULL; } //把監聽socket設置爲非阻塞方式 setnonblocking(listenfd); //設置監聽socket爲端口重用 setreuseaddr(listenfd); //設置與要處理的事件相關的文件描述符 ev.data.fd = listenfd; //設置要處理的事件類型 ev.events = EPOLLIN|EPOLLET; //註冊epoll事件 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); if (ret != 0) { cout << "AcceptThread, epoll_ctl fail:" << ret << ",errno:" << errno << endl; close(listenfd); close(epfd); return NULL; } bzero(&svraddr, sizeof(svraddr)); svraddr.sin_family = AF_INET; svraddr.sin_addr.s_addr = htonl(INADDR_ANY); svraddr.sin_port=htons(uListenPort); bind(listenfd,(sockaddr *)&svraddr, sizeof(svraddr)); //監聽,準備接收鏈接 ret = listen(listenfd, iBacklogSize); if (ret != 0) { cout << "AcceptThread, listen fail:" << ret << ",errno:" << errno << endl; close(listenfd); close(epfd); return NULL; } } } } while (g_bRun); } else if (events[i].events&EPOLLERR || events[i].events&EPOLLHUP) //有異常發生 { //此時說明該描述字已經出錯了,須要從新建立和監聽 close(listenfd); epoll_ctl(epfd, EPOLL_CTL_DEL, listenfd, &ev); //建立監聽socket listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd < 0) { cout << "AcceptThread, socket fail:" << epfd << ",errno:" << errno << endl; close(epfd); return NULL; } //把監聽socket設置爲非阻塞方式 setnonblocking(listenfd); //設置監聽socket爲端口重用 setreuseaddr(listenfd); //設置與要處理的事件相關的文件描述符 ev.data.fd = listenfd; //設置要處理的事件類型 ev.events = EPOLLIN|EPOLLET; //註冊epoll事件 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); if (ret != 0) { cout << "AcceptThread, epoll_ctl fail:" << ret << ",errno:" << errno << endl; close(listenfd); close(epfd); return NULL; } bzero(&svraddr, sizeof(svraddr)); svraddr.sin_family = AF_INET; svraddr.sin_addr.s_addr = htonl(INADDR_ANY); svraddr.sin_port=htons(uListenPort); bind(listenfd,(sockaddr *)&svraddr, sizeof(svraddr)); //監聽,準備接收鏈接 ret = listen(listenfd, iBacklogSize); if (ret != 0) { cout << "AcceptThread, listen fail:" << ret << ",errno:" << errno << endl; close(listenfd); close(epfd); return NULL; } } } } } //關閉監聽描述字 if (listenfd > 0) { close(listenfd); } //關閉建立的epoll if (epfd > 0) { close(epfd); } cout << "AcceptThread, exit" << endl; return NULL; } //讀數據線程 void * ReadThread(void *arg) { cout << "ReadThread, enter" << endl; int ret; //臨時變量,存放返回值 int epfd; //鏈接用的epoll int i; //臨時變量,輪詢數組用 int nfds; //臨時變量,有多少個socket有事件 struct epoll_event ev; //事件臨時變量 const int MAXEVENTS = 1024; //最大事件數 struct epoll_event events[MAXEVENTS]; //監聽事件數組 int iBackStoreSize = 1024; const int MAXBUFSIZE = 8192; //讀數據緩衝區大小 char buf[MAXBUFSIZE]; int nread; //讀到的字節數 struct ipport tIpPort; //地址端口信息 struct peerinfo tPeerInfo; //對方鏈接信息 map<int, struct ipport> mIpPort; //socket對應的對方地址端口信息 map<int, struct ipport>::iterator itIpPort; //臨時迭代子 map<struct ipport, struct peerinfo>::iterator itPeerInfo; //臨時迭代子 struct pipemsg msg; //消息隊列數據 //建立epoll,對2.6.8之後的版本,其參數無效,只要大於0的數值就行,內核本身動態分配 epfd = epoll_create(iBackStoreSize); if (epfd < 0) { cout << "ReadThread, epoll_create fail:" << epfd << ",errno:" << errno << endl; return NULL; } while (g_bRun) { //從管道讀數據 do { ret = read(g_ConnInfo.rfd, &msg, 14); if (ret > 0) { //隊列中的fd必須是有效的 if (ret == 14 && msg.fd > 0) { if (msg.op == 1) //收到新的鏈接 { cout << "ReadThread, recv connect:" << msg.fd << ",errno:" << errno << endl; //把socket設置爲非阻塞方式 setnonblocking(msg.fd); //設置描述符信息和數組下標信息 ev.data.fd = msg.fd; //設置用於注測的讀操做事件 ev.events = EPOLLIN|EPOLLET; //註冊ev ret = epoll_ctl(epfd, EPOLL_CTL_ADD, msg.fd, &ev); if (ret != 0) { cout << "ReadThread, epoll_ctl fail:" << ret << ",errno:" << errno << endl; close(msg.fd); } else { mIpPort[msg.fd] = tIpPort; tPeerInfo.fd = msg.fd; tPeerInfo.contime = time(NULL); tPeerInfo.rcvtime = 0; tPeerInfo.rcvbyte = 0; tPeerInfo.sndtime = 0; tPeerInfo.sndbyte = 0; g_ConnInfo.peer[tIpPort] = tPeerInfo; } } else if (msg.op == 2) //斷開某個鏈接 { cout << "ReadThread, recv close:" << msg.fd << ",errno:" << errno << endl; close(msg.fd); epoll_ctl(epfd, EPOLL_CTL_DEL, msg.fd, &ev); itIpPort = mIpPort.find(msg.fd); if (itIpPort != mIpPort.end()) { mIpPort.erase(itIpPort); itPeerInfo = g_ConnInfo.peer.find(itIpPort->second); if (itPeerInfo != g_ConnInfo.peer.end()) { g_ConnInfo.peer.erase(itPeerInfo); } } } } } else { break; } } while(g_bRun); //等待epoll事件的發生,若是當前有信號的句柄數大於輸出事件數組的最大大小,超過部分會在下次epoll_wait時輸出,事件不會丟 nfds = epoll_wait(epfd, events, MAXEVENTS, 500); //處理所發生的全部事件 for (i = 0; i < nfds && g_bRun; ++i) { cout << "ReadThread, events:" << events[i].events << ",errno:" << errno << endl; if (events[i].events&EPOLLIN) //有數據可讀 { do { bzero(buf, MAXBUFSIZE); nread = read(events[i].data.fd, buf, MAXBUFSIZE); if (nread > 0) //讀到數據 { cout << "ReadThread, read:" << nread << ",errno:" << errno << endl; itIpPort = mIpPort.find(events[i].data.fd); if (itIpPort != mIpPort.end()) { itPeerInfo = g_ConnInfo.peer.find(itIpPort->second); if (itPeerInfo != g_ConnInfo.peer.end()) { itPeerInfo->second.rcvtime = time(NULL); itPeerInfo->second.rcvbyte += nread; } } } else if (nread < 0) //讀取失敗 { if (errno == EAGAIN) //沒有數據了 { cout << "ReadThread, read:" << nread << ",errno:" << errno << ",no data" << endl; break; } else if(errno == EINTR) //可能被內部中斷信號打斷,通過驗證對非阻塞socket並未收到此錯誤,應該能夠省掉該步判斷 { cout << "ReadThread, read:" << nread << ",errno:" << errno << ",interrupt" << endl; } else //客戶端主動關閉 { cout << "ReadThread, read:" << nread << ",errno:" << errno << ",peer error" << endl; close(events[i].data.fd); epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev); itIpPort = mIpPort.find(events[i].data.fd); if (itIpPort != mIpPort.end()) { mIpPort.erase(itIpPort); itPeerInfo = g_ConnInfo.peer.find(itIpPort->second); if (itPeerInfo != g_ConnInfo.peer.end()) { g_ConnInfo.peer.erase(itPeerInfo); } } break; } } else if (nread == 0) //客戶端主動關閉 { cout << "ReadThread, read:" << nread << ",errno:" << errno << ",peer close" << endl; close(events[i].data.fd); epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev); itIpPort = mIpPort.find(events[i].data.fd); if (itIpPort != mIpPort.end()) { mIpPort.erase(itIpPort); itPeerInfo = g_ConnInfo.peer.find(itIpPort->second); if (itPeerInfo != g_ConnInfo.peer.end()) { g_ConnInfo.peer.erase(itPeerInfo); } } break; } } while (g_bRun); } else if (events[i].events&EPOLLERR || events[i].events&EPOLLHUP) //有異常發生 { cout << "ReadThread, read:" << nread << ",errno:" << errno << ",err or hup" << endl; close(events[i].data.fd); epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev); itIpPort = mIpPort.find(events[i].data.fd); if (itIpPort != mIpPort.end()) { mIpPort.erase(itIpPort); itPeerInfo = g_ConnInfo.peer.find(itIpPort->second); if (itPeerInfo != g_ConnInfo.peer.end()) { g_ConnInfo.peer.erase(itPeerInfo); } } } } } //關閉全部鏈接 for (itIpPort = mIpPort.begin(); itIpPort != mIpPort.end(); itIpPort++) { if (itIpPort->first > 0) { close(itIpPort->first); } } //關閉建立的epoll if (epfd > 0) { close(epfd); } cout << "ReadThread, exit" << endl; return NULL; } int main(int argc, char* argv[]) { int ret; int fd[2]; //讀寫管道 pthread_t iAcceptThreadId; //接收鏈接線程ID pthread_t iReadThreadId; //讀數據線程ID //爲讓應用程序沒必要對慢速系統調用的errno作EINTR檢查,能夠採起兩種方式:1.屏蔽中斷信號,2.處理中斷信號 //1.由signal()函數安裝的信號處理程序,系統默認會自動重啓動被中斷的系統調用,而不是讓它出錯返回, // 因此應用程序沒必要對慢速系統調用的errno作EINTR檢查,這就是自動重啓動機制. //2.對sigaction()的默認動做是不自動重啓動被中斷的系統調用, // 所以若是咱們在使用sigaction()時須要自動重啓動被中斷的系統調用,就須要使用sigaction的SA_RESTART選項 //忽略信號 //sigset_t newmask; //sigemptyset(&newmask); //sigaddset(&newmask, SIGINT); //sigaddset(&newmask, SIGUSR1); //sigaddset(&newmask, SIGUSR2); //sigaddset(&newmask, SIGQUIT); //pthread_sigmask(SIG_BLOCK, &newmask, NULL); //處理信號 //默認自動重啓動被中斷的系統調用,而不是讓它出錯返回,應用程序沒必要對慢速系統調用的errno作EINTR檢查 //signal(SIGINT, sig_pro); //signal(SIGUSR1, sig_pro); //signal(SIGUSR2, sig_pro); //signal(SIGQUIT, sig_pro); struct sigaction sa; sa.sa_flags = SA_RESTART; sa.sa_handler = sig_pro; sigaction(SIGINT, &sa, NULL); sigaction(SIGUSR1, &sa, NULL); sigaction(SIGUSR2, &sa, NULL); sigaction(SIGQUIT, &sa, NULL); //設置爲運行狀態 g_bRun = true; //建立管道 ret = pipe(fd); if (ret < 0) { cout << "main, pipe fail:" << ret << ",errno:" << errno << endl; g_bRun = false; return 0; } g_ConnInfo.rfd = fd[0]; g_ConnInfo.wfd = fd[1]; //讀端設置爲非阻塞方式 setnonblocking(g_ConnInfo.rfd); //建立線程時採用的參數 pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); //設置綁定的線程,以獲取較高的響應速度 //pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); //設置分離的線程 //建立接收鏈接線程 ret = pthread_create(&iAcceptThreadId, &attr, AcceptThread, NULL); if( ret != 0) { cout << "main, pthread_create AcceptThread fail:" << ret << ",errno:" << errno << endl; g_bRun = false; close(g_ConnInfo.rfd); close(g_ConnInfo.wfd); return 0; } //建立接收鏈接線程 ret = pthread_create(&iReadThreadId, &attr, ReadThread, NULL); if( ret != 0) { cout << "main, pthread_create ReadThread fail:" << ret << ",errno:" << errno << endl; g_bRun = false; pthread_join(iAcceptThreadId, NULL); close(g_ConnInfo.rfd); close(g_ConnInfo.wfd); return 0; } //主循環什麼事情也不作 while (g_bRun) { sleep(1); } //等待子線程終止 pthread_join(iAcceptThreadId, NULL); pthread_join(iReadThreadId, NULL); close(g_ConnInfo.rfd); close(g_ConnInfo.wfd); return 0; }
在一個非阻塞的socket上調用read/write函數, 返回EAGAIN或者EWOULDBLOCK(注: EAGAIN就是EWOULDBLOCK)html
從字面上看, 意思是:ios
* EAGAIN: 再試一次面試
* EWOULDBLOCK: 若是這是一個阻塞socket, 操做將被block數組
* perror輸出: Resource temporarily unavailable服務器
總結:網絡
這個錯誤表示資源暫時不夠, 可能read時, 讀緩衝區沒有數據, 或者, write時,socket
寫緩衝區滿了.tcp
遇到這種狀況, 若是是阻塞socket, read/write就要阻塞掉.ide
而若是是非阻塞socket, read/write當即返回-1, 同 時errno設置爲EAGAIN.函數
因此, 對於阻塞socket, read/write返回-1表明網絡出錯了.
但對於非阻塞socket, read/write返回-1不必定網絡真的出錯了.
多是Resource temporarily unavailable. 這時你應該再試, 直到Resource available.
綜上, 對於non-blocking的socket, 正確的讀寫操做爲:
讀: 忽略掉errno = EAGAIN的錯誤, 下次繼續讀
寫: 忽略掉errno = EAGAIN的錯誤, 下次繼續寫
對於select和epoll的LT模式, 這種讀寫方式是沒有問題的. 但對於epoll的ET模式, 這種方式還有漏洞.
epoll的兩種模式 LT 和 ET
兩者的差別在於 level-trigger 模式下只要某個 socket 處於 readable/writable 狀態,不管何時
進行 epoll_wait 都會返回該 socket;而 edge-trigger 模式下只有某個 socket 從 unreadable 變爲 readable 或從
unwritable 變爲 writable 時,epoll_wait 纔會返回該 socket。以下兩個示意圖:
從socket讀數據:
往socket寫數據
因此, 在epoll的ET模式下, 正確的讀寫方式爲:
讀: 只要可讀, 就一直讀, 直到返回0, 或者 errno = EAGAIN
寫: 只要可寫, 就一直寫, 直到數據發送完, 或者 errno = EAGAIN
正確的讀:
n = 0; while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) { n += nread; } if (nread == -1 && errno != EAGAIN) { perror("read error"); }
正在寫:
int nwrite, data_size = strlen(buf); n = data_size; while (n > 0) { nwrite = write(fd, buf + data_size - n, n); if (nwrite < n) { if (nwrite == -1 && errno != EAGAIN) { perror("write error"); } break; } n -= nwrite; }
正確的accept,accept 要考慮 2 個問題
(1) 阻塞模式 accept 存在的問題
考慮這種狀況: TCP 鏈接被客戶端夭折,即在服務器調用 accept 以前,客戶端主動發送 RST 終止
鏈接,致使剛剛創建的鏈接從就緒隊列中移出,若是套接口被設置成阻塞模式,服務器就會一直阻塞
在 accept 調用上,直到其餘某個客戶創建一個新的鏈接爲止。可是在此期間,服務器單純地阻塞在
accept 調用上,就緒隊列中的其餘描述符都得不處處理.
解決辦法是把監聽套接口設置爲非阻塞,當客戶在服務器調用 accept 以前停止某個鏈接時,accept 調用
能夠當即返回 -1, 這時源自 Berkeley 的實現會在內核中處理該事件,並不會將該事件通知給 epool,
而其餘實現把 errno 設置爲 ECONNABORTED 或者 EPROTO 錯誤,咱們應該忽略這兩個錯誤。
(2) ET 模式下 accept 存在的問題
考慮這種狀況:多個鏈接同時到達,服務器的 TCP 就緒隊列瞬間積累多個就緒鏈接,因爲是邊緣觸發模式,
epoll 只會通知一次,accept 只處理一個鏈接,致使 TCP 就緒隊列中剩下的鏈接都得不處處理。
解決辦法是用 while 循環抱住 accept 調用,處理完 TCP 就緒隊列中的全部鏈接後再退出循環。如何知道
是否處理完就緒隊列中的全部鏈接呢? accept 返回 -1 而且 errno 設置爲 EAGAIN 就表示全部鏈接都處理完。
綜合以上兩種狀況,服務器應該使用非阻塞地 accept, accept 在 ET 模式下 的正確使用方式爲:
while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote, (size_t *)&addrlen)) > 0) { handle_client(conn_sock); } if (conn_sock == -1) { if (errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR) perror("accept"); }
一道騰訊後臺開發的面試題
使用Linux epoll模型,水平觸發模式;當socket可寫時,會不停的觸發 socket 可寫的事件,如何處理?
第一種最廣泛的方式:
須要向 socket 寫數據的時候才把 socket 加入 epoll ,等待可寫事件。
接受到可寫事件後,調用 write 或者 send 發送數據。。。
當全部數據都寫完後,把 socket 移出 epoll。
這種方式的缺點是,即便發送不多的數據,也要把 socket 加入 epoll,寫完後在移出 epoll,有必定操做代價。
一種改進的方式:
開始不把 socket 加入 epoll,須要向 socket 寫數據的時候,直接調用 write 或者 send 發送數據。
若是返回 EAGAIN,把 socket 加入 epoll,在 epoll 的驅動下寫數據,所有數據發送完畢後,再出 epoll。
這種方式的優勢是:數據很少的時候能夠避免 epoll 的事件處理,提升效率。
最後貼一個使用epoll, ET模式的簡單HTTP服務器代碼:
#include <sys/socket.h> #include <sys/wait.h> #include <netinet/in.h> #include <netinet/tcp.h> #include <sys/epoll.h> #include <sys/sendfile.h> #include <sys/stat.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <strings.h> #include <fcntl.h> #include <errno.h> #define MAX_EVENTS 10 #define PORT 8080 //設置socket鏈接爲非阻塞模式 void setnonblocking(int sockfd) { int opts; opts = fcntl(sockfd, F_GETFL); if(opts < 0) { perror("fcntl(F_GETFL)\n"); exit(1); } opts = (opts | O_NONBLOCK); if(fcntl(sockfd, F_SETFL, opts) < 0) { perror("fcntl(F_SETFL)\n"); exit(1); } } int main(){ struct epoll_event ev, events[MAX_EVENTS]; int addrlen, listenfd, conn_sock, nfds, epfd, fd, i, nread, n; struct sockaddr_in local, remote; char buf[BUFSIZ]; //建立listen socket if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("sockfd\n"); exit(1); } setnonblocking(listenfd); bzero(&local, sizeof(local)); local.sin_family = AF_INET; local.sin_addr.s_addr = htonl(INADDR_ANY);; local.sin_port = htons(PORT); if( bind(listenfd, (struct sockaddr *) &local, sizeof(local)) < 0) { perror("bind\n"); exit(1); } listen(listenfd, 20); epfd = epoll_create(MAX_EVENTS); if (epfd == -1) { perror("epoll_create"); exit(EXIT_FAILURE); } ev.events = EPOLLIN; ev.data.fd = listenfd; if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) == -1) { perror("epoll_ctl: listen_sock"); exit(EXIT_FAILURE); } for (;;) { nfds = epoll_wait(epfd, events, MAX_EVENTS, -1); if (nfds == -1) { perror("epoll_pwait"); exit(EXIT_FAILURE); } for (i = 0; i < nfds; ++i) { fd = events[i].data.fd; if (fd == listenfd) { while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote, (size_t *)&addrlen)) > 0) { setnonblocking(conn_sock); ev.events = EPOLLIN | EPOLLET; ev.data.fd = conn_sock; if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock, &ev) == -1) { perror("epoll_ctl: add"); exit(EXIT_FAILURE); } } if (conn_sock == -1) { if (errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR) perror("accept"); } continue; } if (events[i].events & EPOLLIN) { n = 0; while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) { n += nread; } if (nread == -1 && errno != EAGAIN) { perror("read error"); } ev.data.fd = fd; ev.events = events[i].events | EPOLLOUT; if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == -1) { perror("epoll_ctl: mod"); } } if (events[i].events & EPOLLOUT) { sprintf(buf, "HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\nHello World", 11); int nwrite, data_size = strlen(buf); n = data_size; while (n > 0) { nwrite = write(fd, buf + data_size - n, n); if (nwrite < n) { if (nwrite == -1 && errno != EAGAIN) { perror("write error"); } break; } n -= nwrite; } close(fd); } } } return 0; }
轉自:http://www.cppblog.com/API/archive/2013/07/01/201424.html