開源項目SMSS開發指南(二)——基於libevent的線程池

libevent是一套輕量級的網絡庫,基於事件驅動開發。可以實現多線程的多路複用和註冊事件響應。本文將介紹libevent的基本功能以及如何利用libevent開發一個線程池。

一. 使用指南

監聽服務和註冊鏈接事件

libevent是一個基於事件驅動的網絡庫,經過在一個事件循環上註冊不一樣的事件以完成線程多路複用。因爲libevent採用c語言開發,爲了使用方便咱們能夠將它的功能經過面向對象的設計模式用c++來封裝。下面是對經常使用函數的詳細介紹:html

(1)event_base_new():建立(初始化)event_baselinux

event_base表明了一個事件循環上下文,全部須要基於這個事件循環的事件都須要註冊在它的上面。若是建立成功,最後須要使用event_base_free()來釋放資源。c++

(2)evconnlistener_new_bind():綁定一個本地端口並註冊網絡監聽事件git

參數說明:設計模式

  • struct event_base* base 前文建立好的base,事件將關聯到這個事件循環上
  • evconnlistener_cb cb 事件觸發的回調
  • void *ptr 回調函數的參數,這個參數能夠由用戶任意指定,方便在回調函數中使用
  • unsigned flags 事件的附加標識,表明事件操做
  • int backlog 網絡緩存大小
  • const struct sockaddr *sa socket地址
  • int socklen socket地址長度

函數會返回一個新的evconnlistener,若是建立成功,須要使用evconnlistener_free()來釋放資源。緩存

(3)event_base_dispatch():啓動事件循環和事件分發網絡

這個函數會阻塞當前線程,用戶能夠在事件回調函數中經過event_base_loopbreak()來中斷。若是不但願當前線程被堵塞也可使用event_base_loop()函數。注意,千萬不要在回調函數中清理event_base。多線程

代碼示例:負載均衡

// 建立事件循環
ev_base_ = event_base_new();
if (!ev_base_)
{
    return false;
}
sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(port_);
// 建立socket鏈接回調
ev_listener_ = evconnlistener_new_bind(
    ev_base_,
    SConnListenerCb,
    this,
    LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,
    this->backlog_,
    (sockaddr *)&sin,
    sizeof(sin));
if (!ev_listener_)
{
    return false;
}
while (!quit_)
{
    event_base_loop(ev_base_, EVLOOP_NONBLOCK);
    this_thread::sleep_for(chrono::milliseconds(1));
}
evconnlistener_free(ev_listener_);
event_base_free(ev_base_);
static void SConnListenerCb(struct evconnlistener *listen, evutil_socket_t sock, struct sockaddr *addr, int len, void *ctx)
{
    // 解析客戶端ip
    char ip[16] = {0};
    sockaddr_in *addr_in = (sockaddr_in *)addr;
    evutil_inet_ntop(AF_INET, &addr_in->sin_addr, ip, sizeof(ip));
    stringstream ss;
    ss << ip << ":" << addr_in->sin_port << " 鏈接完成...";
    LOG4CPLUS_INFO(SimpleLogger::Get()->LoggerRef(), ss.str());
    SmsServer *server = (SmsServer *)ctx;
    int s = sock;

    server->ConnListener(s);
}

建立鏈接和註冊讀、寫、事件監聽

(1)bufferevent_socket_new():建立一個帶socket緩存的事件socket

bufferevent表示一個事件緩存,每當有數據須要讀取的時候,它會先將數據從內核態取出再通知用戶。順帶提一下,libevent對事件的觸發支持兩種模式:(ET)邊沿觸發和(LT)水平觸發。若是你設置了水平觸發,可是經過bufferevent來讀取消息,不管你是否將消息接收完成,都不會被反覆觸發回調。所以,使用bufferevent來接收消息的時候,須要特別關注TCP粘包和長包。

(2)bufferevent_setcb():設置bufferevent的回調函數

參數說明:

  • struct bufferevent* bufev 關聯對象
  • bufferevent_data_cb readcb 讀回調 函數原型void (*bufferevent_data_cb)(struct bufferevent *bev, void* ctx)
  • bufferevent_data_cb writecb 寫回調 函數原型(同上)
  • bufferevent_event_cb eventcb 事件回調 函數原型void (*bufferevent_event_cb)(struct bufferevent *bev, short what, void *ctx)
  • void *cbarg 回調函數的最後一個參數,由用戶指定

