Linux高性能server編程——I/O複用



IO複用算法

I/O複用使得程序能同一時候監聽多個文件描寫敘述符。一般網絡程序在下列狀況下需要使用I/O複用技術:數組

  1. client程序要同一時候處理多個socket服務器

  2. client程序要同一時候處理用戶輸入和網絡鏈接網絡

  3. TCPserver要同一時候處理監聽socket和鏈接socket,這是I/O複用使用最多的場合多線程

  4. server要同一時候處理TCP請求和UDP請求。比方本章將要討論的會社server併發

  5. server要同一時候監聽多個port。或者處理多種服務。app

I/O複用儘管能同一時候監聽多個文件描寫敘述符,但它自己是堵塞的。並且當多個文件描寫敘述符同一時候就緒時,假設不採用額外措施,程序就僅僅能按順序依次處理當中的每一個文件描寫敘述符,這使得server程序看起來像是串行工做。socket

假設要實現併發,僅僅能使用多進程或多線程等變成手段。函數

select系統複用

select系統調用的用途是:在一段指定時間內。監聽用戶感興趣的文件描寫敘述符上的可讀可寫和異常等事件。ui

#include <sys/select.h>

int select(int nfds, fd_set *readfds,fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

  1. nfds參數指定被監聽的文件描寫敘述符的總數。

    一般被設置爲select監聽的所有文件描寫敘述符中的最大值加1,因爲文件描寫敘述符是從0開始計數的

  2. readfds, writefdsexceptfds參數分別指向可讀、可寫和異常等事件相應的文件描寫敘述符集合。

    fd_set結構體僅包括一個整形數組。高數組的每個元素的每一位標記一個文件描寫敘述符。

    可用例如如下宏來訪問fd_set結構體中的位:

    voidFD_CLR(int fd, fd_set *set);

    int  FD_ISSET(int fd, fd_set *set);

    voidFD_SET(int fd, fd_set *set);

        void FD_ZERO(fd_set*set);

  3. timeout參數用來設置select函數的超時時間。它是一個timeval指針。timeval結構體定義例如如下:

struct timeval {

              long   tv_sec;        /* seconds */

              long   tv_usec;       /* microseconds */

          };

假設給timeout傳遞NULL。則select將一直堵塞,直到某個文件描寫敘述符就緒。

select成功時返回就緒文件描寫敘述符的總數,假設在超時時間內沒有不論什麼文件描寫敘述符就緒返回0,失敗返回-1,並設置errno;假設select在等待期間收到信號,則select立刻返回-1,並設置errnoEINTR

poll系統調用

poll系統調用和select相似,也是在指定時間內倫旭必定數量的文件描寫敘述符。以測試當中是否有就緒。

poll原型例如如下:

#include<poll.h>

int poll(structpollfd *fds, nfds_t nfds, int timeout);

1)fds參數是一個pollfd結構類型的數組,它指定因此咱們感興趣的文件描寫敘述符上發生的刻度、可寫和異常等時間。其結構定義例如如下:

struct pollfd {

              int  fd;        /* file descriptor */

              short events;    /* requested events */

              short revents;   /* returned events */

          };

當中fd成員指定文件描寫敘述符;events成員告訴poll監聽f上的那些時間,它是一系列時間的按位或;revents成員則由內核改動,以通知應用程序fd上實際發生了哪些事件。

2)nfds參數指定被監聽事件集合的大小。其類型nfds_t定義例如如下:

typedef unsignedlong int nfds_t;

  1. timeout參數指定poll的超時時間。單位是毫秒。當timeout-1時。poll調用將永遠堵塞,直到某個事件發生;當爲0時,poll調用立刻返回。

    poll返回值含義與select一樣。

epoll系列系統調用

內核事件表

epollLinux特有的I/O複用函數。它在實現和使用上與selectpoll有很是大差別。首先,epoll使用一組函數來完畢任務。而不是單個函數。

