幾種服務器端IO模型的簡單介紹及實現

一些概念:linux

同步和異步ios

同步和異步是針對應用程序和內核的交互而言的,同步指的是用戶進程觸發I/O操做並等待或者輪詢的去查看I/O操做是否就緒,而異步是指用戶進程觸發I/O操做之後便開始作本身的事情,而當I/O操做已經完成的時候會獲得I/O完成的通知。編程

阻塞和非阻塞後端

阻塞和非阻塞是針對於進程在訪問數據的時候,根據I/O操做的就緒狀態來採起的不一樣方式,說白了是一種讀取或者寫入操做函數的實現方式,阻塞方式下讀取或者寫入函數將一直等待,而非阻塞方式下,讀取或者寫入函數會當即返回一個狀態值。緩存

服務器端幾種模型:服務器

一、阻塞式模型(blocking IO)網絡

咱們第一次接觸到的網絡編程都是從 listen()、accpet()、send()、recv() 等接口開始的。使用這些接口能夠很方便的構建C/S的模型。這裏大部分的 socket 接口都是阻塞型的。所謂阻塞型接口是指系統調用(通常是 IO 接口)不返回調用結果並讓當前線程一直阻塞,只有當該系統調用得到結果或者超時出錯時才返回。多線程

以下面一個簡單的Server端實現:異步

#include <Winsock2.h>
#include <cstdio>
#include <iostream>
#include <string>
using namespace std;

#pragma  comment(lib,"ws2_32.lib")

int init_win_socket()
{
    WSADATA wsaData;

    if(WSAStartup(MAKEWORD(2,2) , &wsaData ) != 0) 
    {
        return -1;
    }

    return 0;
}

#define Server_Port 10286

void handle_client(int newfd)
{
    while(1)
    {
        char buff[1024];
        memset(buff,0,1024);
        int result = recv(newfd,buff,1024,0);
        if(result <= 0)
        {
            break;
        }
        else
        {
            printf("Receive Data %s, Size: %d \n",buff,result);
            int ret = send(newfd,buff,result,0);
            if(ret>0)
            {
                printf("Send Data %s, Size: %d \n",buff,ret);
            }
            else
            {
                break;
            }
        }
    }
    closesocket(newfd);
    return;
}

int run()
{
    int listener;
    struct sockaddr_in addr_server;

    listener = socket(AF_INET, SOCK_STREAM, 0);

    //addr_server.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");
    addr_server.sin_addr.S_un.S_addr = ADDR_ANY;
    addr_server.sin_family = AF_INET;
    addr_server.sin_port = htons(Server_Port);

    if(bind(listener,(const sockaddr *)&addr_server,sizeof(addr_server)) < 0)
    {
        perror("bind error");
        return -1;
    }

    if (listen(listener, 10)<0) 
    {
        perror("listen error");
        return -1;
    }

    printf("Server is listening ... \n");

    bool runing = true;
    while(runing)
    {
        sockaddr_in addr_client;
        int clientlen = sizeof(addr_client);
        int client_sock;
        if ((client_sock = accept(listener, (struct sockaddr *) &addr_client, &clientlen)) < 0) 
        {
            printf("Failed to accept client connection \n");
        }
        fprintf(stdout, "Client connected: %s \n", inet_ntoa(addr_client.sin_addr));
        /*Handle this connect */
        handle_client(client_sock);
    }

    closesocket(listener);
    return 0;
}

int main(int c, char **v)
{

#ifdef WIN32
    init_win_socket();
#endif

    run();

    getchar();
    return 0;
}
View Code

示意圖以下:socket

這裏的socket的接口是阻塞的(blocking),在線程被阻塞期間,線程將沒法執行任何運算或響應任何的網絡請求,這給多客戶機、多業務邏輯的網絡編程帶來了挑戰。

二、多線程的服務器模型(Multi-Thread)

