多路複用構建高性能服務器

經過多路複用構建高性能服務器是一種常見的模型,單個I/O多路複用線程+一組工做線程,I/O線程負責協調分配任務,而實際工做交給工做線程處理。這種模型的好處在於高效併發和充分利用多線程的處理能力。ios

 

以memcached的構架圖爲例服務器

memcached的主線程用epoll監聽到EPOLLIN事件,而且觸發該事件的fd是服務器listen的fd,就accept該鏈接請求,返回的fd主線程並不處理,而是經過CQ隊列發送給工做線程去處理,工做線程又維護了一個epoll多路複用隊列,子線程的epoll輪詢和響應請求。 多線程

 

  這種架構包含兩大塊:架構

1.多路複用併發

 #include<iostream>

 #include<stdlib.h>socket

#include<sys/epoll.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<sys/types.h>
#include<fcntl.h>

using namespace std;
const int PORT = 8888;
const int MAX_CLIENT_NUM = 10000;
const int MAX_LEN = 2000;

bool setfdnoblock(int fd)
{
    int flg = fcntl(fd, F_GETFL);
    if(flg < 0)
    {
        cout << "get fd flag failed" << endl;
        return false;
    }
    if(fcntl(fd, F_SETFL, O_NONBLOCK | flg) < 0)
    {
        return false;
    }
    return true;
}

int CreateTcpServer(int port, int listennum)
{
    int fd;
    fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

    sockaddr_in TcpServer;
    bzero(&TcpServer, sizeof(TcpServer));
    TcpServer.sin_family = AF_INET;
    TcpServer.sin_port = htons(8888);
    TcpServer.sin_addr.s_addr = htonl(INADDR_ANY);

    int iRet = bind(fd, (struct sockaddr*)&TcpServer, sizeof(TcpServer));
    if(-1 == iRet)
    {
    cout << "server bind error!" << endl;
    return -1;
    }
    if(listen(fd, listennum) == -1)
    {
        cout << "server listen error" << endl;
        return -1;
    }
    return fd;
}

int main()
{
    int Serverfd = CreateTcpServer(PORT, MAX_CLIENT_NUM);
    if(Serverfd == -1)
    {
        cout << "server create failed" << endl;
    }
    else
    {
       cout << "serverfd is :" << Serverfd << endl;
    }

    int Epollfd = epoll_create(MAX_CLIENT_NUM);
    if(Epollfd == -1)
    {
        cout << "epoll_create failed" << endl;
    }
    epoll_event ev, events[MAX_CLIENT_NUM];
    int nfds = 0;
    int client = 0;
    char buff[MAX_LEN];
    sockaddr_in CliAddr;
    unsigned int iCliSize = sizeof(CliAddr);
    ev.events = EPOLLIN|EPOLLOUT;
    ev.data.fd = Serverfd;
    if(!setfdnoblock(Serverfd))
    {
        cout << "set serverfd no_block failed" << endl;
    }
    if(epoll_ctl(Epollfd, EPOLL_CTL_ADD, Serverfd, &ev))
    {
        cout << "epoll add serverfd error" << endl;
    }
    while(1)
    {
        nfds = epoll_wait(Epollfd, events, MAX_CLIENT_NUM, 100000);
        if(nfds == -1)
        {
            cout << "error occur, exit" << endl;
            return -1;
        }
        else if( nfds == 0)
        {
            cout << "epoll_wait return zero" << endl;
        }
        else
        {
            for(int i = 0; i < nfds; i++)
            {
                cout << "events[i].data.fd is :" << events[i].data.fd << endl;
                if(events[i].data.fd == Serverfd)
                {
                    cout << " Serverfd received event" << endl;
                    client = accept(Serverfd, (struct sockaddr*)&CliAddr, &iCliSize);
                    if(client == -1)
                    {
                        cout << "accept error" << endl;
                        return -1;
                    }
                    ev.data.fd = client;
                    if(!setfdnoblock(client))
                    {
                        cout << "set client fd no_block error" << endl;
                    }
                    if(epoll_ctl(Epollfd, EPOLL_CTL_ADD, client, &ev))
                    {
                        cout << "epoll add client error" << endl;
                    }
                    else
                    {
                        cout << "success add client" << endl;
                    }
                }
                else if(events[i].events&EPOLLIN)
                {
                    cout << "recv client msg" << endl;
                    if(events[i].data.fd < 0)
                    {
                        cout << " event[i].data.fd is smaller than zero" << endl;
                        continue;
                    }
                    if(read(events[i].data.fd, buff, MAX_LEN) == -1)
                    {
                        perror("clifd read");
                    }
                    else
                    {
                        cout << "read client msg suc" << endl;
                        printf("%s",buff);
                    }
                    char resp[] = "recv a client msg, this is resp msg";
                    write(events[i].data.fd, resp, strlen(resp)+1);
                    //read and mod
                }
                else if(events[i].events&EPOLLOUT)
                {
                    //send and mod
                }
            }
        }
    }
}memcached

 

 

例子中epoll listen和accept新鏈接,並響應新鏈接的請求。性能

2.工做線程or線程池this

進程的線程數量並非越多越好,也不是越少越好,須要根據機器逐步調優。spa

工做線程的工做原理, 

1.I/O線程把收到的請求放入隊列,並通知工做線程處理,隊列和通知機制能夠是傳統的加鎖消息隊列、信號量,也能夠是memcached+libevent的實現:CQ隊列裝消息,線程管道通知工做線程。

2.I/O線程沒有新的任務分配,工做線程阻塞或等待一段時間。 

線程池用到的比較少,不作評價。 

相關文章
相關標籤/搜索