對libevent+多線程服務器模型的C++封裝類

最近在看memcached的源碼,以爲它那種libevent+多線程的服務器模型真的很不錯,我將這個模型封裝成一個C++類,根據個人簡單測試,這個模型的效率真的很不錯,歡迎你們試用。java

這個類的使用方法很簡單(缺點是不太靈活),只要派生一個類,根據須要重寫如下這幾個虛函數就好了:數組

//新建鏈接成功後,會調用該函數virtual void ConnectionEvent(Conn *conn) { }//讀取完數據後,會調用該函數virtual void ReadEvent(Conn *conn) { }//發送完成功後,會調用該函數(由於串包的問題,因此並非每次發送完數據都會被調用)virtual void WriteEvent(Conn *conn) { }//斷開鏈接(客戶自動斷開或異常斷開)後,會調用該函數virtual void CloseEvent(Conn *conn, short events) { }//發生致命錯誤(若是建立子線程失敗等)後,會調用該函數//該函數的默認操做是輸出錯誤提示,終止程序virtual void ErrorQuit(const char *str);


若是你們有什麼建議或意見,歡迎給我發郵件:aa1080711@163.com服務器


上代碼:多線程

頭文件:TcpEventServer.hsocket

//TcpEventServer.h#ifndef TCPEVENTSERVER_H_#define TCPEVENTSERVER_H_#include <stdio.h>#include <stdlib.h>#include <unistd.h>#include <string.h>#include <errno.h>#include <signal.h>#include <time.h>#include <pthread.h>#include <fcntl.h>#include <map>using std::map;#include <event.h>#include <event2/bufferevent.h>#include <event2/buffer.h>#include <event2/listener.h>#include <event2/util.h>#include <event2/event.h>class TcpEventServer;class Conn;class ConnQueue;struct LibeventThread;//這個類一個鏈表的結點類,結點裏存儲各個鏈接的信息,//並提供了讀寫數據的接口class Conn
{  //此類只能由TcpBaseServer建立,  //並由ConnQueue類管理  friend class ConnQueue;  friend class TcpEventServer;private:  const int m_fd;				//socket的ID  evbuffer *m_ReadBuf;		//讀數據的緩衝區  evbuffer *m_WriteBuf;		//寫數據的緩衝區  Conn *m_Prev;				//前一個結點的指針  Conn *m_Next;				//後一個結點的指針  LibeventThread *m_Thread;  Conn(int fd=0);  ~Conn();public:  LibeventThread *GetThread() { return m_Thread; }  int GetFd() { return m_fd; }  //獲取可讀數據的長度  int GetReadBufferLen()  { return evbuffer_get_length(m_ReadBuf); }  //從讀緩衝區中取出len個字節的數據,存入buffer中,若不夠,則讀出全部數據  //返回讀出數據的字節數  int GetReadBuffer(char *buffer, int len)  { return evbuffer_remove(m_ReadBuf, buffer, len); }  //從讀緩衝區中複製出len個字節的數據,存入buffer中,若不夠,則複製出全部數據  //返回複製出數據的字節數  //執行該操做後,數據還會留在緩衝區中,buffer中的數據只是原數據的副本  int CopyReadBuffer(char *buffer, int len)  { return evbuffer_copyout(m_ReadBuf, buffer, len); }  //獲取可寫數據的長度  int GetWriteBufferLen()  { return evbuffer_get_length(m_WriteBuf); }  //將數據加入寫緩衝區,準備發送  int AddToWriteBuffer(char *buffer, int len)  { return evbuffer_add(m_WriteBuf, buffer, len); }  //將讀緩衝區中的數據移動到寫緩衝區  void MoveBufferData()  { evbuffer_add_buffer(m_WriteBuf, m_ReadBuf); }

};//帶頭尾結點的雙鏈表類,每一個結點存儲一個鏈接的數據class ConnQueue
{private:  Conn *m_head;  Conn *m_tail;public:  ConnQueue();  ~ConnQueue();  Conn *InsertConn(int fd, LibeventThread *t);  void DeleteConn(Conn *c);  //void PrintQueue();};//每一個子線程的線程信息struct LibeventThread
{  pthread_t tid;				//線程的ID  struct event_base *base;	//libevent的事件處理機  struct event notifyEvent;	//監聽管理的事件機  int notifyReceiveFd;		//管理的接收端  int notifySendFd;			//管道的發送端  ConnQueue connectQueue;		//socket鏈接的鏈表  //在libevent的事件處理中要用到不少回調函數,不能使用類隱含的this指針  //因此用這樣方式將TcpBaseServer的類指針傳過去  TcpEventServer *tcpConnect;	 //TcpBaseServer類的指針};class TcpEventServer
{private:  int m_ThreadCount;					//子線程數  int m_Port;							//監聽的端口  LibeventThread *m_MainBase;			//主線程的libevent事件處理機  LibeventThread *m_Threads;			//存儲各個子線程信息的數組  map<int, event*> m_SignalEvents;	//自定義的信號處理public:  static const int EXIT_CODE = -1;private:  //初始化子線程的數據  void SetupThread(LibeventThread *thread);  //子線程的入門函數  static void *WorkerLibevent(void *arg);  //(主線程收到請求後),對應子線程的處理函數  static void ThreadProcess(int fd, short which, void *arg);  //被libevent回調的各個靜態函數  static void ListenerEventCb(evconnlistener *listener, evutil_socket_t fd,    sockaddr *sa, int socklen, void *user_data);  static void ReadEventCb(struct bufferevent *bev, void *data);  static void WriteEventCb(struct bufferevent *bev, void *data); 
  static void CloseEventCb(struct bufferevent *bev, short events, void *data);protected:  //這五個虛函數,通常是要被子類繼承,並在其中處理具體業務的  //新建鏈接成功後,會調用該函數  virtual void ConnectionEvent(Conn *conn) { }  //讀取完數據後,會調用該函數  virtual void ReadEvent(Conn *conn) { }  //發送完成功後,會調用該函數(由於串包的問題,因此並非每次發送完數據都會被調用)  virtual void WriteEvent(Conn *conn) { }  //斷開鏈接(客戶自動斷開或異常斷開)後,會調用該函數  virtual void CloseEvent(Conn *conn, short events) { }  //發生致命錯誤(若是建立子線程失敗等)後,會調用該函數  //該函數的默認操做是輸出錯誤提示,終止程序  virtual void ErrorQuit(const char *str);public:  TcpEventServer(int count);  ~TcpEventServer();  //設置監聽的端口號,若是不須要監聽,請將其設置爲EXIT_CODE  void SetPort(int port)  { m_Port = port; }  //開始事件循環  bool StartRun();  //在tv時間裏結束事件循環  //否tv爲空,則當即中止  void StopRun(timeval *tv);  //添加和刪除信號處理事件  //sig是信號,ptr爲要回調的函數  bool AddSignalEvent(int sig, void (*ptr)(int, short, void*));  bool DeleteSignalEvent(int sig);  //添加和刪除定時事件  //ptr爲要回調的函數,tv是間隔時間,once決定是否只執行一次  event *AddTimerEvent(void(*ptr)(int, short, void*),    timeval tv, bool once);  bool DeleteTImerEvent(event *ev);
};#endif


實現文件:TcpEventServer.cpptcp

//TcpEventServer.cpp#include "TcpEventServer.h"Conn::Conn(int fd) : m_fd(fd)
{  m_Prev = NULL;  m_Next = NULL;
}

Conn::~Conn()
{

}

ConnQueue::ConnQueue()
{  //創建頭尾結點,並調整其指針  m_head = new Conn(0);  m_tail = new Conn(0);  m_head->m_Prev = m_tail->m_Next = NULL;  m_head->m_Next = m_tail;  m_tail->m_Prev = m_head;
}

ConnQueue::~ConnQueue()
{  Conn *tcur, *tnext;  tcur = m_head;  //循環刪除鏈表中的各個結點  while( tcur != NULL )  {    tnext = tcur->m_Next;    delete tcur;    tcur = tnext;  }
}

Conn *ConnQueue::InsertConn(int fd, LibeventThread *t)
{  Conn *c = new Conn(fd);  c->m_Thread = t;  Conn *next = m_head->m_Next;  c->m_Prev = m_head;  c->m_Next = m_head->m_Next;  m_head->m_Next = c;  next->m_Prev = c;  return c;
}void ConnQueue::DeleteConn(Conn *c)
{  c->m_Prev->m_Next = c->m_Next;  c->m_Next->m_Prev = c->m_Prev;  delete c;
}/*
void ConnQueue::PrintQueue()
{  Conn *cur = m_head->m_Next;  while( cur->m_Next != NULL )  {    printf("%d ", cur->m_fd);    cur = cur->m_Next;  }  printf("\n");
}
*/TcpEventServer::TcpEventServer(int count)
{  //初始化各項數據  m_ThreadCount = count;  m_Port = -1;  m_MainBase = new LibeventThread;  m_Threads = new LibeventThread[m_ThreadCount];  m_MainBase->tid = pthread_self();  m_MainBase->base = event_base_new();  //初始化各個子線程的結構體  for(int i=0; i<m_ThreadCount; i++)  {    SetupThread(&m_Threads[i]);  }

}

TcpEventServer::~TcpEventServer()
{  //中止事件循環(若是事件循環沒開始,則沒效果)  StopRun(NULL);  //釋放內存  event_base_free(m_MainBase->base);  for(int i=0; i<m_ThreadCount; i++)    event_base_free(m_Threads[i].base);  delete m_MainBase;  delete [] m_Threads;
}void TcpEventServer::ErrorQuit(const char *str)
{  //輸出錯誤信息,退出程序  fprintf(stderr, "%s", str);   
  if( errno != 0 )    
    fprintf(stderr, " : %s", strerror(errno));    
  fprintf(stderr, "\n");        
  exit(1);    
}void TcpEventServer::SetupThread(LibeventThread *me)
{  //創建libevent事件處理機制  me->tcpConnect = this;  me->base = event_base_new();  if( NULL == me->base )    ErrorQuit("event base new error");  //在主線程和子線程之間創建管道  int fds[2];  if( pipe(fds) )    ErrorQuit("create pipe error");  me->notifyReceiveFd = fds[0];  me->notifySendFd = fds[1];  //讓子線程的狀態機監聽管道  event_set( &me->notifyEvent, me->notifyReceiveFd,    EV_READ | EV_PERSIST, ThreadProcess, me );  event_base_set(me->base, &me->notifyEvent);  if ( event_add(&me->notifyEvent, 0) == -1 )    ErrorQuit("Can't monitor libevent notify pipe\n");
}void *TcpEventServer::WorkerLibevent(void *arg)
{  //開啓libevent的事件循環,準備處理業務  LibeventThread *me = (LibeventThread*)arg;  //printf("thread %u started\n", (unsigned int)me->tid);  event_base_dispatch(me->base);  //printf("subthread done\n");}bool TcpEventServer::StartRun()
{  evconnlistener *listener;  //若是端口號不是EXIT_CODE,就監聽該端口號  if( m_Port != EXIT_CODE )  {    sockaddr_in sin;    memset(&sin, 0, sizeof(sin));    sin.sin_family = AF_INET;    sin.sin_port = htons(m_Port);    listener = evconnlistener_new_bind(m_MainBase->base, 
      ListenerEventCb, (void*)this,      LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,      (sockaddr*)&sin, sizeof(sockaddr_in));    if( NULL == listener )      ErrorQuit("TCP listen error");  }  //開啓各個子線程  for(int i=0; i<m_ThreadCount; i++)  {    pthread_create(&m_Threads[i].tid, NULL,  
      WorkerLibevent, (void*)&m_Threads[i]);  }  //開啓主線程的事件循環  event_base_dispatch(m_MainBase->base);  //事件循環結果,釋放監聽者的內存  if( m_Port != EXIT_CODE )  {    //printf("free listen\n");    evconnlistener_free(listener);  }
}void TcpEventServer::StopRun(timeval *tv)
{  int contant = EXIT_CODE;  //向各個子線程的管理中寫入EXIT_CODE,通知它們退出  for(int i=0; i<m_ThreadCount; i++)  {    write(m_Threads[i].notifySendFd, &contant, sizeof(int));  }  //結果主線程的事件循環  event_base_loopexit(m_MainBase->base, tv);
}void TcpEventServer::ListenerEventCb(struct evconnlistener *listener, 
                  evutil_socket_t fd,                  struct sockaddr *sa, 
                  int socklen, 
                  void *user_data)
{  TcpEventServer *server = (TcpEventServer*)user_data;  //隨機選擇一個子線程,經過管道向其傳遞socket描述符  int num = rand() % server->m_ThreadCount;  int sendfd = server->m_Threads[num].notifySendFd;  write(sendfd, &fd, sizeof(evutil_socket_t));
}void TcpEventServer::ThreadProcess(int fd, short which, void *arg)
{  LibeventThread *me = (LibeventThread*)arg;  //從管道中讀取數據(socket的描述符或操做碼)  int pipefd = me->notifyReceiveFd;  evutil_socket_t confd;  read(pipefd, &confd, sizeof(evutil_socket_t));  //若是操做碼是EXIT_CODE,則終於事件循環  if( EXIT_CODE == confd )  {    event_base_loopbreak(me->base);    return;  }  //新建鏈接  struct bufferevent *bev;  bev = bufferevent_socket_new(me->base, confd, BEV_OPT_CLOSE_ON_FREE);  if (!bev)  {    fprintf(stderr, "Error constructing bufferevent!");    event_base_loopbreak(me->base);    return;  }  //將該連接放入隊列  Conn *conn = me->connectQueue.InsertConn(confd, me);  //準備從socket中讀寫數據  bufferevent_setcb(bev, ReadEventCb, WriteEventCb, CloseEventCb, conn);  bufferevent_enable(bev, EV_WRITE);  bufferevent_enable(bev, EV_READ);  //調用用戶自定義的鏈接事件處理函數  me->tcpConnect->ConnectionEvent(conn);
}void TcpEventServer::ReadEventCb(struct bufferevent *bev, void *data)
{  Conn *conn = (Conn*)data;  conn->m_ReadBuf = bufferevent_get_input(bev);  conn->m_WriteBuf = bufferevent_get_output(bev);  //調用用戶自定義的讀取事件處理函數  conn->m_Thread->tcpConnect->ReadEvent(conn);
} 