應對多客戶機的網絡應用,最簡單的解決方式是在服務器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每一個鏈接都擁有獨立的線程(或進程),這樣任何一個鏈接的阻塞都不會影響其餘的鏈接。

多線程Server端的實現:

#include <Winsock2.h>
#include <cstdio>
#include <iostream>
#include <string>
using namespace std;

#pragma  comment(lib,"ws2_32.lib")

int init_win_socket()
{
    WSADATA wsaData;

    if(WSAStartup(MAKEWORD(2,2) , &wsaData ) != 0) 
    {
        return -1;
    }

    return 0;
}

#define Server_Port 10286

DWORD WINAPI handle_client(LPVOID lppara)
{
    int *newfd = (int *)lppara;
    while(1)
    {
        char buff[1024];
        memset(buff,0,1024);
        int result = recv(*newfd,buff,1024,0);
        if(result <= 0)
        {
            break;
        }
        else
        {
            printf("Receive Data %s, Size: %d \n",buff,result);
            int ret = send(*newfd,buff,result,0);
            if(ret>0)
            {
                printf("Send Data %s, Size: %d \n",buff,ret);
            }
            else
            {
                break;
            }
        }
        Sleep(10);
    }
    closesocket(*newfd);
    return 0;
}

int run()
{
    int listener;
    struct sockaddr_in addr_server;
    int sock_clients[1024]; //max number for accept client connection;

    listener = socket(AF_INET, SOCK_STREAM, 0);

    //addr_server.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");
    addr_server.sin_addr.S_un.S_addr = ADDR_ANY;
    addr_server.sin_family = AF_INET;
    addr_server.sin_port = htons(Server_Port);

    if(bind(listener,(const sockaddr *)&addr_server,sizeof(addr_server)) < 0)
    {
        perror("bind error");
        return -1;
    }

    if (listen(listener, 10)<0) 
    {
        perror("listen error");
        return -1;
    }

    printf("Server is listening ... \n");

    int fd_count = 0;
    bool runing = true;
    while(runing)
    {
        sockaddr_in addr_client;
        int clientlen = sizeof(addr_client);
        int client_sock;
        if ((client_sock = accept(listener, (struct sockaddr *) &addr_client, &clientlen)) < 0) 
        {
            printf("Failed to accept client connection \n");
        }
        fprintf(stdout, "Client connected: socket fd %d , %s \n", client_sock,inet_ntoa(addr_client.sin_addr));
        /*Handle this connect */
        if(fd_count<1024)
        {
            sock_clients[fd_count] = client_sock;
            if(CreateThread(NULL,0,handle_client,&sock_clients[fd_count],0,NULL)==NULL)
                return -1;
            ++ fd_count;
        }
        
        Sleep(10);
    }

    closesocket(listener);
    return 0;
}

int main(int c, char **v)
{

#ifdef WIN32
    init_win_socket();
#endif

    run();

    getchar();
    return 0;
}
View Code

上述多線程的服務器模型能夠解決一些鏈接量不大的多客戶端鏈接請求,可是若是要同時響應成千上萬路的鏈接請求,則不管多線程仍是多進程都會嚴重佔據系統資源,下降系統對外界響應效率。

在多線程的基礎上,能夠考慮使用「線程池」或「鏈接池」,「線程池」旨在減小建立和銷燬線程的頻率,其維持必定合理數量的線程,並讓空閒的線程從新承擔新的執行任務。「鏈接池」維持鏈接的緩存池,儘可能重用已有的鏈接、減小建立和關閉鏈接的頻率。這兩種技術均可以很好的下降系統開銷,都被普遍應用不少大型系統。

三、非阻塞式模型(Non-blocking IO)

非阻塞的接口相比於阻塞型接口的顯著差別在於,在被調用以後當即返回。

非阻塞型IO的示意圖以下:

從應用程序的角度來講,blocking read 調用會延續很長時間。在內核執行讀操做和其餘工做時,應用程序會被阻塞。

