libevent是一個基於事件驅動的網絡庫,經過在一個事件循環上註冊不一樣的事件以完成線程多路複用。因爲libevent採用c語言開發,爲了使用方便咱們能夠將它的功能經過面向對象的設計模式用c++來封裝。下面是對經常使用函數的詳細介紹:html
(1)event_base_new():建立(初始化)event_baselinux
event_base表明了一個事件循環上下文,全部須要基於這個事件循環的事件都須要註冊在它的上面。若是建立成功,最後須要使用event_base_free()來釋放資源。c++
(2)evconnlistener_new_bind():綁定一個本地端口並註冊網絡監聽事件git
參數說明:設計模式
函數會返回一個新的evconnlistener,若是建立成功,須要使用evconnlistener_free()來釋放資源。緩存
(3)event_base_dispatch():啓動事件循環和事件分發網絡
這個函數會阻塞當前線程,用戶能夠在事件回調函數中經過event_base_loopbreak()來中斷。若是不但願當前線程被堵塞也可使用event_base_loop()函數。注意,千萬不要在回調函數中清理event_base。多線程
代碼示例:負載均衡
// 建立事件循環 ev_base_ = event_base_new(); if (!ev_base_) { return false; } sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(port_); // 建立socket鏈接回調 ev_listener_ = evconnlistener_new_bind( ev_base_, SConnListenerCb, this, LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, this->backlog_, (sockaddr *)&sin, sizeof(sin)); if (!ev_listener_) { return false; } while (!quit_) { event_base_loop(ev_base_, EVLOOP_NONBLOCK); this_thread::sleep_for(chrono::milliseconds(1)); } evconnlistener_free(ev_listener_); event_base_free(ev_base_);
static void SConnListenerCb(struct evconnlistener *listen, evutil_socket_t sock, struct sockaddr *addr, int len, void *ctx) { // 解析客戶端ip char ip[16] = {0}; sockaddr_in *addr_in = (sockaddr_in *)addr; evutil_inet_ntop(AF_INET, &addr_in->sin_addr, ip, sizeof(ip)); stringstream ss; ss << ip << ":" << addr_in->sin_port << " 鏈接完成..."; LOG4CPLUS_INFO(SimpleLogger::Get()->LoggerRef(), ss.str()); SmsServer *server = (SmsServer *)ctx; int s = sock; server->ConnListener(s); }
(1)bufferevent_socket_new():建立一個帶socket緩存的事件socket
bufferevent表示一個事件緩存,每當有數據須要讀取的時候,它會先將數據從內核態取出再通知用戶。順帶提一下,libevent對事件的觸發支持兩種模式:(ET)邊沿觸發和(LT)水平觸發。若是你設置了水平觸發,可是經過bufferevent來讀取消息,不管你是否將消息接收完成,都不會被反覆觸發回調。所以,使用bufferevent來接收消息的時候,須要特別關注TCP粘包和長包。
(2)bufferevent_setcb():設置bufferevent的回調函數
參數說明:
讀回調顧名思義就是當有數據的時候會觸發的函數,但是寫回調何時觸發?有興趣的朋友能夠本身測試一下。特別須要關注事件回調函數。全部可觸發的事件包括:BEV_EVENT_READING(讀事件),BEV_EVENT_WRITING(寫事件),BEV_EVENT_EOF(結束事件),BEV_EVENT_ERROR(錯誤事件),BEV_EVENT_TIMEOUT(超時事件),BEV_EVENT_CONNECTED(鏈接事件)。若是你是在開發服務端BEV_EVENT_CONNECTED事件不會被觸發,由於鏈接事件是在bufferevent建立前產生的。BEV_EVENT_READING || BEV_EVENT_TIMEOUT能夠用來表示讀數據超時,經過這個事件能夠偵測心跳錶明距離上次讀數據已經超時。BEV_EVENT_WRITING || BEV_EVENT_TIMEOUT能夠表示寫超時,可是這個事件只會在當有數據須要被髮送但是超時未發送成功後纔會被觸發。
此外,當發生超時事件後,相關的讀寫操做都會被從bufferevent中移除。若是用戶但願繼續以前的操做,須要從新註冊讀/寫。
(3)bufferevent_set_timeouts():設置讀/寫超時
只有在經過這個函數設置了讀/寫超時後,在事件回調函數中BEV_EVENT_TIMEOUT纔會生效。
代碼示例:
bufferevent *buff_ev_ = bufferevent_socket_new(ev_base_, socket_, BEV_OPT_CLOSE_ON_FREE); if (!buff_ev_) { return false; } // 指定參數 bufferevent_setcb(buff_ev_, SReadCb, SWriteCb, SEventCb, this); bufferevent_enable(buff_ev_, EV_READ | EV_WRITE); timeval tv = {timeout_, 0}; bufferevent_set_timeouts(buff_ev_, &tv, NULL); return true;
(1)bufferevent_read():從緩存中接收數據
一般在讀回調中使用,經過返回值判斷緩存中是否還有數據
(2)bufferevent_write():向緩衝寫入數據以經過socket發送
返回值表示有多少數據已經被寫入進內核
libevent除了能夠用在網絡上,還能夠和管道(pipe)結合用來生成管道事件。
(1)event_config_new():建立一個事件配置對象
event_config能夠用來建立一個非默認的事件循環,一般使用這個函數配合event_base_new_with_config()建立event_base。最後須要使用event_config_free()來釋放資源。
(2)event_new():建立一個讀/寫事件
和bufferevent的建立不一樣,event_new()只會建立一個配套的事件,若是在事件中用戶沒有對數據進行處理,回調會一直被觸發。
代碼示例:
// 初始化一對管道,只能在linux系統下使用 int pipefd[2]; if (pipe(pipefd)) { return false; } // pipefd[0]讀取管道 pipefd[1]發送管道 this->pipe_endpoint_ = pipefd[1]; // 建立管道事件 event_config *ev_conf = event_config_new(); event_config_set_flag(ev_conf, EVENT_BASE_FLAG_NOLOCK); this->ev_base_ = event_base_new_with_config(ev_conf); event_config_free(ev_conf); if (!ev_base_) { return false; } pipe_ev_ = event_new(this->ev_base_, pipefd[0], EV_READ | EV_PERSIST, SEventCb, this); event_add(pipe_ev_, 0);
libevent能夠實現對線程的多路複用,所以咱們能夠在一個線程中完成服務端對多個客戶端的同時讀寫操做。這樣作雖然可以最大限度的利用系統資源,但是沒法充分發揮cpu多線程的處理能力。開發高可用和適合高負載的服務端咱們依然應該考慮啓動多個線程來處理數據。關鍵是咱們如何將事件分發到不一樣的線程中以保持多個線程的負載均衡。
相關的源碼文件爲sms_server,work_group,work_thread和socket_manager
服務初始化,註冊鏈接監聽事件並初始化線程組
bool SmsServer::Init() { // 建立事件循環 ev_base_ = event_base_new(); if (!ev_base_) { return false; } sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(port_); // 建立socket鏈接回調 ev_listener_ = evconnlistener_new_bind( ev_base_, SConnListenerCb, this, LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, this->backlog_, (sockaddr *)&sin, sizeof(sin)); if (!ev_listener_) { return false; } // 建立線程組管理類 boss_ = new WorkGroup(thread_num_); boss_->Init(); return true; }
線程組負責管理全部的線程
bool WorkGroup::Init() { // 直接初始化指定的工做線程 for (int i = 0; i < num_; i++) { int id = group_.size() + 1; WorkThread *work = new WorkThread(this, id, net_bus_); if (!work->Init()) { return false; } work->Start(); // thread start... group_.push_back(work); // 將當前初始化完成的工做線程註冊進消息總線 net_bus_->Regist(work); // regist thread to netbus } return true; }
每個線程在初始化的時候會建立一條管道並在本身的事件循環上註冊對應的讀回調,對外部暴露Notify方法用來激活事件
bool WorkThread::Init() { // 初始化一對管道,只能在linux系統下使用 int pipefd[2]; if (pipe(pipefd)) { return false; } // pipefd[0]讀取管道 pipefd[1]發送管道 this->pipe_endpoint_ = pipefd[1]; // 建立管道事件 event_config *ev_conf = event_config_new(); event_config_set_flag(ev_conf, EVENT_BASE_FLAG_NOLOCK); this->ev_base_ = event_base_new_with_config(ev_conf); event_config_free(ev_conf); if (!ev_base_) { return false; } pipe_ev_ = event_new(this->ev_base_, pipefd[0], EV_READ | EV_PERSIST, SEventCb, this); event_add(pipe_ev_, 0); return true; } void WorkThread::Notify(const char *sign) { // 激活 int re = write(this->pipe_endpoint_, sign, 1); if (re <= 0) { LOG4CPLUS_ERROR(SimpleLogger::Get()->LoggerRef(), "管道激活失敗"); } }
在讀回調中獲取套接字,建立鏈接管理對象SocketManager
void WorkThread::Activated(int fd) { char buf[2] = {0}; int re = read(fd, buf, 1); socket_list_mtx_.lock(); if (strcmp(buf, "c") == 0) // 通知有新的客戶端鏈接 { // new client connect, create SocketManager if (socket_list_.empty()) { socket_list_mtx_.unlock(); return; } // 讀取一條套接字 int client_sock = socket_list_.front(); socket_list_.pop_front(); // 建立socketManager SocketManager *manager = new SocketManager(this, ev_base_, client_sock, AppContext::Get()->client_timeout()); manager->Init(); sock_manager_list_.push_back(manager); stringstream ss; ss << "SocketManager:" << client_sock << " 建立完成"; LOG4CPLUS_DEBUG(SimpleLogger::Get()->LoggerRef(), ss.str()); } socket_list_mtx_.unlock(); }
客戶端鏈接後將建立的套接字交給負載最小的線程處理
void WorkGroup::CreateConnection(int sock) { int min = -1; WorkThread *work = nullptr; // 遍歷尋找負載最輕的線程 for (auto it = group_.begin(); it != group_.end(); it++) { if (min == -1) { min = (*it)->connect_num(); work = (*it); } else if ((*it)->connect_num() < min) { min = (*it)->connect_num(); work = (*it); } } // 添加一條socket fd進隊列並經過管道激活 work->AddSocket(sock); work->Notify("c"); }
完整源碼已經發布在碼雲上。
相關文章:《開源項目SMSS開發指南》