讀回調顧名思義就是當有數據的時候會觸發的函數,但是寫回調何時觸發?有興趣的朋友能夠本身測試一下。特別須要關注事件回調函數。全部可觸發的事件包括:BEV_EVENT_READING(讀事件),BEV_EVENT_WRITING(寫事件),BEV_EVENT_EOF(結束事件),BEV_EVENT_ERROR(錯誤事件),BEV_EVENT_TIMEOUT(超時事件),BEV_EVENT_CONNECTED(鏈接事件)。若是你是在開發服務端BEV_EVENT_CONNECTED事件不會被觸發,由於鏈接事件是在bufferevent建立前產生的。BEV_EVENT_READING || BEV_EVENT_TIMEOUT能夠用來表示讀數據超時,經過這個事件能夠偵測心跳錶明距離上次讀數據已經超時。BEV_EVENT_WRITING || BEV_EVENT_TIMEOUT能夠表示寫超時,可是這個事件只會在當有數據須要被髮送但是超時未發送成功後纔會被觸發。

此外,當發生超時事件後,相關的讀寫操做都會被從bufferevent中移除。若是用戶但願繼續以前的操做,須要從新註冊讀/寫。

(3)bufferevent_set_timeouts():設置讀/寫超時

只有在經過這個函數設置了讀/寫超時後,在事件回調函數中BEV_EVENT_TIMEOUT纔會生效。

代碼示例:

bufferevent *buff_ev_ = bufferevent_socket_new(ev_base_, socket_, BEV_OPT_CLOSE_ON_FREE);
if (!buff_ev_)
{
    return false;
}
// 指定參數
bufferevent_setcb(buff_ev_, SReadCb, SWriteCb, SEventCb, this);
bufferevent_enable(buff_ev_, EV_READ | EV_WRITE);
timeval tv = {timeout_, 0};
bufferevent_set_timeouts(buff_ev_, &tv, NULL);
return true;

讀寫數據

(1)bufferevent_read():從緩存中接收數據

一般在讀回調中使用,經過返回值判斷緩存中是否還有數據

(2)bufferevent_write():向緩衝寫入數據以經過socket發送

返回值表示有多少數據已經被寫入進內核

建立基於管道的事件

libevent除了能夠用在網絡上,還能夠和管道(pipe)結合用來生成管道事件。

(1)event_config_new():建立一個事件配置對象

event_config能夠用來建立一個非默認的事件循環,一般使用這個函數配合event_base_new_with_config()建立event_base。最後須要使用event_config_free()來釋放資源。

(2)event_new():建立一個讀/寫事件

和bufferevent的建立不一樣,event_new()只會建立一個配套的事件,若是在事件中用戶沒有對數據進行處理,回調會一直被觸發。

代碼示例:

// 初始化一對管道,只能在linux系統下使用
int pipefd[2];
if (pipe(pipefd))
{
    return false;
}
// pipefd[0]讀取管道 pipefd[1]發送管道
this->pipe_endpoint_ = pipefd[1];
// 建立管道事件
event_config *ev_conf = event_config_new();
event_config_set_flag(ev_conf, EVENT_BASE_FLAG_NOLOCK);
this->ev_base_ = event_base_new_with_config(ev_conf);
event_config_free(ev_conf);
if (!ev_base_)
{
    return false;
}

pipe_ev_ = event_new(this->ev_base_, pipefd[0], EV_READ | EV_PERSIST, SEventCb, this);
event_add(pipe_ev_, 0);

 2、實現線程池

線程池實現原理

libevent能夠實現對線程的多路複用,所以咱們能夠在一個線程中完成服務端對多個客戶端的同時讀寫操做。這樣作雖然可以最大限度的利用系統資源,但是沒法充分發揮cpu多線程的處理能力。開發高可用和適合高負載的服務端咱們依然應該考慮啓動多個線程來處理數據。關鍵是咱們如何將事件分發到不一樣的線程中以保持多個線程的負載均衡。

  1. 當服務啓動的時候首先建立N條線程。每個線程對應一個事件循環event_base。
  2. 主線程負責監聽指定端口並在鏈接的回調函數中處理新鏈接套接字的處理。
  3. 當有新的客戶端鏈接後,主線程會把套接字先保存在一個隊列中。掃描當前全部線程的處理量,選擇負載最小的線程利用管道發送一個信號(‘c’)。對應線程的事件循環在管道的讀事件中從隊列中獲取這個套接字,並創建對應的bufferevent進行處理。當前線程負載量+1。
  4. 客戶端斷開後通知bufferevent所在的線程將負載量減一。

