高性能服務器開發基礎系列 (二)Reactor模式

系列目錄react

第01篇 主線程與工做線程的分工linux

第02篇 Reactor模式ios

第03篇 一個服務器程序的架構介紹c++

第04篇 如何將socket設置爲非阻塞模式編程

第05篇 如何編寫高性能日誌windows

第06篇 關於網絡編程的一些實用技巧和細節服務器

第07篇 開源一款即時通信軟件的源碼微信

第08篇 高性能服務器架構設計總結1網絡

第09篇 高性能服務器架構設計總結2session

第10篇 高性能服務器架構設計總結3

第11篇 高性能服務器架構設計總結4

最近一直在看遊雙的《高性能linux服務器編程》一書,下載連接: http://download.csdn.net/deta...

書上是這麼介紹Reactor模式的:

圖片描述

按照這個思路,我寫個簡單的練習:

/** 
 *@desc:   用reactor模式練習服務器程序,main.cpp
 *@author: zhangyl
 *@date:   2016.11.23
 */

#include <iostream>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>  //for htonl() and htons()
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <signal.h>     //for signal()
#include <pthread.h>
#include <semaphore.h>
#include <list>
#include <errno.h>
#include <time.h>
#include <sstream>
#include <iomanip> //for std::setw()/setfill()
#include <stdlib.h>


#define WORKER_THREAD_NUM   5

#define min(a, b) ((a <= b) ? (a) : (b)) 

int g_epollfd = 0;
bool g_bStop = false;
int g_listenfd = 0;
pthread_t g_acceptthreadid = 0;
pthread_t g_threadid[WORKER_THREAD_NUM] = { 0 };
pthread_cond_t g_acceptcond;
pthread_mutex_t g_acceptmutex;

pthread_cond_t g_cond /*= PTHREAD_COND_INITIALIZER*/;
pthread_mutex_t g_mutex /*= PTHREAD_MUTEX_INITIALIZER*/;

pthread_mutex_t g_clientmutex;

std::list<int> g_listClients;

void prog_exit(int signo)
{
    ::signal(SIGINT, SIG_IGN);
    //::signal(SIGKILL, SIG_IGN);//該信號不能被阻塞、處理或者忽略
    ::signal(SIGTERM, SIG_IGN);

    std::cout << "program recv signal " << signo << " to exit." << std::endl;

    g_bStop = true;

    ::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, g_listenfd, NULL);

    //TODO: 是否須要先調用shutdown()一下?
    ::shutdown(g_listenfd, SHUT_RDWR);
    ::close(g_listenfd);
    ::close(g_epollfd);

    ::pthread_cond_destroy(&g_acceptcond);
    ::pthread_mutex_destroy(&g_acceptmutex);
    
    ::pthread_cond_destroy(&g_cond);
    ::pthread_mutex_destroy(&g_mutex);

    ::pthread_mutex_destroy(&g_clientmutex);
}

bool create_server_listener(const char* ip, short port)
{
    g_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (g_listenfd == -1)
        return false;

    int on = 1;
    ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));
    ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on));

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr)); 
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = inet_addr(ip);
    servaddr.sin_port = htons(port);
    if (::bind(g_listenfd, (sockaddr *)&servaddr, sizeof(servaddr)) == -1)
        return false;

    if (::listen(g_listenfd, 50) == -1)
        return false;

    g_epollfd = ::epoll_create(1);
    if (g_epollfd == -1)
        return false;

    struct epoll_event e;
    memset(&e, 0, sizeof(e));
    e.events = EPOLLIN | EPOLLRDHUP;
    e.data.fd = g_listenfd;
    if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, g_listenfd, &e) == -1)
        return false;

    return true;
}

void release_client(int clientfd)
{
    if (::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)
        std::cout << "release client socket failed as call epoll_ctl failed" << std::endl;

    ::close(clientfd);
}