void TcpEventServer::WriteEventCb(struct bufferevent *bev, void *data)
{  Conn *conn = (Conn*)data;  conn->m_ReadBuf = bufferevent_get_input(bev);  conn->m_WriteBuf = bufferevent_get_output(bev);  //調用用戶自定義的寫入事件處理函數  conn->m_Thread->tcpConnect->WriteEvent(conn);

}void TcpEventServer::CloseEventCb(struct bufferevent *bev, short events, void *data)
{  Conn *conn = (Conn*)data;  //調用用戶自定義的斷開事件處理函數  conn->m_Thread->tcpConnect->CloseEvent(conn, events);  conn->GetThread()->connectQueue.DeleteConn(conn);  bufferevent_free(bev);
}bool TcpEventServer::AddSignalEvent(int sig, void (*ptr)(int, short, void*))
{  //新建一個信號事件  event *ev = evsignal_new(m_MainBase->base, sig, ptr, (void*)this);  if ( !ev || 
    event_add(ev, NULL) < 0 )  {    event_del(ev);    return false;  }  //刪除舊的信號事件(同一個信號只能有一個信號事件)  DeleteSignalEvent(sig);  m_SignalEvents[sig] = ev;  return true;
}bool TcpEventServer::DeleteSignalEvent(int sig)
{  map<int, event*>::iterator iter = m_SignalEvents.find(sig);  if( iter == m_SignalEvents.end() )    return false;  event_del(iter->second);  m_SignalEvents.erase(iter);  return true;
}event *TcpEventServer::AddTimerEvent(void (*ptr)(int, short, void *), 
                  timeval tv, bool once)
{  int flag = 0;  if( !once )    flag = EV_PERSIST;  //新建定時器信號事件  event *ev = new event;  event_assign(ev, m_MainBase->base, -1, flag, ptr, (void*)this);  if( event_add(ev, &tv) < 0 )  {    event_del(ev);    return NULL;  }  return ev;
}bool TcpEventServer::DeleteTImerEvent(event *ev)
{  int res = event_del(ev);  return (0 == res);
}


