BGCC源代碼(一)

本文從總體上介紹下百度的通用通訊組件, 須要下載源碼的同窗,請點這裏 http://bgcc.baidu.com/node

第一部分:服務端邏輯數組

1.線程池,在服務啓動時建立線程,socket

2.一個線程池對應一個同步的任務隊列 ,函數

3.線程池中的每一個線程,初始時,都阻塞在任務隊列, 等待喚醒,oop

4.主線程進入事件循環,監聽事件post

5.遇到讀事件,調用添加事件時指定的回調函數, 默認爲DataCallback, 若是是鏈接請求,調用AcceptDataCallbackui

6.DataCallback解析包頭和包體, 獲得processor名稱,  找到對應的processor, this

7.把processor、包體、序列化對象封裝到成一個任務,push到同步隊列中,並觸發信號量 sem_postspa

8.線程池中的線程醒來, 調用processor處理task任務, 從包體中讀出指望執行的函數名, 執行該函數(業務實現)線程

9.業務在該函數中實現功能後, 還需調用序列化類的writeMessageBegin, writeMessageEnd, 進而調用socket類中的write, 並最終調用send返回處理結果給客戶端。

  這裏沒有使用EpollOut事件, 當發送緩衝區滿時, 會返回-1,errno=EAGAIN,  此時bgcc直接返回失敗了, 沒有繼續處理。  這裏若是了註冊epoll_out事件,就能夠處理了。

10.從新註冊事件

 

看一個服務端使用的例子

 1 Server* server;
 2 
 3 void* server_func(const bool* isstopped, void*) {
 4     SharedPointer<IProcessor> xp(
 5             new MathProcessor(
 6                 SharedPointer<Math>(
 7                     new MathImpl)));
 8 
 9     ServiceManager sm;
10     sm.add_service(xp);
11 
12     ThreadPool tp;
13     tp.init(10);
14 
15     server = new Server(&sm, &tp, 8321);
16     if (0 != server->serve()) {
17         return 0;
18     }
19     return NULL;
20 }
21 
22 int main(int argc, char* argv[]) {
23     log_open("server.conf");
24     Thread t(server_func);
25     t.start();
26 
27     return 0;
28 }

例子中, Server是bgcc::EpollServer類。定義以下

 1 #ifndef _BGCC2_EPOLL_SERVER_H_
 2 #define _BGCC2_EPOLL_SERVER_H_
 3 
 4 #ifndef _WIN32
 5 
 6 #include "bgcc_common.h"
 7 #include "server.h"
 8 #include "event_poll.h"
 9 #include "mempool.h"
10 #include "service_manager.h"
11 #include "thread_pool.h"
12 #include "server_task.h"
13 
14 namespace bgcc {
15 
16     class EpollServer : public IServer {
17     public:
18         EpollServer(ServiceManager* service_manager,
19                 ThreadPool* thread_pool,
20                 uint16_t port,
21                 const std::string& node = "");
22 
23         virtual ~EpollServer() {
24         }
25 
26         virtual int32_t init();
27         virtual int32_t serve();
28         virtual int32_t stop();
29 
30         ServiceManager* get_service_manager();
31         ThreadPool* get_thread_pool();
32 
33         TaskAsso Tasks[MAXNFD];
34 
35     protected:
36         enum state_t {
37             S_UNINIT,
38             S_INIT,
39             S_SERVE,
40             S_STOPPED
41         };
42         int32_t socket_init();
43 
44         ServiceManager* _service_manager;
45         ThreadPool* _thread_pool;
46         uint16_t _port;
47         state_t _state;
48         int32_t _listenfd;
49         EventLoop _loop;
50         std::string _node;
51     };
52 
53     typedef EpollServer Server;
54 }
55 
56 #endif // _WIN32
57 
58 #endif // _BGCC2_EPOLL_SERVER_H_

咱們主要關心EpollServer::serve(), 其實現以下

 1     int32_t EpollServer::serve() {
 2 
 3         int32_t ret;
 4         if (0 != (ret = init())) {
 5             return ret;
 6         }
 7 
 8         if (S_INIT != _state) {
 9             BGCC_NOTICE("bgcc", "Need to call `init' before `serve' on Instance of EpollServer\n");
10             return E_BGCC_SERVER_NEED_INIT;
11         }
12 
13         _listenfd = socket_init();
14         if (INVALID_SOCKET == _listenfd) {
15             return E_BGCC_SERVER_CREATE_LISTENFD_FAILED;
16         }
17 
18         Event e;
19         EventCallback::PrepareEvent(e, _listenfd, const_cast<EpollServer*>(this));
20         e.read_cb = EventCallback::AcceptCallback;
21         _loop.add_event(&e);
22 
23         BGCC_NOTICE("bgcc", "fd=%d Is Begin To Wait for accept new client On %s:%d", 
24                 _listenfd, (_node.empty()?"*":_node.c_str()), _port);
25         _state = S_SERVE;
26         
27         return _loop.loop();
28     }