void* accept_thread_func(void* arg)
{   
    while (!g_bStop)
    {
        ::pthread_mutex_lock(&g_acceptmutex);
        ::pthread_cond_wait(&g_acceptcond, &g_acceptmutex);
        //::pthread_mutex_lock(&g_acceptmutex);

        //std::cout << "run loop in accept_thread_func" << std::endl;

        struct sockaddr_in clientaddr;
        socklen_t addrlen;
        int newfd = ::accept(g_listenfd, (struct sockaddr *)&clientaddr, &addrlen);
        ::pthread_mutex_unlock(&g_acceptmutex);
        if (newfd == -1)
            continue;

        std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" << ::ntohs(clientaddr.sin_port) << std::endl;

        //將新socket設置爲non-blocking
        int oldflag = ::fcntl(newfd, F_GETFL, 0);
        int newflag = oldflag | O_NONBLOCK;
        if (::fcntl(newfd, F_SETFL, newflag) == -1)
        {
            std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl;
            continue;
        }

        struct epoll_event e;
        memset(&e, 0, sizeof(e));
        e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
        e.data.fd = newfd;
        if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)
        {
            std::cout << "epoll_ctl error, fd =" << newfd << std::endl;
        }
    }

    return NULL;
}


void* worker_thread_func(void* arg)
{   
    while (!g_bStop)
    {
        int clientfd;
        ::pthread_mutex_lock(&g_clientmutex);
        while (g_listClients.empty())
            ::pthread_cond_wait(&g_cond, &g_clientmutex);
        clientfd = g_listClients.front();
        g_listClients.pop_front();  
        pthread_mutex_unlock(&g_clientmutex);

        //gdb調試時不能實時刷新標準輸出,用這個函數刷新標準輸出,使信息在屏幕上實時顯示出來
        std::cout << std::endl;

        std::string strclientmsg;
        char buff[256];
        bool bError = false;
        while (true)
        {
            memset(buff, 0, sizeof(buff));
            int nRecv = ::recv(clientfd, buff, 256, 0);
            if (nRecv == -1)
            {
                if (errno == EWOULDBLOCK)
                    break;
                else
                {
                    std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl;
                    release_client(clientfd);
                    bError = true;
                    break;
                }
                    
            }
            //對端關閉了socket,這端也關閉。
            else if (nRecv == 0)
            {
                std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl;
                release_client(clientfd);
                bError = true;
                break;
            }

            strclientmsg += buff;
        }

        //出錯了,就不要再繼續往下執行了
        if (bError)
            continue;
        
        std::cout << "client msg: " << strclientmsg;

        //將消息加上時間標籤後發回
        time_t now = time(NULL);
        struct tm* nowstr = localtime(&now);
        std::ostringstream ostimestr;
        ostimestr << "[" << nowstr->tm_year + 1900 << "-" 
                  << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-" 
                  << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "
                  << std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":" 
                  << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":" 
                  << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";

        strclientmsg.insert(0, ostimestr.str());
        
        while (true)
        {
            int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0);
            if (nSent == -1)
            {
                if (errno == EWOULDBLOCK)
                {
                    ::sleep(10);
                    continue;
                }
                else
                {
                    std::cout << "send error, fd = " << clientfd << std::endl;
                    release_client(clientfd);
                    break;
                }
                   
            }          

            std::cout << "send: " << strclientmsg;
            strclientmsg.erase(0, nSent);

            if (strclientmsg.empty())
                break;
        }
    }

    return NULL;
}

void daemon_run()
{
    int pid;
    signal(SIGCHLD, SIG_IGN);
    //1)在父進程中,fork返回新建立子進程的進程ID;
    //2)在子進程中,fork返回0;
    //3)若是出現錯誤,fork返回一個負值;
    pid = fork();
    if (pid < 0)
    {
        std:: cout << "fork error" << std::endl;
        exit(-1);
    }
    //父進程退出,子進程獨立運行
    else if (pid > 0) {
        exit(0);
    }
    //以前parent和child運行在同一個session裏,parent是會話(session)的領頭進程,
    //parent進程做爲會話的領頭進程,若是exit結束執行的話,那麼子進程會成爲孤兒進程,並被init收養。
    //執行setsid()以後,child將從新得到一個新的會話(session)id。
    //這時parent退出以後,將不會影響到child了。
    setsid();
    int fd;
    fd = open("/dev/null", O_RDWR, 0);
    if (fd != -1)
    {
        dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
    }
    if (fd > 2)
        close(fd);
 
}