smss源碼閱讀

相關的源碼文件爲sms_server,work_group,work_thread和socket_manager

服務初始化,註冊鏈接監聽事件並初始化線程組

bool SmsServer::Init()
{
    // 建立事件循環
    ev_base_ = event_base_new();
    if (!ev_base_)
    {
        return false;
    }
    sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(port_);
    // 建立socket鏈接回調
    ev_listener_ = evconnlistener_new_bind(
        ev_base_,
        SConnListenerCb,
        this,
        LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,
        this->backlog_,
        (sockaddr *)&sin,
        sizeof(sin));
    if (!ev_listener_)
    {
        return false;
    }
    // 建立線程組管理類
    boss_ = new WorkGroup(thread_num_);
    boss_->Init();
    return true;
}

線程組負責管理全部的線程

bool WorkGroup::Init()
{
    // 直接初始化指定的工做線程
    for (int i = 0; i < num_; i++)
    {
        int id = group_.size() + 1;
        WorkThread *work = new WorkThread(this, id, net_bus_);
        if (!work->Init())
        {
            return false;
        }
        work->Start(); // thread start...
        group_.push_back(work);
        // 將當前初始化完成的工做線程註冊進消息總線
        net_bus_->Regist(work); // regist thread to netbus
    }
    return true;
}

每個線程在初始化的時候會建立一條管道並在本身的事件循環上註冊對應的讀回調,對外部暴露Notify方法用來激活事件

bool WorkThread::Init()
{
    // 初始化一對管道,只能在linux系統下使用
    int pipefd[2];
    if (pipe(pipefd))
    {
        return false;
    }
    // pipefd[0]讀取管道 pipefd[1]發送管道
    this->pipe_endpoint_ = pipefd[1];
    // 建立管道事件
    event_config *ev_conf = event_config_new();
    event_config_set_flag(ev_conf, EVENT_BASE_FLAG_NOLOCK);
    this->ev_base_ = event_base_new_with_config(ev_conf);
    event_config_free(ev_conf);
    if (!ev_base_)
    {
        return false;
    }

    pipe_ev_ = event_new(this->ev_base_, pipefd[0], EV_READ | EV_PERSIST, SEventCb, this);
    event_add(pipe_ev_, 0);
    return true;
}

void WorkThread::Notify(const char *sign)
{
    // 激活
    int re = write(this->pipe_endpoint_, sign, 1);
    if (re <= 0)
    {
        LOG4CPLUS_ERROR(SimpleLogger::Get()->LoggerRef(), "管道激活失敗");
    }
}

在讀回調中獲取套接字,建立鏈接管理對象SocketManager

void WorkThread::Activated(int fd)
{
    char buf[2] = {0};
    int re = read(fd, buf, 1);
    socket_list_mtx_.lock();
    if (strcmp(buf, "c") == 0) // 通知有新的客戶端鏈接
    {
        // new client connect, create SocketManager
        if (socket_list_.empty())
        {
            socket_list_mtx_.unlock();
            return;
        }
        // 讀取一條套接字
        int client_sock = socket_list_.front();
        socket_list_.pop_front();
        // 建立socketManager
        SocketManager *manager = new SocketManager(this, ev_base_, client_sock, AppContext::Get()->client_timeout());
        manager->Init();
        sock_manager_list_.push_back(manager);
        stringstream ss;
        ss << "SocketManager:" << client_sock << " 建立完成";
        LOG4CPLUS_DEBUG(SimpleLogger::Get()->LoggerRef(), ss.str());
    }

    socket_list_mtx_.unlock();
}

客戶端鏈接後將建立的套接字交給負載最小的線程處理

void WorkGroup::CreateConnection(int sock)
{
    int min = -1;
    WorkThread *work = nullptr;
    // 遍歷尋找負載最輕的線程
    for (auto it = group_.begin(); it != group_.end(); it++)
    {
        if (min == -1)
        {
            min = (*it)->connect_num();
            work = (*it);
        }
        else if ((*it)->connect_num() < min)
        {
            min = (*it)->connect_num();
            work = (*it);
        }
    }
    // 添加一條socket fd進隊列並經過管道激活
    work->AddSocket(sock);
    work->Notify("c");
}

 

完整源碼已經發布在碼雲上。

相關文章:《開源項目SMSS開發指南》

相關文章
相關標籤/搜索