其次,epoll把用戶關心的文件描寫敘述符上的時間放在內核裏的一個時間表中。從而無需向selectpoll那樣每次調用都要反覆傳入文件描寫敘述符集或事件集。但epoll需要使用一個額外的文件描寫敘述符,來惟一標識內核中的這個時間表。這個文件描寫敘述符使用例如如下epoll_create函數建立:

#include <sys/epoll.h>

int epoll_create(int size);

size參數給內核一個提示,告訴它時間表需要多大。該函數返回的文件描寫敘述符將做用其它所有epoll系統調用的第一個參數,以指定要訪問的內核事件表。

如下的函數用來操做epoll的內核事件表:

#include <sys/epoll.h>

int epoll_ctl(int epfd, int op, int fd,struct epoll_event *event);

fd參數是要操做的文件描寫敘述符。op參數則制定操做類型,操做類型有例如如下3種:

EPOLL_CTL_ADD:往事件表中註冊fd上的事件

EPOLL_CTL_MOD:改動fd上的註冊事件

EPOLL_CTL_DEL:刪除fd上的註冊事件

event參數指定時間,它是epoll_event結構指針類型。epoll_event的定義例如如下:

struct epoll_event {

              uint32_t    events;     /* Epoll events */

              epoll_data_t data;       /* User data variable */

          };

當中events成員描寫敘述事件類型。data成員用於存儲用戶數據。其類型epoll_data的定義例如如下:

typedef union epoll_data {

              void       *ptr;

              int         fd;

              uint32_t    u32;

              uint64_t    u64;

          } epoll_data_t;

epoll_data_t是一個聯合體,當中4個成員中使用最多的是fd,它指定事件所叢書的目標文件描寫敘述符。

epoll_ctl成功時返回0,失敗時返回-1並設置errno

epoll_wait函數

epoll系列系統調用的主要接口是epoll_wait函數。它在一段超時時間內等待一組文件描寫敘述符上的事件,其原型例如如下:

#include <sys/epoll.h>

int epoll_wait(int epfd, struct epoll_event*events, int maxevents, int timeout);

該函數成功時返回就緒的文件描寫敘述符的個數,失敗是返回-1,並設置errno

maxevents參數指定最多監聽多少時間,必須大於0.

epoll_wait函數假設檢測到事件。就將所有就緒的事件從內核事件表中拷貝到它的第二個參數events指向的數組中。這個數組僅僅用於輸出epoll_wait檢測到的就緒時間,而不像selectpoll數組那樣即用於傳入用戶註冊的時間,實用於輸出內核檢測到的就緒時間。這就極大的提升了應用程序索引就緒文件描寫敘述符的效率。

如下的代碼體現了這個區別:

/*怎樣索引poll返回的就緒文件描寫敘述符*/

int ret = poll(fds, MAX_EVENT_NUMBER, -1);

/*必須遍歷所有註冊文件描寫敘述符並找到當中的就緒着*/

for(int i=0;i<MAX_EVENT_NUMBER; ++i)

{

        if(fds[i].revents & POLLIN)

{

        int sockfd = fds[i].fd;

        /*處理sockfd*/

}

}

 

/*怎樣索引epoll返回的就緒文件描寫敘述符*/

int ret =epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1);

/*遍歷就緒的ret個文件描寫敘述符*/

for( int i=0;i<ret; i++)

{

        int socketfd = events[i].data.fd;

        /*socket確定就緒。直接處理*/

}

 

LTET模式

epoll對文件描寫敘述符的操做有兩種模式:LT模式(Levek Trigger,電平觸發)和ET模式(E多個Trigger,邊沿觸發)。

LT模式是默認的工做模式,這樣的模式下epoll至關於一個效率較高的poll

當往epoll內核事件表中註冊一個文件描寫敘述符上的EPOLLET事件時,epoll將以ET模式來操做該文件描寫敘述符。ET模式是epoll的搞笑工做模式。