int main(int argc, char* argv[])
{  
    short port = 0;
    int ch;
    bool bdaemon = false;
    while ((ch = getopt(argc, argv, "p:d")) != -1)
    {
        switch (ch)
        {
        case 'd':
            bdaemon = true;
            break;
        case 'p':
            port = atol(optarg);
            break;
        }
    }

    if (bdaemon)
        daemon_run();


    if (port == 0)
        port = 12345;
     
    if (!create_server_listener("0.0.0.0", port))
    {
        std::cout << "Unable to create listen server: ip=0.0.0.0, port=" << port << "." << std::endl;
        return -1;
    }

    
    //設置信號處理
    signal(SIGCHLD, SIG_DFL);
    signal(SIGPIPE, SIG_IGN);
    signal(SIGINT, prog_exit);
    //signal(SIGKILL, prog_exit);<span style="font-family:Arial, Helvetica, sans-serif;">//該信號不能被阻塞、處理或者忽略</span>

    signal(SIGTERM, prog_exit);

    ::pthread_cond_init(&g_acceptcond, NULL);
    ::pthread_mutex_init(&g_acceptmutex, NULL);

    ::pthread_cond_init(&g_cond, NULL);
    ::pthread_mutex_init(&g_mutex, NULL);

    ::pthread_mutex_init(&g_clientmutex, NULL);
     
    ::pthread_create(&g_acceptthreadid, NULL, accept_thread_func, NULL);
    //啓動工做線程
    for (int i = 0; i < WORKER_THREAD_NUM; ++i)
    {
        ::pthread_create(&g_threadid[i], NULL, worker_thread_func, NULL);
    }

    while (!g_bStop)
    {       
        struct epoll_event ev[1024];
        int n = ::epoll_wait(g_epollfd, ev, 1024, 10);
        if (n == 0)
            continue;
        else if (n < 0)
        {
            std::cout << "epoll_wait error" << std::endl;
            continue;
        }

        int m = min(n, 1024);
        for (int i = 0; i < m; ++i)
        {
            //通知接收鏈接線程接收新鏈接
            if (ev[i].data.fd == g_listenfd)
                pthread_cond_signal(&g_acceptcond);
            //通知普通工做線程接收數據
            else
            {               
                pthread_mutex_lock(&g_clientmutex);              
                g_listClients.push_back(ev[i].data.fd);
                pthread_mutex_unlock(&g_clientmutex);
                pthread_cond_signal(&g_cond);
                //std::cout << "signal" << std::endl;
            }
                
        }

    }
    
    return 0;
}

程序的功能一個簡單的echo服務:客戶端鏈接上服務器以後,給服務器發送信息,服務器加上時間戳等信息後返回給客戶端。

使用到的知識點有:

1.條件變量

2.epoll的邊緣觸發模式

程序的大體框架是:

主線程只負責監聽偵聽socket上是否有新鏈接,若是有新鏈接到來,交給一個叫accept的工做線程去接收新鏈接,並將新鏈接socket綁定到主線程使用epollfd上去。

主線程若是偵聽到客戶端的socket上有可讀事件,則通知另外五個工做線程去接收處理客戶端發來的數據,並將數據加上時間戳後發回給客戶端。

能夠經過傳遞-p port來設置程序的監聽端口號;能夠經過傳遞-d來使程序以daemon模式運行在後臺。這也是標準linux daemon模式的書寫方法。

程序難點和須要注意的地方是:

條件變量爲了防止虛假喚醒,必定要在一個循環裏面調用pthread_cond_wait()函數,我在worker_thread_func()中使用了:

while (g_listClients.empty())
    ::pthread_cond_wait(&g_cond, &g_clientmutex);

accept_thread_func()函數裏面我沒有使用循環,這樣會有問題嗎?

使用條件變量pthread_cond_wait()函數的時候必定要先得到與該條件變量相關的mutex,即像下面這樣的結構:

mutex_lock(...);

while (condition is true)
    ::pthread_cond_wait(...);

//這裏能夠有其餘代碼...
mutex_unlock(...);

//這裏能夠有其餘代碼...

由於pthread_cond_wait()若是阻塞的話,它解鎖相關mutex和阻塞當前線程這兩個動做加在一塊兒是原子的。