非阻塞的IO可能並不會當即知足,須要應用程序調用許屢次來等待操做完成。這可能效率不高,由於在不少狀況下,當內核執行這個命令時,應用程序必需要進行忙碌等待,直到數據可用爲止。

另外一個問題,在循環調用非阻塞IO的時候,將大幅度佔用CPU,因此通常使用select等來檢測」是否能夠操做「。

四、多路複用IO

支持I/O複用的系統調用有select、poll、epoll、kqueue等,

這裏以Select函數爲例,select函數用於探測多個文件句柄的狀態變化,如下爲一個使用了使用了Select函數的Server實現:

#include <Winsock2.h>
#include <cstdio>
#include <cstdlib>
#include <cassert>
#include <iostream>
#include <string>
using namespace std;

#pragma  comment(lib,"ws2_32.lib")

int init_win_socket()
{
    WSADATA wsaData;

    if(WSAStartup(MAKEWORD(2,2) , &wsaData ) != 0) 
    {
        return -1;
    }

    return 0;
}

#define Server_Port 10286
#define MAX_LINE 16384

#define  FD_SETSIZE 1024

struct fd_state 
{
    char buffer[MAX_LINE];
    size_t buffer_used;

    int writing;
    size_t n_written;
    size_t write_upto;
};

struct fd_state * alloc_fd_state(void)
{
    struct fd_state *state = (struct fd_state *)malloc(sizeof(struct fd_state));
    if (!state)
        return NULL;
    state->buffer_used = state->n_written = state->writing =
        state->write_upto = 0;
    memset(state->buffer,0,MAX_LINE);
    return state;
}

void free_fd_state(struct fd_state *state)
{
    free(state);
}

int set_socket_nonblocking(int fd)
{
    unsigned long mode = 1;
    int result = ioctlsocket(fd, FIONBIO, &mode);
    if (result != 0)
    {
        return -1;
        printf("ioctlsocket failed with error: %ld\n", result);
    }
    return 0;
}

int do_read(int fd, struct fd_state *state)
{
    char buf[1024];
    int i;
    int result;
    while (1) 
    {
        memset(buf,0,1024);
        result = recv(fd, buf, sizeof(buf), 0);
        if (result <= 0)
            break;

        for (i=0; i < result; ++i)  
        {
            if (state->buffer_used < sizeof(state->buffer))
                state->buffer[state->buffer_used++] = buf[i];
        }
    }
    state->writing = 1;
    state->write_upto = state->buffer_used;
    printf("Receive data: %s  size: %d\n",state->buffer+state->n_written,state->write_upto-state->n_written);

    if (result == 0) 
    {
        return 1;
    } 
    else if (result < 0) 
    {
#ifdef WIN32
        if (result == -1 && WSAGetLastError()==WSAEWOULDBLOCK)
            return 0;
#else
        if (errno == EAGAIN)
            return 0;
#endif
        return -1;
    }

    return 0;
}

int do_write(int fd, struct fd_state *state)
{
    while (state->n_written < state->write_upto) 
    {
        int result = send(fd, state->buffer + state->n_written,
            state->write_upto - state->n_written, 0);
        if (result < 0) 
        {
#ifdef WIN32
            if (result == -1 && WSAGetLastError()==WSAEWOULDBLOCK)
                return 0;
#else
            if (errno == EAGAIN)
                return 0;
#endif
            return -1;
        }
        assert(result != 0);
        printf("Send data: %s \n",state->buffer+ state->n_written);
        state->n_written += result;
    }

    if (state->n_written == state->buffer_used)
        state->n_written = state->write_upto = state->buffer_used = 0;

    state->writing = 0;

    return 0;
}