其中_loop類型是bgcc::EventLoop, 其定義以下

 1     /**
 2      * @brief 事件循環
 3      * @see
 4      * @note
 5      * @author  liuxupeng(liuxupeng@baidu.com)
 6      * @date    2012年06月14日 20時05分36秒
 7      */
 8     class EventLoop {
 9     public:
10         /**
11          * @brief EventLoop 構造函數
12          * @see
13          * @note
14          * @author  liuxupeng(liuxupeng@baidu.com)
15          * @date    2012年06月14日 20時19分50秒
16          */
17         EventLoop();
18 
19         /**
20          * @brief create 建立內部epoll
21          *
22          * @return 成功返回0
23          * @see
24          * @note
25          * @author  liuxupeng(liuxupeng@baidu.com)
26          * @date    2012年06月14日 20時19分59秒
27          */
28         int32_t create();
29         int32_t destroy();
30 
31         int32_t add_event(Event* event);
32         int32_t del_event(Event* event);
33 
34         int32_t loop();
35         int32_t unloop();
36         bool is_stopped() const;
37     private:
38         enum state_t {
39             S_UNINIT,
40             S_INIT,
41             S_LOOP,
42             S_STOP,
43             S_DESTROYED
44         };
45     private:
46         state_t _state;
47         volatile bool _stopped;
48         int32_t _epfd;
49         struct epoll_event _ep_events[MAXNFD];
50         Event _events[MAXNFD];
51     };
52 }

