IO複用算法
I/O複用使得程序能同一時候監聽多個文件描寫敘述符。一般網絡程序在下列狀況下需要使用I/O複用技術:數組
client程序要同一時候處理多個socket服務器
client程序要同一時候處理用戶輸入和網絡鏈接網絡
TCPserver要同一時候處理監聽socket和鏈接socket,這是I/O複用使用最多的場合多線程
server要同一時候處理TCP請求和UDP請求。比方本章將要討論的會社server併發
server要同一時候監聽多個port。或者處理多種服務。app
I/O複用儘管能同一時候監聽多個文件描寫敘述符,但它自己是堵塞的。並且當多個文件描寫敘述符同一時候就緒時,假設不採用額外措施,程序就僅僅能按順序依次處理當中的每一個文件描寫敘述符,這使得server程序看起來像是串行工做。socket
假設要實現併發,僅僅能使用多進程或多線程等變成手段。函數
select系統調用的用途是:在一段指定時間內。監聽用戶感興趣的文件描寫敘述符上的可讀可寫和異常等事件。ui
#include <sys/select.h>
int select(int nfds, fd_set *readfds,fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
nfds參數指定被監聽的文件描寫敘述符的總數。
一般被設置爲select監聽的所有文件描寫敘述符中的最大值加1,因爲文件描寫敘述符是從0開始計數的
readfds, writefds和exceptfds參數分別指向可讀、可寫和異常等事件相應的文件描寫敘述符集合。
fd_set結構體僅包括一個整形數組。高數組的每個元素的每一位標記一個文件描寫敘述符。
可用例如如下宏來訪問fd_set結構體中的位:
voidFD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
voidFD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set*set);
timeout參數用來設置select函數的超時時間。它是一個timeval指針。timeval結構體定義例如如下:
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
假設給timeout傳遞NULL。則select將一直堵塞,直到某個文件描寫敘述符就緒。
select成功時返回就緒文件描寫敘述符的總數,假設在超時時間內沒有不論什麼文件描寫敘述符就緒返回0,失敗返回-1,並設置errno;假設select在等待期間收到信號,則select立刻返回-1,並設置errno爲EINTR。
poll系統調用和select相似,也是在指定時間內倫旭必定數量的文件描寫敘述符。以測試當中是否有就緒。
poll原型例如如下:
#include<poll.h>
int poll(structpollfd *fds, nfds_t nfds, int timeout);
1)fds參數是一個pollfd結構類型的數組,它指定因此咱們感興趣的文件描寫敘述符上發生的刻度、可寫和異常等時間。其結構定義例如如下:
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
當中fd成員指定文件描寫敘述符;events成員告訴poll監聽f上的那些時間,它是一系列時間的按位或;revents成員則由內核改動,以通知應用程序fd上實際發生了哪些事件。
2)nfds參數指定被監聽事件集合的大小。其類型nfds_t定義例如如下:
typedef unsignedlong int nfds_t;
timeout參數指定poll的超時時間。單位是毫秒。當timeout爲-1時。poll調用將永遠堵塞,直到某個事件發生;當爲0時,poll調用立刻返回。
poll返回值含義與select一樣。
epoll是Linux特有的I/O複用函數。它在實現和使用上與select、poll有很是大差別。首先,epoll使用一組函數來完畢任務。而不是單個函數。
其次,epoll把用戶關心的文件描寫敘述符上的時間放在內核裏的一個時間表中。從而無需向select和poll那樣每次調用都要反覆傳入文件描寫敘述符集或事件集。但epoll需要使用一個額外的文件描寫敘述符,來惟一標識內核中的這個時間表。這個文件描寫敘述符使用例如如下epoll_create函數建立:
#include <sys/epoll.h>
int epoll_create(int size);
size參數給內核一個提示,告訴它時間表需要多大。該函數返回的文件描寫敘述符將做用其它所有epoll系統調用的第一個參數,以指定要訪問的內核事件表。
如下的函數用來操做epoll的內核事件表:
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd,struct epoll_event *event);
fd參數是要操做的文件描寫敘述符。op參數則制定操做類型,操做類型有例如如下3種:
EPOLL_CTL_ADD:往事件表中註冊fd上的事件
EPOLL_CTL_MOD:改動fd上的註冊事件
EPOLL_CTL_DEL:刪除fd上的註冊事件
event參數指定時間,它是epoll_event結構指針類型。epoll_event的定義例如如下:
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
當中events成員描寫敘述事件類型。data成員用於存儲用戶數據。其類型epoll_data的定義例如如下:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
epoll_data_t是一個聯合體,當中4個成員中使用最多的是fd,它指定事件所叢書的目標文件描寫敘述符。
epoll_ctl成功時返回0,失敗時返回-1並設置errno。
epoll系列系統調用的主要接口是epoll_wait函數。它在一段超時時間內等待一組文件描寫敘述符上的事件,其原型例如如下:
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event*events, int maxevents, int timeout);
該函數成功時返回就緒的文件描寫敘述符的個數,失敗是返回-1,並設置errno。
maxevents參數指定最多監聽多少時間,必須大於0.
epoll_wait函數假設檢測到事件。就將所有就緒的事件從內核事件表中拷貝到它的第二個參數events指向的數組中。這個數組僅僅用於輸出epoll_wait檢測到的就緒時間,而不像select和poll數組那樣即用於傳入用戶註冊的時間,實用於輸出內核檢測到的就緒時間。這就極大的提升了應用程序索引就緒文件描寫敘述符的效率。
如下的代碼體現了這個區別:
/*怎樣索引poll返回的就緒文件描寫敘述符*/
int ret = poll(fds, MAX_EVENT_NUMBER, -1);
/*必須遍歷所有註冊文件描寫敘述符並找到當中的就緒着*/
for(int i=0;i<MAX_EVENT_NUMBER; ++i)
{
if(fds[i].revents & POLLIN)
{
int sockfd = fds[i].fd;
/*處理sockfd*/
}
}
/*怎樣索引epoll返回的就緒文件描寫敘述符*/
int ret =epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1);
/*遍歷就緒的ret個文件描寫敘述符*/
for( int i=0;i<ret; i++)
{
int socketfd = events[i].data.fd;
/*socket確定就緒。直接處理*/
}
epoll對文件描寫敘述符的操做有兩種模式:LT模式(Levek Trigger,電平觸發)和ET模式(E多個Trigger,邊沿觸發)。
LT模式是默認的工做模式,這樣的模式下epoll至關於一個效率較高的poll。
當往epoll內核事件表中註冊一個文件描寫敘述符上的EPOLLET事件時,epoll將以ET模式來操做該文件描寫敘述符。ET模式是epoll的搞笑工做模式。
對於採用LT工做模式的文件描寫敘述符。當epoll_wait檢測到其上有時間發生並將此事件通知應用程序後,應用程序可以不立刻處理該事件。這樣。當應用程序下一次調用epoll_wait時,epoll_wait還會再次嚮應用程序通告此事件,直到該事件被處理。而對於採用ET工做模式的文件描寫敘述符,當epoll_wait檢測到其上有時間發生並將此時間通知應用程序後。應用程序必須立刻處理該事件,因爲興許的epoll_wait調用將再也不向用用程序通知這一事件。可見。ET在很是大程度上減小了同一個epoll事件被反覆觸發的次數,所以效率比LT模式高。
文章最後的程序清單1比較了兩種模式:
當在clienttelnet傳輸「abcdefghijklmnopqrstuvwxyz」字符串時。輸出例如如下
ET模式輸出:
event trigger once
get 9 bytes of content: abcdefghi
get 9 bytes of content: jklmnopqr
get 9 bytes of content: stuvwxyz
get 1 bytes of content:
LT模式輸出:
event trigger once
get 9 bytes of content: abcdefghi
event trigger once
get 9 bytes of content: jklmnopqr
event trigger once
get 9 bytes of content: stuvwxyz
event trigger once
get 1 bytes of content:
可以看到正如咱們預期,ET模式下時間僅僅被觸發一次,要比LT模式下少很是多。
即便咱們使用ET模式。一個socket上的某個事件仍是可能被觸發屢次。這在併發程序中會引發一個問題。
比方一個縣城在讀取完某個socket上的數據後開始處理這些數據,二在數據的處理project中該socket上又有新數據可讀。此時另一個縣城北喚醒來讀取這些新的數據。
因而就出現了兩個線程同一時候操做一個socket的局面。這固然不是咱們指望的。
咱們指望的是一個socket鏈接在任一時刻都僅僅被一個線程處理。這一點可以使用spoll的EPOLLONESHOT事件實現。
對於註冊了EPOLLONESHOT事件的文件描寫敘述符,操做系統最多觸發其上註冊的一個可讀、可寫或者異常事件,而且僅僅觸發一次。除非咱們使用epoll_ctl函數重置該文件描寫敘述符上註冊的EPOLLONESHOT事件ain.zheyang。當一個線程在處理某個socket時。其它線程是不可能有機會操做該socket的。
但反過來思考,註冊了EPOLLONESHOT事件的socket一旦被某個線程處理完成,該線程就應該立刻重置這個socket上的EPOLLONESHOT事件,以確保這個socket下一次可讀時,其EPOLLIN事件能被觸發,進而讓其它工做線程有機會處理這個socket。
程序清單2展現了EPOLLONESHOT事件的使用。
系統調用 |
select |
poll |
epoll |
事件集合 |
用戶經過3個參數分別傳入感興趣的可讀、可寫及異常等事件。內核經過對這些參數在線改動來反饋當中的就緒事件。這使得用戶每次調用select都要重置這3個參數 |
統一處理所有事件類型,所以僅僅需要一個事件集參數。用戶經過pollfd.events傳入感興趣的事件,內核經過改動pollfd.revents反饋當中就緒的事件 |
內核經過一個時間表直接管理用戶感興趣的所有事件。所以每次調用epoll_wait時,無需重複傳入用戶感興趣的時間。 epoll_wait系統調用的參數events僅用來反饋就緒的事件。 |
應用程序索引就緒文件描寫敘述符的時間複雜度 |
O(N) |
O(N) |
O(1) |
最大支持文件描寫敘述符數 |
通常有最大值限制 |
65535 |
65535 |
工做模式 |
LT |
LT |
支持ET高效模式 |
內核實現和工做效率 |
採用輪詢方法來檢測就緒事件,算法複雜度爲O(N) |
採用輪詢方式檢測就緒事件,算法複雜度爲O(N) |
採用回調方式來檢測就緒事件。算法複雜度爲O(1) |
聊天程序見程序(poll實現)見清單3
同一時候處理TCP和UDP服務的回射server程序(epoll程序)見清單4
程序清單1: #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <pthread.h> #define MAX_EVENT_NUMBER 1024 #define BUFFER_SIZE 10 int setnonblocking( int fd ) { int old_option = fcntl( fd, F_GETFL ); int new_option = old_option | O_NONBLOCK; fcntl( fd, F_SETFL, new_option ); return old_option; } void addfd( int epollfd, int fd, bool enable_et ) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN; if( enable_et ) { event.events |= EPOLLET; } epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event ); setnonblocking( fd ); } void lt( epoll_event* events, int number, int epollfd, int listenfd ) { char buf[ BUFFER_SIZE ]; for ( int i = 0; i < number; i++ ) { int sockfd = events[i].data.fd; if ( sockfd == listenfd ) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); addfd( epollfd, connfd, false ); } else if ( events[i].events & EPOLLIN ) { printf( "event trigger once\n" ); memset( buf, '\0', BUFFER_SIZE ); int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 ); if( ret <= 0 ) { close( sockfd ); continue; } printf( "get %d bytes of content: %s\n", ret, buf ); } else { printf( "something else happened \n" ); } } } void et( epoll_event* events, int number, int epollfd, int listenfd ) { char buf[ BUFFER_SIZE ]; for ( int i = 0; i < number; i++ ) { int sockfd = events[i].data.fd; if ( sockfd == listenfd ) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); addfd( epollfd, connfd, true ); } else if ( events[i].events & EPOLLIN ) { printf( "event trigger once\n" ); while( 1 ) { memset( buf, '\0', BUFFER_SIZE ); int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 ); if( ret < 0 ) { if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) ) { printf( "read later\n" ); break; } close( sockfd ); break; } else if( ret == 0 ) { close( sockfd ); } else { printf( "get %d bytes of content: %s\n", ret, buf ); } } } else { printf( "something else happened \n" ); } } } int main( int argc, char* argv[] ) { if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); int ret = 0; struct sockaddr_in address; bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( listenfd >= 0 ); ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ); assert( ret != -1 ); ret = listen( listenfd, 5 ); assert( ret != -1 ); epoll_event events[ MAX_EVENT_NUMBER ]; int epollfd = epoll_create( 5 ); assert( epollfd != -1 ); addfd( epollfd, listenfd, true ); while( 1 ) { int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 ); if ( ret < 0 ) { printf( "epoll failure\n" ); break; } lt( events, ret, epollfd, listenfd ); //et( events, ret, epollfd, listenfd ); } close( listenfd ); return 0; }
程序清單2 #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <pthread.h> #define MAX_EVENT_NUMBER 1024 #define BUFFER_SIZE 1024 struct fds { int epollfd; int sockfd; }; int setnonblocking( int fd ) { int old_option = fcntl( fd, F_GETFL ); int new_option = old_option | O_NONBLOCK; fcntl( fd, F_SETFL, new_option ); return old_option; } void addfd( int epollfd, int fd, bool oneshot ) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; if( oneshot ) { event.events |= EPOLLONESHOT; } epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event ); setnonblocking( fd ); } void reset_oneshot( int epollfd, int fd ) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET | EPOLLONESHOT; epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event ); } void* worker( void* arg ) { int sockfd = ( (fds*)arg )->sockfd; int epollfd = ( (fds*)arg )->epollfd; printf( "start new thread to receive data on fd: %d\n", sockfd ); char buf[ BUFFER_SIZE ]; memset( buf, '\0', BUFFER_SIZE ); while( 1 ) { int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 ); if( ret == 0 ) { close( sockfd ); printf( "foreiner closed the connection\n" ); break; } else if( ret < 0 ) { if( errno == EAGAIN ) { reset_oneshot( epollfd, sockfd ); printf( "read later\n" ); break; } } else { printf( "get content: %s\n", buf ); sleep( 5 ); } } printf( "end thread receiving data on fd: %d\n", sockfd ); } int main( int argc, char* argv[] ) { if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); int ret = 0; struct sockaddr_in address; bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( listenfd >= 0 ); ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ); assert( ret != -1 ); ret = listen( listenfd, 5 ); assert( ret != -1 ); epoll_event events[ MAX_EVENT_NUMBER ]; int epollfd = epoll_create( 5 ); assert( epollfd != -1 ); addfd( epollfd, listenfd, false ); while( 1 ) { int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 ); if ( ret < 0 ) { printf( "epoll failure\n" ); break; } for ( int i = 0; i < ret; i++ ) { int sockfd = events[i].data.fd; if ( sockfd == listenfd ) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); addfd( epollfd, connfd, true ); } else if ( events[i].events & EPOLLIN ) { pthread_t thread; fds fds_for_new_worker; fds_for_new_worker.epollfd = epollfd; fds_for_new_worker.sockfd = sockfd; pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker ); } else { printf( "something else happened \n" ); } } } close( listenfd ); return 0; }
程序清單3 客戶端程序 #define _GNU_SOURCE 1 #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <poll.h> #include <fcntl.h> #define BUFFER_SIZE 64 int main( int argc, char* argv[] ) { if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); struct sockaddr_in server_address; bzero( &server_address, sizeof( server_address ) ); server_address.sin_family = AF_INET; inet_pton( AF_INET, ip, &server_address.sin_addr ); server_address.sin_port = htons( port ); int sockfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( sockfd >= 0 ); if ( connect( sockfd, ( struct sockaddr* )&server_address, sizeof( server_address ) ) < 0 ) { printf( "connection failed\n" ); close( sockfd ); return 1; } pollfd fds[2]; fds[0].fd = 0; fds[0].events = POLLIN; fds[0].revents = 0; fds[1].fd = sockfd; fds[1].events = POLLIN | POLLRDHUP; fds[1].revents = 0; char read_buf[BUFFER_SIZE]; int pipefd[2]; int ret = pipe( pipefd ); assert( ret != -1 ); while( 1 ) { ret = poll( fds, 2, -1 ); if( ret < 0 ) { printf( "poll failure\n" ); break; } if( fds[1].revents & POLLRDHUP ) { printf( "server close the connection\n" ); break; } else if( fds[1].revents & POLLIN ) { memset( read_buf, '\0', BUFFER_SIZE ); recv( fds[1].fd, read_buf, BUFFER_SIZE-1, 0 ); printf( "%s\n", read_buf ); } if( fds[0].revents & POLLIN ) { ret = splice( 0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE ); ret = splice( pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE ); } } close( sockfd ); return 0; } 服務器程序 #define _GNU_SOURCE 1 #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <poll.h> #define USER_LIMIT 5 #define BUFFER_SIZE 64 #define FD_LIMIT 65535 struct client_data { sockaddr_in address; char* write_buf; char buf[ BUFFER_SIZE ]; }; int setnonblocking( int fd ) { int old_option = fcntl( fd, F_GETFL ); int new_option = old_option | O_NONBLOCK; fcntl( fd, F_SETFL, new_option ); return old_option; } int main( int argc, char* argv[] ) { if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); int ret = 0; struct sockaddr_in address; bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( listenfd >= 0 ); ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ); assert( ret != -1 ); ret = listen( listenfd, 5 ); assert( ret != -1 ); client_data* users = new client_data[FD_LIMIT]; pollfd fds[USER_LIMIT+1]; int user_counter = 0; for( int i = 1; i <= USER_LIMIT; ++i ) { fds[i].fd = -1; fds[i].events = 0; } fds[0].fd = listenfd; fds[0].events = POLLIN | POLLERR; fds[0].revents = 0; while( 1 ) { ret = poll( fds, user_counter+1, -1 ); if ( ret < 0 ) { printf( "poll failure\n" ); break; } for( int i = 0; i < user_counter+1; ++i ) { if( ( fds[i].fd == listenfd ) && ( fds[i].revents & POLLIN ) ) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); if ( connfd < 0 ) { printf( "errno is: %d\n", errno ); continue; } if( user_counter >= USER_LIMIT ) { const char* info = "too many users\n"; printf( "%s", info ); send( connfd, info, strlen( info ), 0 ); close( connfd ); continue; } user_counter++; users[connfd].address = client_address; setnonblocking( connfd ); fds[user_counter].fd = connfd; fds[user_counter].events = POLLIN | POLLRDHUP | POLLERR; fds[user_counter].revents = 0; printf( "comes a new user, now have %d users\n", user_counter ); } else if( fds[i].revents & POLLERR ) { printf( "get an error from %d\n", fds[i].fd ); char errors[ 100 ]; memset( errors, '\0', 100 ); socklen_t length = sizeof( errors ); if( getsockopt( fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length ) < 0 ) { printf( "get socket option failed\n" ); } continue; } else if( fds[i].revents & POLLRDHUP ) { users[fds[i].fd] = users[fds[user_counter].fd]; close( fds[i].fd ); fds[i] = fds[user_counter]; i--; user_counter--; printf( "a client left\n" ); } else if( fds[i].revents & POLLIN ) { int connfd = fds[i].fd; memset( users[connfd].buf, '\0', BUFFER_SIZE ); ret = recv( connfd, users[connfd].buf, BUFFER_SIZE-1, 0 ); printf( "get %d bytes of client data %s from %d\n", ret, users[connfd].buf, connfd ); if( ret < 0 ) { if( errno != EAGAIN ) { close( connfd ); users[fds[i].fd] = users[fds[user_counter].fd]; fds[i] = fds[user_counter]; i--; user_counter--; } } else if( ret == 0 ) { printf( "code should not come to here\n" ); } else { for( int j = 1; j <= user_counter; ++j ) { if( fds[j].fd == connfd ) { continue; } fds[j].events |= ~POLLIN; fds[j].events |= POLLOUT; users[fds[j].fd].write_buf = users[connfd].buf; } } } else if( fds[i].revents & POLLOUT ) { int connfd = fds[i].fd; if( ! users[connfd].write_buf ) { continue; } ret = send( connfd, users[connfd].write_buf, strlen( users[connfd].write_buf ), 0 ); users[connfd].write_buf = NULL; fds[i].events |= ~POLLOUT; fds[i].events |= POLLIN; } } } delete [] users; close( listenfd ); return 0; }
程序清單4 回射服務器程序 #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <pthread.h> #define MAX_EVENT_NUMBER 1024 #define TCP_BUFFER_SIZE 512 #define UDP_BUFFER_SIZE 1024 int setnonblocking( int fd ) { int old_option = fcntl( fd, F_GETFL ); int new_option = old_option | O_NONBLOCK; fcntl( fd, F_SETFL, new_option ); return old_option; } void addfd( int epollfd, int fd ) { epoll_event event; event.data.fd = fd; //event.events = EPOLLIN | EPOLLET; event.events = EPOLLIN; epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event ); setnonblocking( fd ); } int main( int argc, char* argv[] ) { if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); int ret = 0; struct sockaddr_in address; bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( listenfd >= 0 ); ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ); assert( ret != -1 ); ret = listen( listenfd, 5 ); assert( ret != -1 ); bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); int udpfd = socket( PF_INET, SOCK_DGRAM, 0 ); assert( udpfd >= 0 ); ret = bind( udpfd, ( struct sockaddr* )&address, sizeof( address ) ); assert( ret != -1 ); epoll_event events[ MAX_EVENT_NUMBER ]; int epollfd = epoll_create( 5 ); assert( epollfd != -1 ); addfd( epollfd, listenfd ); addfd( epollfd, udpfd ); while( 1 ) { int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 ); if ( number < 0 ) { printf( "epoll failure\n" ); break; } for ( int i = 0; i < number; i++ ) { int sockfd = events[i].data.fd; if ( sockfd == listenfd ) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); addfd( epollfd, connfd ); } else if ( sockfd == udpfd ) { char buf[ UDP_BUFFER_SIZE ]; memset( buf, '\0', UDP_BUFFER_SIZE ); struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); ret = recvfrom( udpfd, buf, UDP_BUFFER_SIZE-1, 0, ( struct sockaddr* )&client_address, &client_addrlength ); if( ret > 0 ) { sendto( udpfd, buf, UDP_BUFFER_SIZE-1, 0, ( struct sockaddr* )&client_address, client_addrlength ); } } else if ( events[i].events & EPOLLIN ) { char buf[ TCP_BUFFER_SIZE ]; while( 1 ) { memset( buf, '\0', TCP_BUFFER_SIZE ); ret = recv( sockfd, buf, TCP_BUFFER_SIZE-1, 0 ); if( ret < 0 ) { if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) ) { break; } close( sockfd ); break; } else if( ret == 0 ) { close( sockfd ); } else { send( sockfd, buf, ret, 0 ); } } } else { printf( "something else happened \n" ); } } } close( listenfd ); return 0; }