void run()
{
    int listener;
    struct fd_state *state[FD_SETSIZE];
    struct sockaddr_in sin;
    int i, maxfd;
    fd_set readset, writeset, exset;

    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(Server_Port);

    for (i = 0; i < FD_SETSIZE; ++i)
        state[i] = NULL;

    listener = socket(AF_INET, SOCK_STREAM, 0);
    set_socket_nonblocking(listener);

    int one = 1;
    setsockopt(listener, SOL_SOCKET, SO_REUSEADDR,(const char *)&one, sizeof(one));

    if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) 
    {
        perror("bind");
        return;
    }

    if (listen(listener, 16)<0) 
    {
        perror("listen");
        return;
    }

    printf("Server is listening ... \n");

    FD_ZERO(&readset);
    FD_ZERO(&writeset);
    FD_ZERO(&exset);

    while (1) 
    {
        maxfd = listener;

        FD_ZERO(&readset);
        FD_ZERO(&writeset);
        FD_ZERO(&exset);

        FD_SET(listener, &readset);

        for (i=0; i < FD_SETSIZE; ++i) 
        {
            if (state[i]) 
            {
                if (i > maxfd)
                    maxfd = i;
                FD_SET(i, &readset);
                if (state[i]->writing) 
                {
                    FD_SET(i, &writeset);
                }
            }
        }

        if (select(maxfd+1, &readset, &writeset, &exset, NULL) < 0) 
        {
            perror("select");
            return;
        }

        //check if listener can accept
        if (FD_ISSET(listener, &readset)) 
        {
            struct sockaddr_in ss;
            int slen = sizeof(ss);
            int fd = accept(listener, (struct sockaddr*)&ss, &slen);
            if (fd < 0) 
            {
                perror("accept");
            } 
            else if(fd > FD_SETSIZE)
            {
                closesocket(fd);
            }
            else 
            {
                printf("Accept socket %d, address %s \n",fd,inet_ntoa(ss.sin_addr));
                set_socket_nonblocking(fd);
                state[fd] = alloc_fd_state();
                assert(state[fd]);
            }
        }

        //process read and write socket
        for (i=0; i < maxfd+1; ++i) 
        {
            int r = 0;
            if (i == listener)
                continue;

            if (FD_ISSET(i, &readset)) 
            {
                r = do_read(i, state[i]);
            }
            if (r == 0 && FD_ISSET(i, &writeset)) 
            {
                r = do_write(i, state[i]);
            }
            if (r) 
            {
                free_fd_state(state[i]);
                state[i] = NULL;
                closesocket(i);
            }
        }
    }
}

int main(int c, char **v)
{
#ifdef WIN32
    init_win_socket();
#endif

    run();
    return 0;
}
View Code

示意圖以下:

這裏Select監聽的socket都是Non-blocking的,因此在do_read() do_write()中對返回爲EAGAIN/WSAEWOULDBLOCK都作了處理。

從代碼中能夠看出使用Select返回後,仍然須要輪訓再檢測每一個socket的狀態(讀、寫),這樣的輪訓檢測在大量鏈接下也是效率不高的。由於當須要探測的句柄值較大時,select () 接口自己須要消耗大量時間去輪詢各個句柄。

不少操做系統提供了更爲高效的接口,如 linux 提供 了 epoll,BSD 提供了 kqueue,Solaris 提供了 /dev/poll …。若是須要實現更高效的服務器程序,相似 epoll 這樣的接口更被推薦。遺憾的是不一樣的操做系統特供的 epoll 接口有很大差別,因此使用相似於 epoll 的接口實現具備較好跨平臺能力的服務器會比較困難。

五、使用事件驅動庫libevent的服務器模型

Libevent 是一種高性能事件循環/事件驅動庫。

爲了實際處理每一個請求,libevent 庫提供一種事件機制,它做爲底層網絡後端的包裝器。事件系統讓爲鏈接添加處理函數變得很是簡便,同時下降了底層IO複雜性。這是 libevent 系統的核心。