做爲服務器端程序最好對偵聽socket調用setsocketopt()設置SO_REUSEADDR和SO_REUSEPORT兩個標誌,由於服務程序有時候會須要重啓(好比調試的時候就會不斷重啓),若是不設置這兩個標誌的話,綁定端口時就會調用失敗。由於一個端口使用後,即便再也不使用,由於四次揮手該端口處於TIME_WAIT狀態,有大約2min的MSL(Maximum Segment Lifetime,最大存活期)。這2min內,該端口是不能被重複使用的。你的服務器程序上次使用了這個端口號,接着重啓,由於這個緣故,你再次綁定這個端口就會失敗(bind函數調用失敗)。要不你就每次重啓時須要等待2min後再試(這在頻繁重啓程序調試是難以接收的),或者設置這種SO_REUSEADDR和SO_REUSEPORT當即回收端口使用。

其實,SO_REUSEADDR在windows上和Unix平臺上還有些細微的區別,我在libevent源碼中看到這樣的描述:

int evutil_make_listen_socket_reuseable(evutil_socket_t sock)
{
#ifndef WIN32
    int one = 1;
    /* REUSEADDR on Unix means, "don't hang on to this address after the
     * listener is closed."  On Windows, though, it means "don't keep other
     * processes from binding to this address while we're using it. */
    return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void*) &one,
        (ev_socklen_t)sizeof(one));
#else
    return 0;
#endif
}

注意註釋部分,在Unix平臺上設置這個選項意味着,任意進程能夠複用該地址;而在windows,不要阻止其餘進程複用該地址。也就是在在Unix平臺上,若是不設置這個選項,任意進程在必定時間內,不能bind該地址;在windows平臺上,在必定時間內,其餘進程不能bind該地址,而本進程卻能夠再次bind該地址。

epoll_wait對新鏈接socket使用的是邊緣觸發模式EPOLLET(edge trigger),而不是默認的水平觸發模式(level trigger)。由於若是採起水平觸發模式的話,主線程檢測到某個客戶端socket數據可讀時,通知工做線程去收取該socket上的數據,這個時候主線程繼續循環,只要在工做線程沒有將該socket上數據所有收完,或者在工做線程收取數據的過程當中,客戶端有新數據到來,主線程會繼續發通知(經過pthread_cond_signal())函數,再次通知工做線程收取數據。這樣會可能致使多個工做線程同時調用recv函數收取該客戶端socket上的數據,這樣產生的結果將會致使數據錯亂。

相反,採起邊緣觸發模式,只有等某個工做線程將那個客戶端socket上數據所有收取完畢,主線程的epoll_wait纔可能會再次觸發來通知工做線程繼續收取那個客戶端socket新來的數據。

代碼中有這樣一行:

//gdb調試時不能實時刷新標準輸出,用這個函數刷新標準輸出,使信息在屏幕上實時顯示出來
std::cout << std::endl;

若是不加上這一行,正常運行服務器程序,程序中要打印到控制檯的信息都會打印出來,可是若是用gdb調試狀態下,程序的全部輸出就不顯示了。我不知道這是否是gdb的一個bug,因此這裏加上std::endl來輸出一個換行符並flush標準輸出,讓輸出顯示出來。(std::endl不只是輸出一個換行符並且是同時刷新輸出,至關於fflush()函數)。

程序我部署起來了,你可使用linux的nc命令或本身寫程序鏈接服務器來查看程序效果,固然也可使用telnet命令,方法:

linux:

nc 120.55.94.78 12345

telnet 120.55.94.78 12345

而後就能夠給服務器自由發送數據了,服務器會給你發送的信息加上時間戳返回給你。效果如圖:

clipboard.png

另外我將這個代碼改寫了成純C++11版本,使用CMake編譯,爲了支持編譯必須加上這-std=c++11:

CMakeLists.txt代碼以下:

cmake_minimum_required(VERSION 2.8)

PROJECT(myreactorserver)

AUX_SOURCE_DIRECTORY(./ SRC_LIST)
SET(EXECUTABLE_OUTPUT_PATH ./)

ADD_DEFINITIONS(-g -W -Wall -Wno-deprecated -DLINUX -D_REENTRANT -D_FILE_OFFSET_BITS=64 -DAC_HAS_INFO -DAC_HAS_WARNING -DAC_HAS_ERROR -DAC_HAS_CRITICAL -DTIXML_USE_STL -DHAVE_CXX_STDHEADERS ${CMAKE_CXX_FLAGS} -std=c++11)

