用一個示例講解我如何一步步實現高併發服務(基於C++)

去年作了一個遠程升級的服務。客戶端鏈接此服務能夠下載更新程序。簡單點說就是個TCP sever。基於C++。nginx

運行環境是centOS 6.5apache

剛開始客戶端數量少並且訪問不頻繁,因此沒太關注併發的問題。當時用工具測試大概只能支持的40/秒的併發訪問,並且已經有數據串包的狀況出現了。最近有空作了很多的優化並記錄了筆記備忘。編程

下面給出的代碼都不是完整的項目源碼,我只是截取了關鍵部分用於說明主題安全

我選擇的測試工具是一個tcp客戶端工具,能夠快捷的進行多客戶端鏈接的測試。多線程

這裏寫圖片描述

線程安全的單例模式

最初的版本是經過多線程實現高併發的。個人工程裏有兩個類是單例模式,一個參數文件管理類,一個是日誌管理類。一開始我實現的時候沒有考慮線程安全,因而第一步我先把這兩個類改爲線程安全測試看看效果。併發

增長線程安全前的代碼片斷(只給出參數文件管理類的實現)異步

//.h
class AppCof {
public:
    static AppCof* open_cof();
    
private:
    AppCof();

    class CGarbo   //它的惟一工做就是在析構函數中刪除CSingleton的實例  
    {  
    public:  
        ~CGarbo()  
        {  
            if(AppCof::m_pInstance)  
                delete AppCof::m_pInstance;  
        }  
    };  
    static CGarbo Garbo;  //定義一個靜態成員變量,程序結束時,系統會自動調用它的析構函數  
...
//.cpp
AppCof* AppCof::open_cof(){

    if(m_pInstance == NULL){
        m_pInstance = new AppCof();
    }
    return m_pInstance;
}
...

增長線程安全後socket

//.h
class AppCof:boost::noncopyable
{
public:
    static AppCof* open_cof();
private:
    AppCof();
    static AppCof *m_pInstance;
    static void init();
    static pthread_once_t ponce_;
    ...
void AppCof::init()
{
    m_pInstance = new AppCof();
    if(m_pInstance != NULL)
    {
        m_pInstance->get_env();
        m_pInstance->read_cof();
    }
}

AppCof* AppCof::open_cof(){

    pthread_once(&ponce_, &AppCof::init);
    return m_pInstance;
}

...

這裏有兩個重點,一是pthread_once的用法,還有就是boost::noncopyable。tcp

先說說前者,函數

int pthread_once(pthread_once_t *once_control, void (*init_routine) (void));

本函數使用初值爲PTHREAD_ONCE_INIT的once_control變量保證init_routine()函數在本進程執行序列中僅執行一次。在多線程編程環境下,儘管pthread_once()調用會出如今多個線程中,init_routine()函數僅執行一次,究竟在哪一個線程中執行是不定的,是由內核調度來決定。

boost::noncopyable這種用法其實從名字能夠窺探一二,一個類繼承自它就表示該類不能經過賦值,複製等手段建立新的對象了。

優化IO讀寫機制

這部分從select,epoll這些IO處理上下手。我用三個方案分別測試,效果仍是比較明顯。

最初的版本是這樣的:

void run_srv(const char* i_port){
    for(;;){
        client = server.accept_client();
        std::thread t1(base_proc,client,i_port);
        usleep(200);
        t1.detach();
    }
    
}

void base_proc(Socket::TCP* i_client,const char* i_port){
    Socket::TCP* client = i_client;
    i_client = NULL;
    TmsProc* tmpc =new  TmsProc(client);
    tmpc->run();
    delete tmpc;
    return ;
}
void TmsProc::run(){
    Writelog::Trace(9,"業務處理開始");
    try{
        
        for(;;){
        tm.tv_sec = 3;
        tm.tv_usec = 0;
        FD_ZERO(&set);
        FD_SET(p_client->_socket_id,&set);
        
            int iret = select(p_client->_socket_id+1,&set,NULL,NULL,&tm);
            if(iret < 0){
                Writelog::Trace(2,"select出錯:%s",strerror(errno));
                return;
            }
            if(iret == 0){
                Writelog::Trace(2,"select超時");
                return;
            }
            Writelog::Trace(9,"監控到能夠進行接收");
            //收取信息
            if(read_sock() == false){
                Writelog::Trace(3,"檢測到客戶端套接字異常,準備斷開鏈接");
                break;
            }
            ...

很簡單,主要流程都在run函數裏。這個函數能夠優化的地方有幾處。好比兩個if的判斷能夠改爲if elseif的形式。由於兩次if雖然是互斥的可是程序都會判斷一次,效率比較低。

另外接收數據的條件能夠用FD_ISSET判斷是否有數據可讀,若是有才真正接收,不然不處理。

因此第一種優化方案很快出爐

void TmsProc::run(){

        tm.tv_sec = 60;
        tm.tv_usec = 0;
    try{
        
        for(;;){
        
        
        FD_ZERO(&set);
        FD_SET(p_client->_socket_id,&set);
        
            int iret = select(p_client->_socket_id+1,&set,NULL,NULL,&tm);
            if(iret < 0){
                pLog_tmsProc->Trace(2,"select出錯:%s",strerror(errno));
                return;
            }
            else if(iret == 0){
                pLog_tmsProc->Trace(2,"select超時");
                return;
            }
            if(FD_ISSET(p_client->_socket_id,&set))
            {

            pLog_tmsProc->Trace(9,"監控到能夠進行接收");
              if(read_sock() == false){
                      pLog_tmsProc->Trace(3,"檢測到客戶端套接字異常,準備斷開鏈接");
                      break;
              }
                   ...

注意到我把超時時間改爲了60秒,

tm.tv_sec = 60;

這是我在實際測試時發現,當併發量大時,程序在處理數量多的鏈接時,前面分配成功的線程會超時退出,看下日誌就明白了:

14:39:40][140579076663072]:準備accept
[14:39:40][140579076663072]:接待並分配文件描述符[44],主服務描述符[3]
[14:39:40][140579076663072]:接到鏈接請求,準備啓動線程TCP:0x1e89e90,IP:10.0.0.106,PORT:19803
[14:39:40][140579076663072]:啓動服務線程於140577928623872
[14:39:40][140579076663072]:等待接收客戶端鏈接
[14:39:40][140579076663072]:準備accept
[14:39:40][140579076663072]:接待並分配文件描述符[46],主服務描述符[3]
[14:39:40][140579076663072]:接到鏈接請求,準備啓動線程TCP:0x1e8a170,IP:10.0.0.106,PORT:19804
[14:39:40][140577928623872]:接到來自TMS端口的請求
[14:39:40][140577928623872]:業務處理開始
[14:39:40][140577918134016]:接到來自TMS端口的請求
[14:39:40][140577918134016]:業務處理開始
[14:39:40][140579076663072]:啓動服務線程於140577918134016
[14:39:40][140579076663072]:等待接收客戶端鏈接
[14:39:40][140579076663072]:準備accept
[14:39:40][140579076663072]:接待並分配文件描述符[48],主服務描述符[3]
[14:39:40][140579076663072]:接到鏈接請求,準備啓動線程TCP:0x1e8a450,IP:10.0.0.106,PORT:19805
[14:39:40][140577500821248]:接到來自TMS端口的請求
[14:39:40][140577500821248]:業務處理開始
[14:39:40][140579076663072]:啓動服務線程於140577500821248
[14:39:40][140579076663072]:等待接收客戶端鏈接
[14:39:40][140579076663072]:準備accept
[14:39:41][140579076663072]:接待並分配文件描述符[50],主服務描述符[3]

由於工具是模擬多個客戶端同時發起請求,因而就有了上面這樣的分配線程的過程,會持續的時間比較長(還要寫日誌),也就是同時發生的鏈接數越多,超時時間就要設置越長。超時改爲60秒後。通過工具實測,500鏈接/500毫秒(應該至關於1000/秒的併發量)的處理都正常。

這裏寫圖片描述

性能大大提升。

可是問題仍是很明顯,就是超時時間。隨着鏈接數的增大,超時也要一直增大才能保證沒有線程"掉隊",可是這個時間太大了會影響真正接收數據時的效率。

第二種優化方案思路來源於apache和nginx的性能差別。

apache和nginx他倆的一個重要區別是前者基於多線程實現高併發,然後者基於多進程(fork)。

而衆所周知,nginx不少場景的高併發是好於apache的。

因此個人第二種方案,基本思路是爲每一個鏈接fork一個單獨的進程處理。獨立進程有個最大的好處是不須要加鎖了(不解釋)。修改好的代碼片斷以下(我已經把全部帶鎖的地方都去掉了,這裏不貼出來了)。

void run_srv(const char* i_port){
    for(;;)
    {
        client = server.accept_client();
        pWriteLogInstance->Trace(1,"接到鏈接請求,準備啓動進程TCP:%p,IP:%s,PORT:%u",client,client->ip().c_str(),client->port());
        
        if(client->_socket_id < 0)
        {
            if(errno == EINTR || errno == ECONNABORTED)  
                continue;  
            else  
            {  
                cout << "accept error" << endl;  
                return;  
            }  
        }

        fpid = fork();
        if(fpid < 0)
        {
            pWriteLogInstance->Trace(9,"fork error");
        }
        else if(fpid > 0)//father
        {
            pWriteLogInstance->Trace(1,"father process start");
            client->close();
        }
        else //child
        {
            server.close();
            pWriteLogInstance->Trace(1,"child process start");

            TmsProc* tmpc =new  TmsProc(client);
            
            tmpc->run();
            
            delete tmpc;

            if(client != NULL)
            {
                client->close();
                delete client;
                client = NULL;
            }
            
            exit(-6);
        }

        usleep(10);
void TmsProc::run(){
        while(1)
        {
            
        //收取信息
            if(read_sock() == false){
                pLog_tmsProc->Trace(3,"檢測到客戶端套接字異常,準備斷開鏈接");
                send_info.is_bad_qry = true;
                break;
            }
        
            if(stc_tms.un_parse_size == 0)
            {
                pLog_tmsProc->Trace(3,"沒有接受到有效數據,客戶端關閉了");
                break;
            }
                   ....

測試結果跟我預想的差很少。效果也是不錯的。一樣是1000次/秒的併發量數據沒有出現問題。並且相比較前一種方案,沒有了超時時間的困擾。

第三種方案,我考慮試試 IO 處理中的王者,epoll

epoll是Linux內核爲處理大批量文件描述符而做了改進的poll,是Linux下多路複用IO接口select/poll的加強版本,它能顯著提升程序在大量併發鏈接中只有少許活躍的狀況下的系統CPU利用率。另外一點緣由就是獲取事件的時候,它無須遍歷整個被偵聽的描述符集,只要遍歷那些被內核IO事件異步喚醒而加入Ready隊列的描述符集合就好了。

爲了簡單起見,我這裏只是用了單線程的epoll,用循環來輪詢客戶端的socket id來處理多個客戶端鏈接的狀況。單線程的epoll號稱也能處理1萬以上的併發量,我要測試下是否是有這邊牛X。

epoll方案的代碼以下:

void TmsProc::run(){

        struct epoll_event event;   // 告訴內核要監聽什麼事件    
           struct epoll_event wait_event[OPEN_MAX]; //內核監聽完的結果 
    
        Socket::TCP server;
       pLog_tmsProc->Trace(9,"準備監聽端口:%d",this->port);
       server.listen_on_port(this->port,OPEN_MAX);

       Socket::TCP* client;

       //4.epoll相應參數準備  
        int fd[OPEN_MAX+1];  
        int i = 0, maxi = 0;  
        int number = 0;
        memset(fd,-1, sizeof(fd));  
        fd[0] = server._socket_id;


        pLog_tmsProc->Trace(9,"epoll 開始準備");
        int epfd = epoll_create(OPEN_MAX+1); 
        if( -1 == epfd )
        {    
            pLog_tmsProc->Trace(9,"epoll create error");
            return;    
        }    
        
        event.data.fd = server._socket_id;     //監聽套接字    
        event.events = EPOLLIN; // 表示對應的文件描述符能夠讀

        //5.事件註冊函數,將監聽套接字描述符 sockfd 加入監聽事件    
        int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, server._socket_id, &event);    
        if(-1 == ret){    
            pLog_tmsProc->Trace(9,"epoll_ctl error");   
            return;    
        }   

       

        pLog_tmsProc->Trace(9,"業務處理開始");
        
        while(1)
        {
            // 監視並等待多個文件(標準輸入,udp套接字)描述符的屬性變化(是否可讀)    
            // 沒有屬性變化,這個函數會阻塞,直到有變化才往下執行,這裏沒有設置超時    

            pLog_tmsProc->Trace(9,"epoll 開始監聽");
            number = epoll_wait(epfd, wait_event, OPEN_MAX, -1);   

            for(int i = 0; i < number; i++)
            {
                if( (wait_event[i].events & EPOLLERR) || ( wait_event[i].events & EPOLLHUP ) || !(wait_event[i].events & EPOLLIN) )
                {
                    pLog_tmsProc->Trace(9,"epoll error");
                    close(wait_event[i].data.fd);
                    continue;
                }
                else if(server._socket_id == wait_event[i].data.fd )  
                {   
                    while(1)
                    {
                        client = server.accept_client();
                        if(client->_socket_id == -1)
                         {
                             if( errno == EAGAIN || errno == EWOULDBLOCK )
                             {
                                 break;
                             }
                             else
                             {
                                 pLog_tmsProc->Trace(9,"accept error");
                                 break;
                             }
                         }

                        Socket::TCP::make_socket_non_blocking(client->_socket_id);

                        event.data.fd = client->_socket_id; //監聽套接字    
                        event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET; // 表示對應的文件描述符能夠讀  
                          
                        //6.1.3.事件註冊函數,將監聽套接字描述符 connfd 加入監聽事件 
                        pLog_tmsProc->Trace(9,"爲客戶端註冊epoll監聽"); 
                        ret = epoll_ctl(epfd, EPOLL_CTL_ADD, client->_socket_id, &event);
                        
                        if(ret < 0){    
                            pLog_tmsProc->Trace(9,"epoll_ctl error");     
                        }   
                        event.data.fd = client->_socket_id;
                    }
                }
                else
                {
                    //收取信息
                   if(read_sock(wait_event[i].data.fd) == false){
                        pLog_tmsProc->Trace(3,"檢測到客戶端套接字異常,準備斷開鏈接");
                           close(wait_event[i].data.fd); 
                   }
                   ...

只能說epoll確實比較給力,我這只是個單線程的服務,用工具測試上述代碼,500個併發也是妥妥的。

這裏寫圖片描述

相關文章
相關標籤/搜索