建立 libevent 服務器的基本方法是,註冊當發生某一操做(好比接受來自客戶端的鏈接)時應該執行的函數,而後調用主事件循環 event_dispatch()。執行過程的控制如今由 libevent 系統處理。註冊事件和將調用的函數以後,事件系統開始自治;在應用程序運行時,能夠在事件隊列中添加(註冊)或 刪除(取消註冊)事件。事件註冊很是方便,能夠經過它添加新事件以處理新打開的鏈接,從而構建靈活的網絡處理系統。

使用Libevent實現的一個回顯服務器以下:

#include <event2/event.h>

#include <assert.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MAX_LINE 16384

void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);

struct fd_state 
{
    char buffer[MAX_LINE];
    size_t buffer_used;

    size_t n_written;
    size_t write_upto;

    struct event *read_event;
    struct event *write_event;
};

struct fd_state * alloc_fd_state(struct event_base *base, evutil_socket_t fd)
{
    struct fd_state *state = (struct fd_state *)malloc(sizeof(struct fd_state));
    if (!state)
    {
        return NULL;
    }

    state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);
    if (!state->read_event) 
    {
        free(state);
        return NULL;
    }

    state->write_event = event_new(base, fd, EV_WRITE, do_write, state);
    if (!state->write_event) 
    {
        event_free(state->read_event);
        free(state);
        return NULL;
    }

    memset(state->buffer,0,MAX_LINE);
    state->buffer_used = state->n_written = state->write_upto = 0;

    return state;
}

void free_fd_state(struct fd_state *state)
{
    event_free(state->read_event);
    event_free(state->write_event);
    free(state);
}

void do_read(evutil_socket_t fd, short events, void *arg)
{
    struct fd_state *state = (struct fd_state *) arg;
    char buf[1024];
    int i;
    int result;
    assert(state->write_event);
    while(1)
    {
        memset(buf,0,1024);
        result = recv(fd, buf, sizeof(buf), 0);
        if (result <= 0)
        {
            break;
        }
        else
        {
            for (i=0; i < result; ++i)  
            {
                if (state->buffer_used < sizeof(state->buffer))
                    state->buffer[state->buffer_used++] = buf[i];
            }
        }
    }
    printf("receive data: %s  size: %d\n",state->buffer+state->n_written,state->write_upto-state->n_written);
    assert(state->write_event);
    event_add(state->write_event, NULL);
    state->write_upto = state->buffer_used;
    
    if (result == 0) 
    {
        printf("connect closed \n");
        free_fd_state(state);
    } 
    else if (result < 0) 
    {
#ifdef WIN32
        if (result == -1 && WSAGetLastError()==WSAEWOULDBLOCK)
            return;
#else
        if (errno == EAGAIN)
            return;
#endif
        perror("recv");
        free_fd_state(state);
    }
}

void do_write(evutil_socket_t fd, short events, void *arg)
{
    struct fd_state *state = (struct fd_state *)arg;

    while (state->n_written < state->write_upto) 
    {
        int result = send(fd, state->buffer + state->n_written,
            state->write_upto - state->n_written, 0);
        if (result < 0) 
        {
#ifdef WIN32
            if (result == -1 && WSAGetLastError()==WSAEWOULDBLOCK)
                return;
#else
            if (errno == EAGAIN)
                return;
#endif
            free_fd_state(state);
            return;
        }
        assert(result != 0);
        printf("send data: %s \n",state->buffer+ state->n_written);

        state->n_written += result;
    }

    //buffer is full
    if (state->n_written == state->buffer_used)
    {
        state->n_written = state->write_upto = state->buffer_used = 0;
        memset(state->buffer,0,MAX_LINE);
    }
}

void do_accept(evutil_socket_t listener, short event, void *arg)
{
    struct event_base *base = (struct event_base *)arg;
    struct sockaddr_in ss;
    int slen = sizeof(ss);
    int fd = accept(listener, (struct sockaddr*)&ss, &slen);
    if (fd > 0) 
    {
        printf("accept socket %d, address %s \n",fd,inet_ntoa(ss.sin_addr));
        struct fd_state *state;
        evutil_make_socket_nonblocking(fd);
        state = alloc_fd_state(base, fd);
        assert(state);
        assert(state->read_event);
        event_add(state->read_event, NULL);
    }
}

