本文從總體上介紹下百度的通用通訊組件, 須要下載源碼的同窗,請點這裏 http://bgcc.baidu.com/node
第一部分:服務端邏輯數組
1.線程池,在服務啓動時建立線程,socket
2.一個線程池對應一個同步的任務隊列 ,函數
3.線程池中的每一個線程,初始時,都阻塞在任務隊列, 等待喚醒,oop
4.主線程進入事件循環,監聽事件post
5.遇到讀事件,調用添加事件時指定的回調函數, 默認爲DataCallback, 若是是鏈接請求,調用AcceptDataCallbackui
6.DataCallback解析包頭和包體, 獲得processor名稱, 找到對應的processor, this
7.把processor、包體、序列化對象封裝到成一個任務,push到同步隊列中,並觸發信號量 sem_postspa
8.線程池中的線程醒來, 調用processor處理task任務, 從包體中讀出指望執行的函數名, 執行該函數(業務實現)線程
9.業務在該函數中實現功能後, 還需調用序列化類的writeMessageBegin, writeMessageEnd, 進而調用socket類中的write, 並最終調用send返回處理結果給客戶端。
這裏沒有使用EpollOut事件, 當發送緩衝區滿時, 會返回-1,errno=EAGAIN, 此時bgcc直接返回失敗了, 沒有繼續處理。 這裏若是了註冊epoll_out事件,就能夠處理了。
10.從新註冊事件
看一個服務端使用的例子
1 Server* server; 2 3 void* server_func(const bool* isstopped, void*) { 4 SharedPointer<IProcessor> xp( 5 new MathProcessor( 6 SharedPointer<Math>( 7 new MathImpl))); 8 9 ServiceManager sm; 10 sm.add_service(xp); 11 12 ThreadPool tp; 13 tp.init(10); 14 15 server = new Server(&sm, &tp, 8321); 16 if (0 != server->serve()) { 17 return 0; 18 } 19 return NULL; 20 } 21 22 int main(int argc, char* argv[]) { 23 log_open("server.conf"); 24 Thread t(server_func); 25 t.start(); 26 27 return 0; 28 }
例子中, Server是bgcc::EpollServer類。定義以下
1 #ifndef _BGCC2_EPOLL_SERVER_H_ 2 #define _BGCC2_EPOLL_SERVER_H_ 3 4 #ifndef _WIN32 5 6 #include "bgcc_common.h" 7 #include "server.h" 8 #include "event_poll.h" 9 #include "mempool.h" 10 #include "service_manager.h" 11 #include "thread_pool.h" 12 #include "server_task.h" 13 14 namespace bgcc { 15 16 class EpollServer : public IServer { 17 public: 18 EpollServer(ServiceManager* service_manager, 19 ThreadPool* thread_pool, 20 uint16_t port, 21 const std::string& node = ""); 22 23 virtual ~EpollServer() { 24 } 25 26 virtual int32_t init(); 27 virtual int32_t serve(); 28 virtual int32_t stop(); 29 30 ServiceManager* get_service_manager(); 31 ThreadPool* get_thread_pool(); 32 33 TaskAsso Tasks[MAXNFD]; 34 35 protected: 36 enum state_t { 37 S_UNINIT, 38 S_INIT, 39 S_SERVE, 40 S_STOPPED 41 }; 42 int32_t socket_init(); 43 44 ServiceManager* _service_manager; 45 ThreadPool* _thread_pool; 46 uint16_t _port; 47 state_t _state; 48 int32_t _listenfd; 49 EventLoop _loop; 50 std::string _node; 51 }; 52 53 typedef EpollServer Server; 54 } 55 56 #endif // _WIN32 57 58 #endif // _BGCC2_EPOLL_SERVER_H_
咱們主要關心EpollServer::serve(), 其實現以下
1 int32_t EpollServer::serve() { 2 3 int32_t ret; 4 if (0 != (ret = init())) { 5 return ret; 6 } 7 8 if (S_INIT != _state) { 9 BGCC_NOTICE("bgcc", "Need to call `init' before `serve' on Instance of EpollServer\n"); 10 return E_BGCC_SERVER_NEED_INIT; 11 } 12 13 _listenfd = socket_init(); 14 if (INVALID_SOCKET == _listenfd) { 15 return E_BGCC_SERVER_CREATE_LISTENFD_FAILED; 16 } 17 18 Event e; 19 EventCallback::PrepareEvent(e, _listenfd, const_cast<EpollServer*>(this)); 20 e.read_cb = EventCallback::AcceptCallback; 21 _loop.add_event(&e); 22 23 BGCC_NOTICE("bgcc", "fd=%d Is Begin To Wait for accept new client On %s:%d", 24 _listenfd, (_node.empty()?"*":_node.c_str()), _port); 25 _state = S_SERVE; 26 27 return _loop.loop(); 28 }
其中_loop類型是bgcc::EventLoop, 其定義以下
1 /** 2 * @brief 事件循環 3 * @see 4 * @note 5 * @author liuxupeng(liuxupeng@baidu.com) 6 * @date 2012年06月14日 20時05分36秒 7 */ 8 class EventLoop { 9 public: 10 /** 11 * @brief EventLoop 構造函數 12 * @see 13 * @note 14 * @author liuxupeng(liuxupeng@baidu.com) 15 * @date 2012年06月14日 20時19分50秒 16 */ 17 EventLoop(); 18 19 /** 20 * @brief create 建立內部epoll 21 * 22 * @return 成功返回0 23 * @see 24 * @note 25 * @author liuxupeng(liuxupeng@baidu.com) 26 * @date 2012年06月14日 20時19分59秒 27 */ 28 int32_t create(); 29 int32_t destroy(); 30 31 int32_t add_event(Event* event); 32 int32_t del_event(Event* event); 33 34 int32_t loop(); 35 int32_t unloop(); 36 bool is_stopped() const; 37 private: 38 enum state_t { 39 S_UNINIT, 40 S_INIT, 41 S_LOOP, 42 S_STOP, 43 S_DESTROYED 44 }; 45 private: 46 state_t _state; 47 volatile bool _stopped; 48 int32_t _epfd; 49 struct epoll_event _ep_events[MAXNFD]; 50 Event _events[MAXNFD]; 51 }; 52 }
其中loop()實現以下
1 int32_t EventLoop::loop() { 2 if (S_INIT != _state) { 3 return -1; 4 } 5 _state = S_LOOP; 6 _stopped = false; 7 8 while (!_stopped) { 9 int32_t numevents; 10 while((numevents=epoll_wait(_epfd, _ep_events, MAXNFD, 200))==SOCKET_ERROR&&EINTR==errno); 11 12 if (numevents > 0) { 13 int j; 14 15 for (j = 0; j < numevents; j++) { 16 struct epoll_event* e = _ep_events + j; 17 int32_t fd = e->data.fd; 18 19 if (e->events & EPOLLIN) { 20 if (_events[fd].read_cb) 21 (_events[fd].read_cb)(this, fd, _events[fd].read_cb_arg); 22 } 23 if (e->events & EPOLLOUT) { 24 if (_events[fd].write_cb) 25 (_events[fd].write_cb)(this, fd, _events[fd].write_cb_arg); 26 } 27 if (e->events & EPOLLERR) { 28 if (_events[fd].error_cb) 29 (_events[fd].error_cb)(this, fd, _events[fd].error_cb_arg); 30 } 31 } 32 } 33 } 34 _state = S_STOP; 35 return 0;
從實現上看, 事件循環使用了epoll_wait,EventLoop類中有一個事件數組成員_events[], 全部fd事件都保存在該數組中。 增刪fd事件,也都須要操做該數組。
添加事件
1 int32_t EventLoop::add_event(Event* event) { 2 if (S_INIT != _state && S_LOOP != _state) { 3 return -1; 4 } 5 6 if (NULL == event) { 7 return 0; 8 } 9 10 int32_t fd = event->fd; 11 uint32_t mask = event->mask; 12 13 int32_t op; 14 if (EVENT_NONE == _events[fd].mask) { 15 op = EPOLL_CTL_ADD; 16 } 17 else { 18 op = EPOLL_CTL_MOD; 19 mask |= _events[fd].mask; 20 } 21 _events[fd].mask = mask; 22 23 struct epoll_event ee; 24 25 // To fix valgrind error: Syscall param epoll_ctl(event) points to uninitialised byte(s) 26 memset(&ee.data, 0, sizeof(ee.data)); 27 28 ee.data.fd = fd; 29 ee.events = 0; 30 31 if (mask & EVENT_READ) { 32 ee.events |= EPOLLIN; 33 _events[fd].read_cb = event->read_cb; 34 _events[fd].read_cb_arg = event->read_cb_arg; 35 } 36 37 if (mask & EVENT_WRITE) { 38 ee.events |= EPOLLOUT; 39 _events[fd].write_cb = event->write_cb; 40 _events[fd].write_cb_arg = event->write_cb_arg; 41 } 42 43 if (mask & EVENT_ERROR) { 44 ee.events |= EPOLLERR; 45 _events[fd].error_cb = event->error_cb; 46 _events[fd].error_cb_arg = event->error_cb_arg; 47 } 48 49 if(SocketTool::set_nonblock(fd, 1)!=0){ 50 BGCC_WARN("bgcc", "Before Add fd=%d to Epoll Set To Nonblock Failed(%d)", 51 fd, BgccGetLastError()); 52 return -1; 53 } 54 55 int32_t ret=epoll_ctl(_epfd, op, fd, &ee); 56 if(0!=ret&&EPOLL_CTL_ADD==op){ 57 if(SocketTool::set_nonblock(fd, 0)!=0){ 58 BGCC_WARN("bgcc", "Add fd=%d to Epoll Failed Set To Block Failed(%d)", 59 fd, BgccGetLastError()); 60 } 61 } 62 63 return ret; 64 }
刪除事件
1 int32_t EventLoop::del_event(Event* event) { 2 if (S_INIT != _state && S_LOOP != _state) { 3 return -1; 4 } 5 6 if (NULL == event) { 7 return 0; 8 } 9 10 int32_t fd = event->fd; 11 uint32_t mask = _events[fd].mask & (~event->mask); 12 _events[fd].mask = mask; 13 14 struct epoll_event ee; 15 ee.data.fd = fd; 16 ee.events = 0; 17 if (mask & EVENT_READ) ee.events |= EPOLLIN; 18 if (mask & EVENT_WRITE) ee.events |= EPOLLOUT; 19 20 int32_t op; 21 if (mask != EVENT_NONE) { 22 op = EPOLL_CTL_MOD; 23 } else { 24 op = EPOLL_CTL_DEL; 25 // _events[fd].Reset(); 26 } 27 28 int32_t ret=epoll_ctl(_epfd, op, fd, &ee); 29 if(EPOLL_CTL_DEL==op&&0==ret){ 30 if(SocketTool::set_nonblock(fd, 0)!=0){ 31 BGCC_WARN("bgcc", "Del fd=%d From Epoll Set To Block Failed(%d)", 32 fd, BgccGetLastError()); 33 } 34 } 35 return ret; 36 }
添加和刪除事件,最終都是對epoll接口的封裝,
接着看下_events[] 中的元素類型 Event, 其定義以下
1 /** 2 * @brief 封裝事件及事件處理函數 3 * @see 4 * @note 5 * @author liuxupeng(liuxupeng@baidu.com) 6 * @date 2012年06月14日 20時00分59秒 7 */ 8 struct Event { 9 /** 10 * @brief Event 事件類 11 * @see 12 * @note 13 * @author liuxupeng(liuxupeng@baidu.com) 14 * @date 2012年06月14日 20時04分59秒 15 */ 16 Event() { 17 Reset(); 18 } 19 20 void Reset(){ 21 fd=INVALID_SOCKET; 22 mask=EVENT_NONE; 23 read_cb=NULL; 24 write_cb=NULL; 25 error_cb=NULL; 26 read_cb_arg=NULL; 27 write_cb_arg=NULL; 28 error_cb_arg=NULL; 29 } 30 31 int32_t fd; /** 事件對應的fd*/ 32 uint32_t mask; /** 事件標識位*/ 33 callback_func_t read_cb; /** 讀回調*/ 34 callback_func_t write_cb; /** 寫回調*/ 35 callback_func_t error_cb; /** 錯誤回調*/ 36 void* read_cb_arg; 37 void* write_cb_arg; 38 void* error_cb_arg; 39 };
Event對象包含了事件fd、事件類型、以及回調函數、回調函數的參數, 這裏有read_cb, write_cb, error_cb
回到EpollServer::serve(), 它調用了EpollServer::init()