對於採用LT工做模式的文件描寫敘述符。當epoll_wait檢測到其上有時間發生並將此事件通知應用程序後,應用程序可以不立刻處理該事件。這樣。當應用程序下一次調用epoll_wait時,epoll_wait還會再次嚮應用程序通告此事件,直到該事件被處理。而對於採用ET工做模式的文件描寫敘述符,當epoll_wait檢測到其上有時間發生並將此時間通知應用程序後。應用程序必須立刻處理該事件,因爲興許的epoll_wait調用將再也不向用用程序通知這一事件。可見。ET在很是大程度上減小了同一個epoll事件被反覆觸發的次數,所以效率比LT模式高。

文章最後的程序清單1比較了兩種模式:

當在clienttelnet傳輸「abcdefghijklmnopqrstuvwxyz」字符串時。輸出例如如下

ET模式輸出:

event trigger once

get 9 bytes of content: abcdefghi

get 9 bytes of content: jklmnopqr

get 9 bytes of content: stuvwxyz

get 1 bytes of content:

LT模式輸出:

event trigger once

get 9 bytes of content: abcdefghi

event trigger once

get 9 bytes of content: jklmnopqr

event trigger once

get 9 bytes of content: stuvwxyz

event trigger once

get 1 bytes of content:

可以看到正如咱們預期,ET模式下時間僅僅被觸發一次,要比LT模式下少很是多。

EPOLLONESHOT事件

即便咱們使用ET模式。一個socket上的某個事件仍是可能被觸發屢次。這在併發程序中會引發一個問題。

比方一個縣城在讀取完某個socket上的數據後開始處理這些數據,二在數據的處理project中該socket上又有新數據可讀。此時另一個縣城北喚醒來讀取這些新的數據。

因而就出現了兩個線程同一時候操做一個socket的局面。這固然不是咱們指望的。

咱們指望的是一個socket鏈接在任一時刻都僅僅被一個線程處理。這一點可以使用spollEPOLLONESHOT事件實現。

        對於註冊了EPOLLONESHOT事件的文件描寫敘述符,操做系統最多觸發其上註冊的一個可讀、可寫或者異常事件,而且僅僅觸發一次。除非咱們使用epoll_ctl函數重置該文件描寫敘述符上註冊的EPOLLONESHOT事件ain.zheyang。當一個線程在處理某個socket時。其它線程是不可能有機會操做該socket的。

但反過來思考,註冊了EPOLLONESHOT事件的socket一旦被某個線程處理完成,該線程就應該立刻重置這個socket上的EPOLLONESHOT事件,以確保這個socket下一次可讀時,其EPOLLIN事件能被觸發,進而讓其它工做線程有機會處理這個socket

程序清單2展現了EPOLLONESHOT事件的使用。

三組I/O複用函數的比較

系統調用

select

poll

epoll

事件集合

用戶經過3個參數分別傳入感興趣的可讀、可寫及異常等事件。內核經過對這些參數在線改動來反饋當中的就緒事件。這使得用戶每次調用select都要重置這3個參數

統一處理所有事件類型,所以僅僅需要一個事件集參數。用戶經過pollfd.events傳入感興趣的事件,內核經過改動pollfd.revents反饋當中就緒的事件

內核經過一個時間表直接管理用戶感興趣的所有事件。所以每次調用epoll_wait時,無需重複傳入用戶感興趣的時間。

epoll_wait系統調用的參數events僅用來反饋就緒的事件。

應用程序索引就緒文件描寫敘述符的時間複雜度

O(N)

O(N)

O(1)

最大支持文件描寫敘述符數

通常有最大值限制

65535

65535

工做模式

LT

LT

支持ET高效模式

內核實現和工做效率

採用輪詢方法來檢測就緒事件,算法複雜度爲O(N)

採用輪詢方式檢測就緒事件,算法複雜度爲O(N)

採用回調方式來檢測就緒事件。算法複雜度爲O(1)

 

聊天程序見程序(poll實現)見清單3