其中loop()實現以下  

 1     int32_t EventLoop::loop() {
 2         if (S_INIT != _state) {
 3             return -1;
 4         }
 5         _state = S_LOOP;
 6         _stopped = false;
 7 
 8         while (!_stopped) {
 9             int32_t numevents;
10             while((numevents=epoll_wait(_epfd, _ep_events, MAXNFD, 200))==SOCKET_ERROR&&EINTR==errno);
11 
12             if (numevents > 0) {
13                 int j;
14 
15                 for (j = 0; j < numevents; j++) {
16                     struct epoll_event* e = _ep_events + j;
17                     int32_t fd = e->data.fd;
18 
19                     if (e->events & EPOLLIN) {
20                         if (_events[fd].read_cb)
21                             (_events[fd].read_cb)(this, fd, _events[fd].read_cb_arg);
22                     }
23                     if (e->events & EPOLLOUT) {
24                         if (_events[fd].write_cb)
25                             (_events[fd].write_cb)(this, fd, _events[fd].write_cb_arg);
26                     }
27                     if (e->events & EPOLLERR) {
28                         if (_events[fd].error_cb)
29                             (_events[fd].error_cb)(this, fd, _events[fd].error_cb_arg);
30                     }
31                 }
32             }
33         }
34         _state = S_STOP;
35         return 0;  

 從實現上看,  事件循環使用了epoll_wait,EventLoop類中有一個事件數組成員_events[], 全部fd事件都保存在該數組中。 增刪fd事件,也都須要操做該數組。

添加事件

 1     int32_t EventLoop::add_event(Event* event) {
 2         if (S_INIT != _state && S_LOOP != _state) {
 3             return -1;
 4         }
 5 
 6         if (NULL == event) {
 7             return 0;
 8         }
 9 
10         int32_t fd = event->fd;
11         uint32_t mask = event->mask;
12 
13         int32_t op;
14         if (EVENT_NONE == _events[fd].mask) {
15             op = EPOLL_CTL_ADD;
16         }
17         else {
18             op = EPOLL_CTL_MOD;
19             mask |= _events[fd].mask;
20         }
21         _events[fd].mask = mask;
22 
23         struct epoll_event ee;
24 
25         // To fix valgrind error: Syscall param epoll_ctl(event) points to uninitialised byte(s)
26         memset(&ee.data, 0, sizeof(ee.data));
27 
28         ee.data.fd = fd;
29         ee.events = 0;
30 
31         if (mask & EVENT_READ) {
32             ee.events |= EPOLLIN;
33             _events[fd].read_cb = event->read_cb;
34             _events[fd].read_cb_arg = event->read_cb_arg;
35         }
36 
37         if (mask & EVENT_WRITE) {
38             ee.events |= EPOLLOUT;
39             _events[fd].write_cb = event->write_cb;
40             _events[fd].write_cb_arg = event->write_cb_arg;
41         }
42 
43         if (mask & EVENT_ERROR) {
44             ee.events |= EPOLLERR;
45             _events[fd].error_cb = event->error_cb;
46             _events[fd].error_cb_arg = event->error_cb_arg;
47         }
48 
49         if(SocketTool::set_nonblock(fd, 1)!=0){
50             BGCC_WARN("bgcc", "Before Add fd=%d to Epoll Set To Nonblock Failed(%d)",
51                     fd, BgccGetLastError());
52             return -1;
53         }
54 
55         int32_t ret=epoll_ctl(_epfd, op, fd, &ee);
56         if(0!=ret&&EPOLL_CTL_ADD==op){
57             if(SocketTool::set_nonblock(fd, 0)!=0){
58                 BGCC_WARN("bgcc", "Add fd=%d to Epoll Failed Set To Block Failed(%d)",
59                     fd, BgccGetLastError());
60             }
61         }
62 
63         return ret;
64     }

刪除事件

 1    int32_t EventLoop::del_event(Event* event) {
 2         if (S_INIT != _state && S_LOOP != _state) {
 3             return -1;
 4         }
 5 
 6         if (NULL == event) {
 7             return 0;
 8         }
 9 
10         int32_t fd = event->fd;
11         uint32_t mask = _events[fd].mask & (~event->mask);
12         _events[fd].mask = mask;
13 
14         struct epoll_event ee;
15         ee.data.fd = fd;
16         ee.events = 0;
17         if (mask & EVENT_READ) ee.events |= EPOLLIN;
18         if (mask & EVENT_WRITE) ee.events |= EPOLLOUT;
19 
20         int32_t op;
21         if (mask != EVENT_NONE) {
22             op = EPOLL_CTL_MOD;
23         } else {
24             op = EPOLL_CTL_DEL;
25 //            _events[fd].Reset();
26         }
27 
28         int32_t ret=epoll_ctl(_epfd, op, fd, &ee);
29         if(EPOLL_CTL_DEL==op&&0==ret){
30             if(SocketTool::set_nonblock(fd, 0)!=0){
31                 BGCC_WARN("bgcc", "Del fd=%d From Epoll Set To Block Failed(%d)",
32                         fd, BgccGetLastError());
33             }
34         }
35         return ret;
36     }

添加和刪除事件,最終都是對epoll接口的封裝,

接着看下_events[] 中的元素類型 Event, 其定義以下

 1    /**
 2      * @brief 封裝事件及事件處理函數
 3      * @see
 4      * @note
 5      * @author  liuxupeng(liuxupeng@baidu.com)
 6      * @date    2012年06月14日 20時00分59秒
 7      */
 8     struct Event {
 9         /**
10          * @brief Event 事件類
11          * @see
12          * @note
13          * @author  liuxupeng(liuxupeng@baidu.com)
14          * @date    2012年06月14日 20時04分59秒
15          */
16         Event() {
17             Reset();
18             }
19 
20         void Reset(){
21             fd=INVALID_SOCKET;
22             mask=EVENT_NONE;
23             read_cb=NULL;
24             write_cb=NULL;
25             error_cb=NULL;    
26             read_cb_arg=NULL;
27             write_cb_arg=NULL;
28             error_cb_arg=NULL;    
29         }
30 
31         int32_t fd; /** 事件對應的fd*/
32         uint32_t mask;  /** 事件標識位*/
33         callback_func_t read_cb;    /** 讀回調*/
34         callback_func_t write_cb;   /** 寫回調*/
35         callback_func_t error_cb;   /** 錯誤回調*/
36         void* read_cb_arg;
37         void* write_cb_arg;
38         void* error_cb_arg;
39     };

Event對象包含了事件fd、事件類型、以及回調函數、回調函數的參數, 這裏有read_cb, write_cb, error_cb

回到EpollServer::serve(), 它調用了EpollServer::init()

相關文章
相關標籤/搜索