測試文件:test.cppmemcached

/*
這是一個測試用的服務器,只有兩個功能:
1:對於每一個已鏈接客戶端,每10秒向其發送一句hello, world
2:若客戶端向服務器發送數據,服務器收到後,再將數據回發給客戶端
*///test.cpp#include "TcpEventServer.h"#include <set>#include <vector>using namespace std;//測試示例class TestServer : public TcpEventServer
{private:  vector<Conn*> vec;protected:  //重載各個處理業務的虛函數  void ReadEvent(Conn *conn);  void WriteEvent(Conn *conn);  void ConnectionEvent(Conn *conn);  void CloseEvent(Conn *conn, short events);public:  TestServer(int count) : TcpEventServer(count) { }  ~TestServer() { } 
    //退出事件,響應Ctrl+C  static void QuitCb(int sig, short events, void *data);  //定時器事件,每10秒向全部客戶端發一句hello, world  static void TimeOutCb(int id, int short events, void *data);
};void TestServer::ReadEvent(Conn *conn)
{  conn->MoveBufferData();
}void TestServer::WriteEvent(Conn *conn)
{

}void TestServer::ConnectionEvent(Conn *conn)
{  TestServer *me = (TestServer*)conn->GetThread()->tcpConnect;  printf("new connection: %d\n", conn->GetFd());  me->vec.push_back(conn);
}void TestServer::CloseEvent(Conn *conn, short events)
{  printf("connection closed: %d\n", conn->GetFd());
}void TestServer::QuitCb(int sig, short events, void *data)
{ 
  printf("Catch the SIGINT signal, quit in one second\n");  TestServer *me = (TestServer*)data;  timeval tv = {1, 0};  me->StopRun(&tv);
}void TestServer::TimeOutCb(int id, short events, void *data)
{  TestServer *me = (TestServer*)data;  char temp[33] = "hello, world\n";  for(int i=0; i<me->vec.size(); i++)    me->vec[i]->AddToWriteBuffer(temp, strlen(temp));
}int main()
{  printf("pid: %d\n", getpid());  TestServer server(3);  server.AddSignalEvent(SIGINT, TestServer::QuitCb);  timeval tv = {10, 0};  server.AddTimerEvent(TestServer::TimeOutCb, tv, false);  server.SetPort(2111);  server.StartRun();  printf("done\n");    return 0;
}


編譯與運行命令:函數

qch@LinuxMint ~/program/ztemp $ g++ TcpEventServer.cpp test.cpp -o test -levent
qch@LinuxMint ~/program/ztemp $ ./test
pid: 20264new connection: 22connection closed: 22^CCatch the SIGINT signal, quit in one second
done
相關文章
相關標籤/搜索