同一時候處理TCPUDP服務的回射server程序(epoll程序)見清單4



程序清單1:
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

void addfd( int epollfd, int fd, bool enable_et )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN;
    if( enable_et )
    {
        event.events |= EPOLLET;
    }
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}

void lt( epoll_event* events, int number, int epollfd, int listenfd )
{
    char buf[ BUFFER_SIZE ];
    for ( int i = 0; i < number; i++ )
    {
        int sockfd = events[i].data.fd;
        if ( sockfd == listenfd )
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof( client_address );
            int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
            addfd( epollfd, connfd, false );
        }
        else if ( events[i].events & EPOLLIN )
        {
            printf( "event trigger once\n" );
            memset( buf, '\0', BUFFER_SIZE );
            int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
            if( ret <= 0 )
            {
                close( sockfd );
                continue;
            }
            printf( "get %d bytes of content: %s\n", ret, buf );
        }
        else
        {
            printf( "something else happened \n" );
        }
    }
}

void et( epoll_event* events, int number, int epollfd, int listenfd )
{
    char buf[ BUFFER_SIZE ];
    for ( int i = 0; i < number; i++ )
    {
        int sockfd = events[i].data.fd;
        if ( sockfd == listenfd )
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof( client_address );
            int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
            addfd( epollfd, connfd, true );
        }
        else if ( events[i].events & EPOLLIN )
        {
            printf( "event trigger once\n" );
            while( 1 )
            {
                memset( buf, '\0', BUFFER_SIZE );
                int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
                if( ret < 0 )
                {
                    if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
                    {
                        printf( "read later\n" );
                        break;
                    }
                    close( sockfd );
                    break;
                }
                else if( ret == 0 )
                {
                    close( sockfd );
                }
                else
                {
                    printf( "get %d bytes of content: %s\n", ret, buf );
                }
            }
        }
        else
        {
            printf( "something else happened \n" );
        }
    }
}

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    addfd( epollfd, listenfd, true );

    while( 1 )
    {
        int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ret < 0 )
        {
            printf( "epoll failure\n" );
            break;
        }
    
        lt( events, ret, epollfd, listenfd );
        //et( events, ret, epollfd, listenfd );
    }

    close( listenfd );
    return 0;
}
程序清單2
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024
struct fds
{
   int epollfd;
   int sockfd;
};

int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

void addfd( int epollfd, int fd, bool oneshot )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    if( oneshot )
    {
        event.events |= EPOLLONESHOT;
    }
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}

void reset_oneshot( int epollfd, int fd )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );
}

void* worker( void* arg )
{
    int sockfd = ( (fds*)arg )->sockfd;
    int epollfd = ( (fds*)arg )->epollfd;
    printf( "start new thread to receive data on fd: %d\n", sockfd );
    char buf[ BUFFER_SIZE ];
    memset( buf, '\0', BUFFER_SIZE );
    while( 1 )
    {
        int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
        if( ret == 0 )
        {
            close( sockfd );
            printf( "foreiner closed the connection\n" );
            break;
        }
        else if( ret < 0 )
        {
            if( errno == EAGAIN )
            {
                reset_oneshot( epollfd, sockfd );
                printf( "read later\n" );
                break;
            }
        }
        else
        {
            printf( "get content: %s\n", buf );
            sleep( 5 );
        }
    }
    printf( "end thread receiving data on fd: %d\n", sockfd );
}

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    addfd( epollfd, listenfd, false );

    while( 1 )
    {
        int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ret < 0 )
        {
            printf( "epoll failure\n" );
            break;
        }
    
        for ( int i = 0; i < ret; i++ )
        {
            int sockfd = events[i].data.fd;
            if ( sockfd == listenfd )
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                addfd( epollfd, connfd, true );
            }
            else if ( events[i].events & EPOLLIN )
            {
                pthread_t thread;
                fds fds_for_new_worker;
                fds_for_new_worker.epollfd = epollfd;
                fds_for_new_worker.sockfd = sockfd;
                pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
            }
            else
            {
                printf( "something else happened \n" );
            }
        }
    }

    close( listenfd );
    return 0;
}