INCLUDE_DIRECTORIES(
./
)
LINK_DIRECTORIES(
./
)

set(
main.cpp
myreator.cpp
)

ADD_EXECUTABLE(myreactorserver ${SRC_LIST})

TARGET_LINK_LIBRARIES(myreactorserver pthread)

myreactor.h文件內容:

/**
*@desc: myreactor頭文件, myreactor.h
*@author: zhangyl
*@date: 2016.12.03
*/
#ifndef __MYREACTOR_H__
#define __MYREACTOR_H__

#include <list>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>

#define WORKER_THREAD_NUM   5

class CMyReactor
{
public:
    CMyReactor();
    ~CMyReactor();

    bool init(const char* ip, short nport);
    bool uninit();

    bool close_client(int clientfd);

    static void* main_loop(void* p);

private:
    //no copyable
    CMyReactor(const CMyReactor& rhs);
    CMyReactor& operator = (const CMyReactor& rhs);

    bool create_server_listener(const char* ip, short port);
    
    static void accept_thread_proc(CMyReactor* pReatcor);
    static void worker_thread_proc(CMyReactor* pReatcor);

private:
    //C11語法能夠在這裏初始化
    int                             m_listenfd = 0;
    int                             m_epollfd  = 0;
    bool                         m_bStop    = false;
    
    std::shared_ptr<std::thread> m_acceptthread;
    std::shared_ptr<std::thread> m_workerthreads[WORKER_THREAD_NUM];
    
    std::condition_variable         m_acceptcond;
    std::mutex                     m_acceptmutex;

    std::condition_variable         m_workercond ;
    std::mutex                     m_workermutex;

    std::list<int>                 m_listClients;
};

#endif //!__MYREACTOR_H__

myreactor.cpp文件內容:

/** 
 *@desc: myreactor實現文件, myreactor.cpp
 *@author: zhangyl
 *@date: 2016.12.03
 */
#include "myreactor.h"
#include <iostream>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>  //for htonl() and htons()
#include <fcntl.h>
#include <sys/epoll.h>
#include <list>
#include <errno.h>
#include <time.h>
#include <sstream>
#include <iomanip> //for std::setw()/setfill()
#include <unistd.h>

#define min(a, b) ((a <= b) ? (a) : (b))

CMyReactor::CMyReactor()
{
    //m_listenfd = 0;
    //m_epollfd = 0;
    //m_bStop = false;
}

CMyReactor::~CMyReactor()
{

}

bool CMyReactor::init(const char* ip, short nport)
{
    if (!create_server_listener(ip, nport))
    {
        std::cout << "Unable to bind: " << ip << ":" << nport << "." << std::endl;
        return false;
    }


    std::cout << "main thread id = " << std::this_thread::get_id() << std::endl;

    //啓動接收新鏈接的線程
    m_acceptthread.reset(new std::thread(CMyReactor::accept_thread_proc, this));
    
    //啓動工做線程
    for (auto& t : m_workerthreads)
    {
        t.reset(new std::thread(CMyReactor::worker_thread_proc, this));
    }


    return true;
}

bool CMyReactor::uninit()
{
    m_bStop = true;
    m_acceptcond.notify_one();
    m_workercond.notify_all();

    m_acceptthread->join();
    for (auto& t : m_workerthreads)
    {
        t->join();
    }

    ::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, m_listenfd, NULL);

    //TODO: 是否須要先調用shutdown()一下?
    ::shutdown(m_listenfd, SHUT_RDWR);
    ::close(m_listenfd);
    ::close(m_epollfd);

    return true;
}

bool CMyReactor::close_client(int clientfd)
{
    if (::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)
    {
        std::cout << "close client socket failed as call epoll_ctl failed" << std::endl;
        //return false;
    }
        

    ::close(clientfd);

    return true;
}