void run()
{
    int listener;
    struct sockaddr_in addr_server;
    struct event_base *base;
    struct event *listener_event;

    base = event_base_new();
    if (!base)
    {
        perror("event_base_new error");
        return; 
    }

    addr_server.sin_addr.S_un.S_addr = ADDR_ANY;
    addr_server.sin_family = AF_INET;
    addr_server.sin_addr.s_addr = 0;
    addr_server.sin_port = htons(10286);

    listener = socket(AF_INET, SOCK_STREAM, 0);
    evutil_make_socket_nonblocking(listener);

    int one = 1;
    setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, (const char *)&one, sizeof(one));

    if (bind(listener, (struct sockaddr*)&addr_server, sizeof(addr_server)) < 0) 
    {
        perror("bind error");
        return;
    }

    if (listen(listener, 10)<0) 
    {
        perror("listen error");
        return;
    }

    printf("server is listening ... \n");

    listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
    event_add(listener_event, NULL);
    event_base_dispatch(base);
}

int init_win_socket()
{
    WSADATA wsaData;

    if(WSAStartup(MAKEWORD(2,2) , &wsaData ) != 0) 
    {
        return -1;
    }

    return 0;
}

int main(int c, char **v)
{

#ifdef WIN32
    init_win_socket();
#endif

    run();

    getchar();
    return 0;
}
View Code

六、信號驅動IO模型(Signal-driven IO)

使用信號,讓內核在描述符就緒時發送SIGIO信號通知應用程序,稱這種模型爲信號驅動式I/O(signal-driven I/O)。

圖示以下:

首先開啓套接字的信號驅動式I/O功能,並經過sigaction系統調用安裝一個信號處理函數。該系統調用將當即返回,咱們的進程繼續工做,也就是說進程沒有被阻塞。當數據報準備好讀取時,內核就爲該進程產生一個SIGIO信號。隨後就能夠在信號處理函數中調用recvfrom讀取數據報,並通知主循環數據已經準備好待處理,也能夠當即通知主循環,讓它讀取數據報。

不管如何處理SIGIO信號,這種模型的優點在於等待數據報到達期間進程不被阻塞。主循環能夠繼續執行 ,只要等到來自信號處理函數的通知:既能夠是數據已準備好被處理,也能夠是數據報已準備好被讀取。

七、異步IO模型(asynchronous IO)

異步I/O(asynchronous I/O)由POSIX規範定義。演變成當前POSIX規範的各類早起標準所定義的實時函數中存在的差別已經取得一致。通常地說,這些函數的工做機制是:告知內核啓動某個操做,並讓內核在整個操做(包括將數據從內核複製到咱們本身的緩衝區)完成後通知咱們。這種模型與前一節介紹的信號驅動模型的主要區別在於:信號驅動式I/O是由內核通知咱們什麼時候能夠啓動一個I/O操做,而異步I/O模型是由內核通知咱們I/O操做什麼時候完成。

示意圖以下:

咱們調用aio_read函數(POSIX異步I/O函數以aio_或lio_開頭),給內核傳遞描述符、緩衝區指針、緩衝區大小(與read相同的三個參數)和文件偏移(與lseek相似),並告訴內核當整個操做完成時如何通知咱們。該系統調用當即返回,而且在等待I/O完成期間,咱們的進程不被阻塞。本例子中咱們假設要求內核在操做完成時產生某個信號,該信號直到數據已複製到應用進程緩衝區才產生,這一點不一樣於信號驅動I/O模型。

 

參考:

《UNIX網絡編程》

使用 libevent 和 libev 提升網絡應用性能:http://www.ibm.com/developerworks/cn/aix/library/au-libev/

使用異步 I/O 大大提升應用程序的性能:https://www.ibm.com/developerworks/cn/linux/l-async/

相關文章
相關標籤/搜索