程序清單3
客戶端程序
#define _GNU_SOURCE 1
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <poll.h>
#include <fcntl.h>

#define BUFFER_SIZE 64

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    struct sockaddr_in server_address;
    bzero( &server_address, sizeof( server_address ) );
    server_address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &server_address.sin_addr );
    server_address.sin_port = htons( port );

    int sockfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( sockfd >= 0 );
    if ( connect( sockfd, ( struct sockaddr* )&server_address, sizeof( server_address ) ) < 0 )
    {
        printf( "connection failed\n" );
        close( sockfd );
        return 1;
    }

    pollfd fds[2];
    fds[0].fd = 0;
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    fds[1].fd = sockfd;
    fds[1].events = POLLIN | POLLRDHUP;
    fds[1].revents = 0;
    char read_buf[BUFFER_SIZE];
    int pipefd[2];
    int ret = pipe( pipefd );
    assert( ret != -1 );

    while( 1 )
    {
        ret = poll( fds, 2, -1 );
        if( ret < 0 )
        {
            printf( "poll failure\n" );
            break;
        }

        if( fds[1].revents & POLLRDHUP )
        {
            printf( "server close the connection\n" );
            break;
        }
        else if( fds[1].revents & POLLIN )
        {
            memset( read_buf, '\0', BUFFER_SIZE );
            recv( fds[1].fd, read_buf, BUFFER_SIZE-1, 0 );
            printf( "%s\n", read_buf );
        }

        if( fds[0].revents & POLLIN )
        {
            ret = splice( 0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE );
            ret = splice( pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE );
        }
    }
    
    close( sockfd );
    return 0;
}

服務器程序
#define _GNU_SOURCE 1
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <poll.h>

#define USER_LIMIT 5
#define BUFFER_SIZE 64
#define FD_LIMIT 65535

struct client_data
{
    sockaddr_in address;
    char* write_buf;
    char buf[ BUFFER_SIZE ];
};