void* CMyReactor::main_loop(void* p)
{
    std::cout << "main thread id = " << std::this_thread::get_id() << std::endl;
    
    CMyReactor* pReatcor = static_cast<CMyReactor*>(p);
    
    while (!pReatcor->m_bStop)
    {
        struct epoll_event ev[1024];
        int n = ::epoll_wait(pReatcor->m_epollfd, ev, 1024, 10);
        if (n == 0)
            continue;
        else if (n < 0)
        {
            std::cout << "epoll_wait error" << std::endl;
            continue;
        }

        int m = min(n, 1024);
        for (int i = 0; i < m; ++i)
        {
            //通知接收鏈接線程接收新鏈接
            if (ev[i].data.fd == pReatcor->m_listenfd)
                pReatcor->m_acceptcond.notify_one();
            //通知普通工做線程接收數據
            else
            {
                {
                    std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);
                    pReatcor->m_listClients.push_back(ev[i].data.fd);
                }
                                
                pReatcor->m_workercond.notify_one();
                //std::cout << "signal" << std::endl;
            }// end if

        }// end for-loop
    }// end while

    std::cout << "main loop exit ..." << std::endl;

    return NULL;
}

void CMyReactor::accept_thread_proc(CMyReactor* pReatcor)
{
    std::cout << "accept thread, thread id = " << std::this_thread::get_id() << std::endl;

    while (true)
    {
        int newfd;
        struct sockaddr_in clientaddr;
        socklen_t addrlen;
        {
            std::unique_lock<std::mutex> guard(pReatcor->m_acceptmutex);
            pReatcor->m_acceptcond.wait(guard);
            if (pReatcor->m_bStop)
                break;

            //std::cout << "run loop in accept_thread_proc" << std::endl;
            
            newfd = ::accept(pReatcor->m_listenfd, (struct sockaddr *)&clientaddr, &addrlen);
        }
        if (newfd == -1)
            continue;

        std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" << ::ntohs(clientaddr.sin_port) << std::endl;

        //將新socket設置爲non-blocking
        int oldflag = ::fcntl(newfd, F_GETFL, 0);
        int newflag = oldflag | O_NONBLOCK;
        if (::fcntl(newfd, F_SETFL, newflag) == -1)
        {
            std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl;
            continue;
        }

        struct epoll_event e;
        memset(&e, 0, sizeof(e));
        e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
        e.data.fd = newfd;
        if (::epoll_ctl(pReatcor->m_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)
        {
            std::cout << "epoll_ctl error, fd =" << newfd << std::endl;
        }
    }

    std::cout << "accept thread exit ..." << std::endl;
}

void CMyReactor::worker_thread_proc(CMyReactor* pReatcor)
{
    std::cout << "new worker thread, thread id = " << std::this_thread::get_id() << std::endl;

    while (true)
    {
        int clientfd;
        {
            std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);
            while (pReatcor->m_listClients.empty())
            {
                if (pReatcor->m_bStop)
                {
                    std::cout << "worker thread exit ..." << std::endl;
                    return;
                }
                    
                pReatcor->m_workercond.wait(guard);
            }
                
            clientfd = pReatcor->m_listClients.front();
            pReatcor->m_listClients.pop_front();
        }

        //gdb調試時不能實時刷新標準輸出,用這個函數刷新標準輸出,使信息在屏幕上實時顯示出來
        std::cout << std::endl;

        std::string strclientmsg;
        char buff[256];
        bool bError = false;
        while (true)
        {
            memset(buff, 0, sizeof(buff));
            int nRecv = ::recv(clientfd, buff, 256, 0);
            if (nRecv == -1)
            {
                if (errno == EWOULDBLOCK)
                    break;
                else
                {
                    std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl;
                    pReatcor->close_client(clientfd);
                    bError = true;
                    break;
                }

            }
            //對端關閉了socket,這端也關閉。
            else if (nRecv == 0)
            {
                std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl;
                pReatcor->close_client(clientfd);
                bError = true;
                break;
            }

            strclientmsg += buff;
        }

        //出錯了,就不要再繼續往下執行了
        if (bError)
            continue;

        std::cout << "client msg: " << strclientmsg;

        //將消息加上時間標籤後發回
        time_t now = time(NULL);
        struct tm* nowstr = localtime(&now);
        std::ostringstream ostimestr;
        ostimestr << "[" << nowstr->tm_year + 1900 << "-"
            << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-"
            << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "
            << std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":"
            << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":"
            << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";

        strclientmsg.insert(0, ostimestr.str());

        while (true)
        {
            int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0);
            if (nSent == -1)
            {
                if (errno == EWOULDBLOCK)
                {
                    std::this_thread::sleep_for(std::chrono::milliseconds(10));
                    continue;
                }
                else
                {
                    std::cout << "send error, fd = " << clientfd << std::endl;
                    pReatcor->close_client(clientfd);
                    break;
                }

            }

            std::cout << "send: " << strclientmsg;
            strclientmsg.erase(0, nSent);

            if (strclientmsg.empty())
                break;
        }
    }
}

bool CMyReactor::create_server_listener(const char* ip, short port)
{
    m_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (m_listenfd == -1)
        return false;

    int on = 1;
    ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));
    ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on));

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = inet_addr(ip);
    servaddr.sin_port = htons(port);
    if (::bind(m_listenfd, (sockaddr *)&servaddr, sizeof(servaddr)) == -1)
        return false;

    if (::listen(m_listenfd, 50) == -1)
        return false;

    m_epollfd = ::epoll_create(1);
    if (m_epollfd == -1)
        return false;

    struct epoll_event e;
    memset(&e, 0, sizeof(e));
    e.events = EPOLLIN | EPOLLRDHUP;
    e.data.fd = m_listenfd;
    if (::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listenfd, &e) == -1)
        return false;

    return true;
}

main.cpp文件內容:

/** 
 *@desc:   用reactor模式練習服務器程序
 *@author: zhangyl
 *@date:   2016.12.03
 */

#include <iostream>
#include <signal.h>     //for signal()
#include<unistd.h>
#include <stdlib.h>        //for exit()
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "myreactor.h"

CMyReactor g_reator;

void prog_exit(int signo)
{
    std::cout << "program recv signal " << signo << " to exit." << std::endl; 

    g_reator.uninit();
}

void daemon_run()
{
    int pid;
    signal(SIGCHLD, SIG_IGN);
    //1)在父進程中,fork返回新建立子進程的進程ID;
    //2)在子進程中,fork返回0;
    //3)若是出現錯誤,fork返回一個負值;
    pid = fork();
    if (pid < 0)
    {
        std:: cout << "fork error" << std::endl;
        exit(-1);
    }
    //父進程退出,子進程獨立運行
    else if (pid > 0) {
        exit(0);
    }
    //以前parent和child運行在同一個session裏,parent是會話(session)的領頭進程,
    //parent進程做爲會話的領頭進程,若是exit結束執行的話,那麼子進程會成爲孤兒進程,並被init收養。
    //執行setsid()以後,child將從新得到一個新的會話(session)id。
    //這時parent退出以後,將不會影響到child了。
    setsid();
    int fd;
    fd = open("/dev/null", O_RDWR, 0);
    if (fd != -1)
    {
        dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
    }
    if (fd > 2)
        close(fd);
}


int main(int argc, char* argv[])
{  
    //設置信號處理
    signal(SIGCHLD, SIG_DFL);
    signal(SIGPIPE, SIG_IGN);
    signal(SIGINT, prog_exit);
    signal(SIGKILL, prog_exit);
    signal(SIGTERM, prog_exit);
    
    short port = 0;
    int ch;
    bool bdaemon = false;
    while ((ch = getopt(argc, argv, "p:d")) != -1)
    {
        switch (ch)
        {
        case 'd':
            bdaemon = true;
            break;
        case 'p':
            port = atol(optarg);
            break;
        }
    }

    if (bdaemon)
        daemon_run();


    if (port == 0)
        port = 12345;

    
    if (!g_reator.init("0.0.0.0", 12345))
        return -1;
    
    g_reator.main_loop(&g_reator);

    return 0;
}

完整實例代碼下載地址:

普通版本:https://pan.baidu.com/s/1o82Mkno

C++11版本:https://pan.baidu.com/s/1dEJdrih

您能夠接着閱讀下一篇:《一個服務器程序的架構介紹》。

歡迎關注公衆號『easyserverdev』。若是有任何技術或者職業方面的問題須要我提供幫助,可經過這個公衆號與我取得聯繫,此公衆號不只分享高性能服務器開發經驗和故事,同時也免費爲廣大技術朋友提供技術答疑和職業解惑,您有任何問題均可以在微信公衆號直接留言,我會盡快回復您。

圖片描述

相關文章
相關標籤/搜索