int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    client_data* users = new client_data[FD_LIMIT];
    pollfd fds[USER_LIMIT+1];
    int user_counter = 0;
    for( int i = 1; i <= USER_LIMIT; ++i )
    {
        fds[i].fd = -1;
        fds[i].events = 0;
    }
    fds[0].fd = listenfd;
    fds[0].events = POLLIN | POLLERR;
    fds[0].revents = 0;

    while( 1 )
    {
        ret = poll( fds, user_counter+1, -1 );
        if ( ret < 0 )
        {
            printf( "poll failure\n" );
            break;
        }
    
        for( int i = 0; i < user_counter+1; ++i )
        {
            if( ( fds[i].fd == listenfd ) && ( fds[i].revents & POLLIN ) )
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                if ( connfd < 0 )
                {
                    printf( "errno is: %d\n", errno );
                    continue;
                }
                if( user_counter >= USER_LIMIT )
                {
                    const char* info = "too many users\n";
                    printf( "%s", info );
                    send( connfd, info, strlen( info ), 0 );
                    close( connfd );
                    continue;
                }
                user_counter++;
                users[connfd].address = client_address;
                setnonblocking( connfd );
                fds[user_counter].fd = connfd;
                fds[user_counter].events = POLLIN | POLLRDHUP | POLLERR;
                fds[user_counter].revents = 0;
                printf( "comes a new user, now have %d users\n", user_counter );
            }
            else if( fds[i].revents & POLLERR )
            {
                printf( "get an error from %d\n", fds[i].fd );
                char errors[ 100 ];
                memset( errors, '\0', 100 );
                socklen_t length = sizeof( errors );
                if( getsockopt( fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length ) < 0 )
                {
                    printf( "get socket option failed\n" );
                }
                continue;
            }
            else if( fds[i].revents & POLLRDHUP )
            {
                users[fds[i].fd] = users[fds[user_counter].fd];
                close( fds[i].fd );
                fds[i] = fds[user_counter];
                i--;
                user_counter--;
                printf( "a client left\n" );
            }
            else if( fds[i].revents & POLLIN )
            {
                int connfd = fds[i].fd;
                memset( users[connfd].buf, '\0', BUFFER_SIZE );
                ret = recv( connfd, users[connfd].buf, BUFFER_SIZE-1, 0 );
                printf( "get %d bytes of client data %s from %d\n", ret, users[connfd].buf, connfd );
                if( ret < 0 )
                {
                    if( errno != EAGAIN )
                    {
                        close( connfd );
                        users[fds[i].fd] = users[fds[user_counter].fd];
                        fds[i] = fds[user_counter];
                        i--;
                        user_counter--;
                    }
                }
                else if( ret == 0 )
                {
                    printf( "code should not come to here\n" );
                }
                else
                {
                    for( int j = 1; j <= user_counter; ++j )
                    {
                        if( fds[j].fd == connfd )
                        {
                            continue;
                        }
                        
                        fds[j].events |= ~POLLIN;
                        fds[j].events |= POLLOUT;
                        users[fds[j].fd].write_buf = users[connfd].buf;
                    }
                }
            }
            else if( fds[i].revents & POLLOUT )
            {
                int connfd = fds[i].fd;
                if( ! users[connfd].write_buf )
                {
                    continue;
                }
                ret = send( connfd, users[connfd].write_buf, strlen( users[connfd].write_buf ), 0 );
                users[connfd].write_buf = NULL;
                fds[i].events |= ~POLLOUT;
                fds[i].events |= POLLIN;
            }
        }
    }

    delete [] users;
    close( listenfd );
    return 0;
}
程序清單4 回射服務器程序
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define TCP_BUFFER_SIZE 512
#define UDP_BUFFER_SIZE 1024

int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

void addfd( int epollfd, int fd )
{
    epoll_event event;
    event.data.fd = fd;
    //event.events = EPOLLIN | EPOLLET;
    event.events = EPOLLIN;
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );
    int udpfd = socket( PF_INET, SOCK_DGRAM, 0 );
    assert( udpfd >= 0 );

    ret = bind( udpfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    addfd( epollfd, listenfd );
    addfd( epollfd, udpfd );

    while( 1 )
    {
        int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( number < 0 )
        {
            printf( "epoll failure\n" );
            break;
        }
    
        for ( int i = 0; i < number; i++ )
        {
            int sockfd = events[i].data.fd;
            if ( sockfd == listenfd )
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                addfd( epollfd, connfd );
            }
            else if ( sockfd == udpfd )
            {
                char buf[ UDP_BUFFER_SIZE ];
                memset( buf, '\0', UDP_BUFFER_SIZE );
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );

                ret = recvfrom( udpfd, buf, UDP_BUFFER_SIZE-1, 0, ( struct sockaddr* )&client_address, &client_addrlength );
                if( ret > 0 )
                {
                    sendto( udpfd, buf, UDP_BUFFER_SIZE-1, 0, ( struct sockaddr* )&client_address, client_addrlength );
                }
            }
            else if ( events[i].events & EPOLLIN )
            {
                char buf[ TCP_BUFFER_SIZE ];
                while( 1 )
                {
                    memset( buf, '\0', TCP_BUFFER_SIZE );
                    ret = recv( sockfd, buf, TCP_BUFFER_SIZE-1, 0 );
                    if( ret < 0 )
                    {
                        if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
                        {
                            break;
                        }
                        close( sockfd );
                        break;
                    }
                    else if( ret == 0 )
                    {
                        close( sockfd );
                    }
                    else
                    {
                        send( sockfd, buf, ret, 0 );
                    }
                }
            }
            else
            {
                printf( "something else happened \n" );
            }
        }
    }

    close( listenfd );
    return 0;
}
相關文章
相關標籤/搜索