Based on kbengine 0.4.20

Preface:

v0.0.1 2015-04-10 譽小痕 (shawhen2012@hotmail.com)

v0.0.2 2015-04-12 譽小痕 (shawhen2012@hotmail.com)

For the changelog, see: http://bbs.kbengine.org/forum.php?mod=viewthread&tid=247&extra=page%3D1

New content is marked with **New in version x.x.x**

(An activity diagram of the kbengine main flow accompanies this document. I suggest looking at that diagram first, then following the flow in it while reading this document to understand the implementation details. A download link for the activity diagram can be found in the changelog thread.)

An MMOG server is a high-quality engineering project, and reading through the open-source kbe is a pleasure. In this document I will take you on a tour of it. Limited by my own knowledge and experience, the text will inevitably contain mistakes; please keep a sharp eye out, and if you find an error, you can email me at shawhen2012@hotmail.com. (Because of my personal laziness, or time pressure, the layout of this document is a bit messy...)

I will skip the rest of the usual grandiose preamble.

In theory, when reading a codebase we should first use the existing documentation to grasp the overall architecture of the project, and only then dissect it piece by piece. But at the time of writing I have not found such a document, so I have arranged the content of this document in the order in which I read the code myself.

● Overview:

From the existing documentation (I have to assume you have already roughly read through the docs on the kbe official site), we know that kbe is made up of several components working together, so let's look at the components first:

Each component is designed as an independent app, and they cooperate via network communication. A C++ program naturally starts from the main function.

1. The tricks of the main function:

It looks like every component uses a macro like this (KBENGINE_MAIN) to wrap its main function:

int KBENGINE_MAIN(int argc, char* argv[])
{
    ENGINE_COMPONENT_INFO& info = g_kbeSrvConfig.getXXX();
    return kbeMainT<XXX>(argc, argv, YYY, info.externalPorts_min,
        info.externalPorts_max, info.externalInterface, 0, info.internalInterface);
}

The macro expands to this:

kbeMain(int argc, char* argv[]); \
int main(int argc, char* argv[]) \
{ \
    loadConfig(); \
    g_componentID = genUUID64(); \
    parseMainCommandArgs(argc, argv); \
    char dumpname[MAX_BUF] = {0}; \
    kbe_snprintf(dumpname, MAX_BUF, "%"PRAppID, g_componentID); \
    KBEngine::exception::installCrashHandler(1, dumpname); \
    int retcode = -1; \
    THREAD_TRY_EXECUTION; \
    retcode = kbeMain(argc, argv); \
    THREAD_HANDLE_CRASH; \
    return retcode; \
} \

After a bit of tidying up, the main function looks roughly like this:

int kbeMain(int argc, char* argv[]);

int main(int argc, char* argv[])
{
    loadConfig();
    g_componentID = genUUID64();
    parseMainCommandArgs(argc, argv);

    char dumpname[MAX_BUF] = {0};
    kbe_snprintf(dumpname, MAX_BUF, "%"PRAppID, g_componentID);
    KBEngine::exception::installCrashHandler(1, dumpname);

    int retcode = -1;
    THREAD_TRY_EXECUTION;
    retcode = kbeMain(argc, argv);
    THREAD_HANDLE_CRASH;
    return (retcode);
}

int kbeMain(int argc, char* argv[])
{
    ENGINE_COMPONENT_INFO& info = g_kbeSrvConfig.getXXX();
    return kbeMainT<XXX>(argc, argv, YYY, info.externalPorts_min, info.externalPorts_max, info.externalInterface, 0, info.internalInterface);
}

Hmm... essentially, every component's main function follows the same flow; only the arguments passed when specializing kbeMainT differ.

a) Loading the configuration files:

Let's follow loadConfig from the main function and have a look (kbemain.h):

inline void loadConfig()
{
    Resmgr::getSingleton().initialize();

    // "../../res/server/kbengine_defs.xml"
    g_kbeSrvConfig.loadConfig("server/kbengine_defs.xml");

    // "../../../assets/res/server/kbengine.xml"
    g_kbeSrvConfig.loadConfig("server/kbengine.xml");
}

In serverconfig.h we can see this code:

#define g_kbeSrvConfig ServerConfig::getSingleton()

The singleton implementations of Resmgr and ServerConfig:

Both Resmgr and ServerConfig are made into singletons (kbe's singleton is not a very strict one: it guarantees neither thread safety nor compile-time prevention of extra instances, but we won't dwell on that for now).
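
To make the "who creates whom" discussion below concrete, kbe-style singletons follow roughly this CRTP pattern (a simplified sketch from memory; the real template lives in common/singleton.h and may differ in detail): the instance is whatever object someone news or defines globally, and the base-class constructor records it.

// Simplified sketch of a kbe-style CRTP singleton (illustrative, not the exact source).
#include <cassert>

template <typename T>
class Singleton
{
public:
    Singleton()  { assert(singleton_ == 0); singleton_ = static_cast<T*>(this); }
    ~Singleton() { singleton_ = 0; }

    static T& getSingleton()    { assert(singleton_); return *singleton_; }
    static T* getSingletonPtr() { return singleton_; }

protected:
    static T* singleton_;
};

// Each concrete singleton still needs its static member defined somewhere;
// that is what macros like KBE_SINGLETON_INIT(Loginapp), seen below, take care of.
template <typename T> T* Singleton<T>::singleton_ = 0;

This also explains the pattern we are about to trace: somebody has to new a Resmgr (or define a ServerConfig global) before getSingleton() can be used.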

Resmgr is the resource manager, declared and defined in resmgr.h/.cpp; it is new'ed inside the constructor of the FixedMessages class (fixed_messages.h/.cpp). (This is a bit obscure; I only traced it by debugging...)

The process goes like this. In each component's xxx_interface.cpp there is code like this (taken from loginapp):

#include "loginapp_interface.h"
#define DEFINE_IN_INTERFACE
#define LOGINAPP
#include "loginapp_interface.h"

And in xxx_interface.h there is code like this:

#if defined(DEFINE_IN_INTERFACE)
#undef KBE_LOGINAPP_INTERFACE_H
#endif

#ifndef KBE_LOGINAPP_INTERFACE_H
#define KBE_LOGINAPP_INTERFACE_H

The gist is that in xxx_interface.cpp, by defining the DEFINE_IN_INTERFACE and LOGINAPP macros between the two inclusions of xxx_interface.h, the header gets included twice (but the code generated each time is different), so that some variables inside xxx_interface.h end up being declared (first inclusion) and then defined (second inclusion).
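
A stripped-down, hypothetical illustration of the same declare-then-define double-include trick (the names foo_interface and g_fooCounter are made up; kbe's real macros are shown below):

// foo_interface.h -- deliberately includable twice from the same .cpp
#if defined(DEFINE_IN_INTERFACE)
#undef FOO_INTERFACE_H                 // let the header be processed a second time
#endif

#ifndef FOO_INTERFACE_H
#define FOO_INTERFACE_H

#ifndef DEFINE_IN_INTERFACE
extern int g_fooCounter;               // pass 1: declaration only
#else
int g_fooCounter = 0;                  // pass 2: the single definition
#endif

#endif // FOO_INTERFACE_H

// foo_interface.cpp
#include "foo_interface.h"             // pass 1: declarations
#define DEFINE_IN_INTERFACE
#include "foo_interface.h"             // pass 2: definitions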

In xxx_interface.h there is this line:

NETWORK_INTERFACE_DECLARE_BEGIN(LoginappInterface)

Expanded, it becomes:

// define the interface domain name
#ifndef DEFINE_IN_INTERFACE
#define NETWORK_INTERFACE_DECLARE_BEGIN(INAME) \
    namespace INAME \
    { \
        extern Network::MessageHandlers messageHandlers; \

#else
#define NETWORK_INTERFACE_DECLARE_BEGIN(INAME) \
    namespace INAME \
    { \
        Network::MessageHandlers messageHandlers; \

#endif

#define NETWORK_INTERFACE_DECLARE_END() }

So the first inclusion of xxx_interface.h produces an external global variable declaration, extern Network::MessageHandlers messageHandlers, and the second inclusion produces the definition of that global variable, Network::MessageHandlers messageHandlers.

In the constructor of Network::MessageHandlers (message_handler.cpp) we have:

MessageHandlers::MessageHandlers():
msgHandlers_(),
msgID_(1),
exposedMessages_()
{
    g_fm = Network::FixedMessages::getSingletonPtr();
    if(g_fm == NULL)
        g_fm = new Network::FixedMessages;

    Network::FixedMessages::getSingleton().loadConfig("server/messages_fixed.xml");
    messageHandlers().push_back(this);
}

This is where the Network::FixedMessages class gets its chance to be instantiated, and its constructor contains:

FixedMessages::FixedMessages():
_infomap(),
_loaded(false)
{
    new Resmgr();
    Resmgr::getSingleton().initialize();
}

ServerConfig is instantiated in each component's class definition file (for loginapp, for example, in loginapp.cpp):

ServerConfig g_serverConfig;
KBE_SINGLETON_INIT(Loginapp);

Back to loadConfig: a quick step into the functions it calls is enough to understand them, so let's keep following the main flow.

b) Generating a random component id:

The following statement generates a random id for the component:

g_componentID = genUUID64();

c) Parsing the main function arguments:

The following statement parses the arguments of main (for example, to set a specific component id, and gus; I haven't figured out yet what gus is for... it doesn't affect reading the overall flow, so we won't dig into it):

parseMainCommandArgs(argc, argv);
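
Just to illustrate the kind of thing parseMainCommandArgs does (this is a made-up sketch, not kbe's code; the real option names and handling live in kbemain.h), such a parser boils down to scanning argv for overrides:

// Hypothetical sketch of this style of command-line parsing; not the real parseMainCommandArgs.
#include <cstring>
#include <cstdlib>
#include <cstdint>

void parseMainCommandArgsSketch(int argc, char* argv[], uint64_t& componentID)
{
    for (int i = 1; i < argc; ++i)
    {
        // e.g. "--cid=12345" would override the randomly generated component id
        if (std::strncmp(argv[i], "--cid=", 6) == 0)
            componentID = std::strtoull(argv[i] + 6, NULL, 10);

        // other options (such as the gus value mentioned above) would be handled similarly
    }
}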

d) Crash handling:

The following statements set up crash handling (it doesn't affect reading the overall flow, so we won't dig into it):

char dumpname[MAX_BUF] = {0}; \
kbe_snprintf(dumpname, MAX_BUF, "%"PRAppID, g_componentID); \
KBEngine::exception::installCrashHandler(1, dumpname);

e) Handing off to kbeMain:

The following statements are a standard hand-off from main:

int retcode = -1; \
THREAD_TRY_EXECUTION; \
retcode = kbeMain(argc, argv); \
THREAD_HANDLE_CRASH; \
return retcode;

In kbemain.h you can see that KBENGINE_MAIN is defined differently for different platforms... essentially, non-win32 platforms just don't have the crash handling.

2. kbeMain:

kbeMain is defined in each component's main.cpp (taken from loginapp):

int kbeMain(int argc, char* argv[])
{
    ENGINE_COMPONENT_INFO& info = g_kbeSrvConfig.getLoginApp();
    return kbeMainT<Loginapp>(argc, argv, LOGINAPP_TYPE, info.externalPorts_min,
        info.externalPorts_max, info.externalInterface, 0, info.internalInterface);
}

a) Handing off to kbeMainT:

The first line fetches the component-specific information, and then the flow is handed off again, this time to the kbeMainT function.

3. The tricks of kbeMainT:

kbeMainT is a function template: it generates different code depending on each component's main class (...am I overexplaining?).

We'll skip its first few statements for now (we will need to come back to them later); they set environment variables, set up the key pair, and so on. You can see that kbeMainT's flow ends by calling the run interface of the component instance.
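
Pieced together from the calls we will meet below, the skeleton of kbeMainT looks roughly like this (a simplified reconstruction, not the literal source; the real parameter list and setup steps in kbemain.h are longer):

// Rough shape of kbeMainT (illustrative reconstruction).
template <class SERVER_APP>
int kbeMainT(int argc, char* argv[], COMPONENT_TYPE componentType,
             int32 extlisteningPort_min, int32 extlisteningPort_max,
             const char* extlisteningInterface,
             int32 intlisteningPort, const char* intlisteningInterface)
{
    // ... environment variables, key pair, logging setup, etc. (skipped for now) ...

    Network::EventDispatcher dispatcher;                      // the event dispatcher
    Network::NetworkInterface networkInterface(&dispatcher,   // listening sockets are created in here
        extlisteningPort_min, extlisteningPort_max, extlisteningInterface,
        0, 0, intlisteningPort, intlisteningInterface, 0, 0);

    SERVER_APP app(dispatcher, networkInterface, componentType, g_componentID);

    if(!app.initialize())                                     // per-component initialize()
        return -1;

    bool ret = app.run();                                     // everything ends in run()
    app.finalise();
    return ret ? 0 : -1;
}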

a) The component instance's run interface:

Glancing at a few components' run functions: some call ServerApp's run interface, some call the dispatcher's processXXX interfaces directly, but in general they all rely on the event dispatcher to do the work. So we need to take a look at kbe's event dispatcher.

● The loginapp component's run interface:

bool Loginapp::run()
{
    return ServerApp::run();
}

● The machine component's run interface:

bool Machine::run()
{
    bool ret = true;

    while(!this->dispatcher().hasBreakProcessing())
    {
        threadPool_.onMainThreadTick();
        this->dispatcher().processOnce(false);
        networkInterface().processChannels(&MachineInterface::messageHandlers);
        KBEngine::sleep(100);
    };

    return ret;
}

● The baseapp component's run interface:

bool Baseapp::run()
{
    return EntityApp<Base>::run();
}

In fact EntityApp also inherits from ServerApp.
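
For orientation, ServerApp::run itself (going from memory here, so treat it as a sketch rather than the literal source) essentially just spins the dispatcher until someone asks it to stop:

// Sketch of the common run() path shared by the ServerApp-derived components.
bool ServerApp::run()
{
    // Keeps calling EventDispatcher::processOnce() until a break is requested,
    // which is also why Machine::run() above can hand-roll the same loop itself.
    dispatcher_.processUntilBreak();
    return true;
}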

b) The event dispatcher:

In kbeMainT we can see:

Network::EventDispatcher dispatcher;

This is where the event dispatcher is constructed; we need to see what its so-called process actually does.

i. What the event dispatcher does:

In event_dispatcher.cpp we can see:

int EventDispatcher::processOnce(bool shouldIdle)
{
    if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING)
        breakProcessing_ = EVENT_DISPATCHER_STATUS_RUNNING;

    this->processTasks();

    if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING){
        this->processTimers();
    }

    this->processStats();

    if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING){
        return this->processNetwork(shouldIdle);
    }

    return 0;
}

Here we need to be clear on some basics. From the select, poll, epoll, iocp and kqueue APIs to libraries such as boost asio, ACE, libevent and libev, these network I/O multiplexing models all exist so that we can listen for events on the NIC and then hand them over to the appropriate handler. Besides the differences in performance and coding style that follow from how they work, they also differ in when the callback happens: iocp calls back after the requested operation on a resource has completed, while the other unix-family interfaces (kqueue being BSD's) call back when the resource is ready for the requested operation.
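
As a reminder of what the readiness model looks like in raw form (plain POSIX epoll, not kbe code), a minimal loop might be:

// Minimal readiness-style loop with epoll (Linux only); error handling trimmed.
#include <sys/epoll.h>

void pollLoop(int epfd)
{
    struct epoll_event events[16];
    for (;;)
    {
        // Block until some registered fd is *ready* (readiness model).
        int n = epoll_wait(epfd, events, 16, -1);
        for (int i = 0; i < n; ++i)
        {
            if (events[i].events & EPOLLIN)  { /* fd is readable: call its read handler  */ }
            if (events[i].events & EPOLLOUT) { /* fd is writable: call its write handler */ }
        }
    }
}

kbe's EpollPoller, shown below, is essentially this loop plus bookkeeping.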

So to make sense of this event dispatcher we need to find two key things: how events are produced, and how they are delivered to the application for handling.

EventDispatcher's processTasks is worth a quick peek: it is an interface that processes all the "tasks", but we don't yet know what a task is, so we won't follow it further.

We'll also skip processTimers and processStats for now...

ii. The event dispatcher's network events:

Let's step into processNetwork:

int EventDispatcher::processNetwork(bool shouldIdle)
{
    double maxWait = shouldIdle ? this->calculateWait() : 0.0;
    return pPoller_->processPendingEvents(maxWait);
}

This pPoller_ is an instance of EventPoller. EventPoller is an abstract class (event_poller.h/.cpp) and currently has two derived classes, SelectPoller (poller_select.h/.cpp) and EpollPoller (poller_epoll.h/.cpp); as the names suggest, they do asynchronous I/O using the select and epoll system APIs respectively. (On win32, iocp should be able to match epoll in performance, but since iocp and epoll work differently, I suspect that's why kbe doesn't use iocp on win32; abstracting the two working models into one would probably take nearly as much work as kbe itself. If it were me, I would just use asio or libevent. Why the kbe author didn't is perhaps like the redis author's explanation, even though I find that explanation rather annoying: the benefits of using an existing library are obvious, and someone who happens to have experience with asio or libevent, as I do, could skim right over kbe's network layer and put more energy into the problems this project is actually trying to solve. But enough grumbling.)
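
Conceptually the poller abstraction boils down to something like the sketch below (condensed from memory of event_poller.h; the real interface has more members):

// Condensed view of the EventPoller abstraction (illustrative, not the full class).
class InputNotificationHandler;
class OutputNotificationHandler;

class EventPoller
{
public:
    virtual ~EventPoller() {}

    // Remember which handler is interested in readability/writability of fd.
    bool registerForRead(int fd, InputNotificationHandler* handler);
    bool registerForWrite(int fd, OutputNotificationHandler* handler);

    // Wait up to maxWait seconds and dispatch whatever became ready
    // (EpollPoller implements this with epoll_wait, SelectPoller with select).
    virtual int processPendingEvents(double maxWait) = 0;
};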

The select and epoll versions work the same way here, so we can read either one; I lean towards epoll.

int EpollPoller::processPendingEvents(double maxWait)
{
    const int MAX_EVENTS = 10;
    struct epoll_event events[ MAX_EVENTS ];
    int maxWaitInMilliseconds = int(ceil(maxWait * 1000));

#if ENABLE_WATCHERS
    g_idleProfile.start();
#else
    uint64 startTime = timestamp();
#endif

    KBEConcurrency::onStartMainThreadIdling();
    int nfds = epoll_wait(epfd_, events, MAX_EVENTS, maxWaitInMilliseconds);
    KBEConcurrency::onEndMainThreadIdling();

#if ENABLE_WATCHERS
    g_idleProfile.stop();
    spareTime_ += g_idleProfile.lastTime_;
#else
    spareTime_ += timestamp() - startTime;
#endif

    for (int i = 0; i < nfds; ++i)
    {
        if (events[i].events & (EPOLLERR|EPOLLHUP))
        {
            this->triggerError(events[i].data.fd);
        }
        else
        {
            if (events[i].events & EPOLLIN)
            {
                this->triggerRead(events[i].data.fd);
            }

            if (events[i].events & EPOLLOUT)
            {
                this->triggerWrite(events[i].data.fd);
            }
        }
    }

    return nfds;
}

The gist is: wait for events on the file descriptors registered with the poller, then, depending on the kind of event, trigger the read (triggerRead) or write (triggerWrite) interface. Let's follow these two interfaces:

iii. Network readable events:

bool EventPoller::triggerRead(int fd)
{
    FDReadHandlers::iterator iter = fdReadHandlers_.find(fd);
    if (iter == fdReadHandlers_.end())
    {
        return false;
    }

    iter->second->handleInputNotification(fd);
    return true;
}

iv. Network writable events:

//-------------------------------------------------------------------------------------
bool EventPoller::triggerWrite(int fd)
{
    FDWriteHandlers::iterator iter = fdWriteHandlers_.find(fd);
    if (iter == fdWriteHandlers_.end())
    {
        return false;
    }

    iter->second->handleOutputNotification(fd);
    return true;
}

v. Event handling:

As you can see, it simply looks up the input/output handler interface registered for the file descriptor (which also tells us what to read next: the place where file descriptors get registered). So now we know how events are produced. (Honestly, I have never been comfortable with the poll-style model; I prefer the iocp model, even though in theory the poll style gives the application layer more points of flexibility. A not-very-precise analogy: you ask poll or iocp to get three cars ready. After you ask poll, it later tells you: sir, the cars are ready, they're in the garage, but I'm not sure exactly how many there are, go and check for yourself. You give iocp three keys, and it later tells you: sir, three cars are ready and parked right outside.)
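
For reference, the handler interfaces being looked up here are essentially these (reproduced from memory, they live in network/interfaces.h; treat the exact shape as approximate):

// The tiny notification interfaces that ListenerReceiver and PacketReceiver implement.
class InputNotificationHandler
{
public:
    virtual ~InputNotificationHandler() {}
    virtual int handleInputNotification(int fd) = 0;   // called by triggerRead()
};

class OutputNotificationHandler
{
public:
    virtual ~OutputNotificationHandler() {}
    virtual int handleOutputNotification(int fd) = 0;  // called by triggerWrite()
};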

To find the flow that registers file descriptors and their event-handling interfaces, we go back to kbeMainT again. The first thing that catches the eye is this code:

Network::NetworkInterface networkInterface(&dispatcher,
    extlisteningPort_min, extlisteningPort_max, extlisteningInterface,
    channelCommon.extReadBufferSize, channelCommon.extWriteBufferSize,
    (intlisteningPort != -1) ? htons(intlisteningPort) : -1, intlisteningInterface,
    channelCommon.intReadBufferSize, channelCommon.intWriteBufferSize);

Network::NetworkInterface's constructor:

NetworkInterface::NetworkInterface(Network::EventDispatcher * pDispatcher,
    int32 extlisteningPort_min, int32 extlisteningPort_max, const char * extlisteningInterface,
    uint32 extrbuffer, uint32 extwbuffer,
    int32 intlisteningPort, const char * intlisteningInterface,
    uint32 intrbuffer, uint32 intwbuffer):
extEndpoint_(),
intEndpoint_(),
channelMap_(),
pDispatcher_(pDispatcher),
pExtensionData_(NULL),
pExtListenerReceiver_(NULL),
pIntListenerReceiver_(NULL),
pDelayedChannels_(new DelayedChannels()),
pChannelTimeOutHandler_(NULL),
pChannelDeregisterHandler_(NULL),
isExternal_(extlisteningPort_min != -1),
numExtChannels_(0)
{
    if(isExternal())
    {
        pExtListenerReceiver_ = new ListenerReceiver(extEndpoint_, Channel::EXTERNAL, *this);
        this->recreateListeningSocket("EXTERNAL", htons(extlisteningPort_min), htons(extlisteningPort_max),
            extlisteningInterface, &extEndpoint_, pExtListenerReceiver_, extrbuffer, extwbuffer);

        // If an external port range is configured but the range is too small,
        // extEndpoint_ may end up with no usable port here
        if(extlisteningPort_min != -1)
        {
            KBE_ASSERT(extEndpoint_.good() && "Channel::EXTERNAL: no available port, "
                "please check for kbengine_defs.xml!\n");
        }
    }

    if(intlisteningPort != -1)
    {
        pIntListenerReceiver_ = new ListenerReceiver(intEndpoint_, Channel::INTERNAL, *this);
        this->recreateListeningSocket("INTERNAL", intlisteningPort, intlisteningPort,
            intlisteningInterface, &intEndpoint_, pIntListenerReceiver_, intrbuffer, intwbuffer);
    }

    KBE_ASSERT(good() && "NetworkInterface::NetworkInterface: no available port, "
        "please check for kbengine_defs.xml!\n");

    pDelayedChannels_->init(this->dispatcher(), this);
}

In the code of the recreateListeningSocket interface:

bool NetworkInterface::recreateListeningSocket(const char* pEndPointName, uint16 listeningPort_min, uint16 listeningPort_max,
    const char * listeningInterface, EndPoint* pEP, ListenerReceiver* pLR, uint32 rbuffer,
    uint32 wbuffer)
{
    KBE_ASSERT(listeningInterface && pEP && pLR);

    if (pEP->good())
    {
        this->dispatcher().deregisterReadFileDescriptor( *pEP );
        pEP->close();
    }

    Address address;
    address.ip = 0;
    address.port = 0;

    pEP->socket(SOCK_STREAM);
    if (!pEP->good())
    {
        ERROR_MSG(fmt::format("NetworkInterface::recreateListeningSocket({}): couldn't create a socket\n",
            pEndPointName));
        return false;
    }

    /*
    pEP->setreuseaddr(true);
    */

    this->dispatcher().registerReadFileDescriptor(*pEP, pLR);

    // ... (rest of the function omitted)

Here we've found the place where file descriptors are registered with the event dispatcher, and the event-handling interface that gets registered is this ListenerReceiver (listener_receiver.h/.cpp).

Following into ListenerReceiver's handleInputNotification interface:

int ListenerReceiver::handleInputNotification(int fd)
{
    int tickcount = 0;

    while(tickcount ++ < 256)
    {
        EndPoint* pNewEndPoint = endpoint_.accept();
        if(pNewEndPoint == NULL){
            if(tickcount == 1)
            {
                WARNING_MSG(fmt::format("PacketReceiver::handleInputNotification: accept endpoint({}) {}!\n",
                    fd, kbe_strerror()));

                this->dispatcher().errorReporter().reportException(
                    REASON_GENERAL_NETWORK);
            }

            break;
        }
        else
        {
            Channel* pChannel = Network::Channel::ObjPool().createObject();
            bool ret = pChannel->initialize(networkInterface_, pNewEndPoint, traits_);

            if(!ret)
            {
                ERROR_MSG(fmt::format("ListenerReceiver::handleInputNotification: initialize({}) is failed!\n",
                    pChannel->c_str()));

                pChannel->destroy();
                Network::Channel::ObjPool().reclaimObject(pChannel);
                return 0;
            }

            if(!networkInterface_.registerChannel(pChannel))
            {
                ERROR_MSG(fmt::format("ListenerReceiver::handleInputNotification: registerChannel({}) is failed!\n",
                    pChannel->c_str()));

                pChannel->destroy();
                Network::Channel::ObjPool().reclaimObject(pChannel);
            }
        }
    }

    return 0;
}

If you have ever written epoll code by hand, you know that a readable event on the listening socket means a new connection has arrived. Normally we would accept the new connection and then register it with epoll, but kbe's ListenerReceiver doesn't quite do that: it only cares about the readable event on the listening socket, and wraps the new connection into what it calls a Channel. (kbe uses an EndPoint to represent a socket endpoint.) So let's follow this Channel.

bool Channel::initialize(NetworkInterface & networkInterface,
    const EndPoint * pEndPoint,
    Traits traits,
    ProtocolType pt,
    PacketFilterPtr pFilter,
    ChannelID id)
{
    id_ = id;
    protocoltype_ = pt;
    traits_ = traits;
    pFilter_ = pFilter;
    pNetworkInterface_ = &networkInterface;
    this->pEndPoint(pEndPoint);

    KBE_ASSERT(pNetworkInterface_ != NULL);
    KBE_ASSERT(pEndPoint_ != NULL);

    if(protocoltype_ == PROTOCOL_TCP)
    {
        if(pPacketReceiver_)
        {
            if(pPacketReceiver_->type() == PacketReceiver::UDP_PACKET_RECEIVER)
            {
                SAFE_RELEASE(pPacketReceiver_);
                pPacketReceiver_ = new TCPPacketReceiver(*pEndPoint_, *pNetworkInterface_);
            }
        }
        else
        {
            pPacketReceiver_ = new TCPPacketReceiver(*pEndPoint_, *pNetworkInterface_);
        }

        KBE_ASSERT(pPacketReceiver_->type() == PacketReceiver::TCP_PACKET_RECEIVER);

        // UDP does not need to register a descriptor
        pNetworkInterface_->dispatcher().registerReadFileDescriptor(*pEndPoint_, pPacketReceiver_);

        // register only when there is data to send
        // pPacketSender_ = new TCPPacketSender(*pEndPoint_, *pNetworkInterface_);
        // pNetworkInterface_->dispatcher().registerWriteFileDescriptor(*pEndPoint_, pPacketSender_);
    }
    else
    {
        if(pPacketReceiver_)
        {
            if(pPacketReceiver_->type() == PacketReceiver::TCP_PACKET_RECEIVER)
            {
                SAFE_RELEASE(pPacketReceiver_);
                pPacketReceiver_ = new UDPPacketReceiver(*pEndPoint_, *pNetworkInterface_);
            }
        }
        else
        {
            pPacketReceiver_ = new UDPPacketReceiver(*pEndPoint_, *pNetworkInterface_);
        }

        KBE_ASSERT(pPacketReceiver_->type() == PacketReceiver::UDP_PACKET_RECEIVER);
    }

    pPacketReceiver_->pEndPoint(pEndPoint_);
    if(pPacketSender_)
        pPacketSender_->pEndPoint(pEndPoint_);

    startInactivityDetection((traits_ == INTERNAL) ? g_channelInternalTimeout :
            g_channelExternalTimeout,
        (traits_ == INTERNAL) ? g_channelInternalTimeout / 2.f:
            g_channelExternalTimeout / 2.f);

    return true;
}

As you can see, in this initialize interface the new EndPoint is still registered with the event dispatcher; only the handler changes, now a PacketReceiver (although both UDP and TCP PacketReceivers are implemented, it seems only TCPPacketReceiver is used at the moment).

Let's look at PacketReceiver's handleInputNotification:

int PacketReceiver::handleInputNotification(int fd)
{
    if (this->processRecv(/*expectingPacket:*/true))
    {
        while (this->processRecv(/*expectingPacket:*/false))
        {
            /* pass */;
        }
    }

    return 0;
}

Let's step into processRecv (tcp_packet_receiver.cpp):

bool TCPPacketReceiver::processRecv(bool expectingPacket)
{
    Channel* pChannel = getChannel();
    KBE_ASSERT(pChannel != NULL);

    if(pChannel->isCondemn())
    {
        return false;
    }

    TCPPacket* pReceiveWindow = TCPPacket::ObjPool().createObject();
    int len = pReceiveWindow->recvFromEndPoint(*pEndpoint_);

    if (len < 0)
    {
        TCPPacket::ObjPool().reclaimObject(pReceiveWindow);

        PacketReceiver::RecvState rstate = this->checkSocketErrors(len, expectingPacket);

        if(rstate == PacketReceiver::RECV_STATE_INTERRUPT)
        {
            onGetError(pChannel);
            return false;
        }

        return rstate == PacketReceiver::RECV_STATE_CONTINUE;
    }
    else if(len == 0) // the client disconnected normally
    {
        TCPPacket::ObjPool().reclaimObject(pReceiveWindow);
        onGetError(pChannel);
        return false;
    }

    Reason ret = this->processPacket(pChannel, pReceiveWindow);

    if(ret != REASON_SUCCESS)
        this->dispatcher().errorReporter().reportException(ret, pEndpoint_->addr());

    return true;
}

It's easy to see that in the normal case processPacket is called; let's step in (packet_receiver.cpp):

Reason PacketReceiver::processPacket(Channel* pChannel, Packet * pPacket)
{
    if (pChannel != NULL)
    {
        pChannel->onPacketReceived(pPacket->length());

        if (pChannel->pFilter())
        {
            return pChannel->pFilter()->recv(pChannel, *this, pPacket);
        }
    }

    return this->processFilteredPacket(pChannel, pPacket);
}

When there is no filter, the flow goes to processFilteredPacket (tcp_packet_receiver.cpp):

Reason TCPPacketReceiver::processFilteredPacket(Channel* pChannel, Packet * pPacket)
{
    // If it is None, it may have been swallowed by the filter
    // (the filter is assembling/decrypting packets according to its own rules)
    if(pPacket)
    {
        pChannel->addReceiveWindow(pPacket);
    }

    return REASON_SUCCESS;
}

Let's step into addReceiveWindow (channel.cpp):

void Channel::addReceiveWindow(Packet* pPacket)
{
    bufferedReceives_.push_back(pPacket);
    uint32 size = (uint32)bufferedReceives_.size();

    if(Network::g_receiveWindowMessagesOverflowCritical > 0 && size > Network::g_receiveWindowMessagesOverflowCritical)
    {
        if(this->isExternal())
        {
            if(Network::g_extReceiveWindowMessagesOverflow > 0 &&
                size > Network::g_extReceiveWindowMessagesOverflow)
            {
                ERROR_MSG(fmt::format("Channel::addReceiveWindow[{:p}]: external channel({}), receive window has overflowed({} > {}), Try adjusting the kbengine_defs.xml->receiveWindowOverflow.\n",
                    (void*)this, this->c_str(), size, Network::g_extReceiveWindowMessagesOverflow));

                this->condemn();
            }
            else
            {
                WARNING_MSG(fmt::format("Channel::addReceiveWindow[{:p}]: external channel({}), receive window has overflowed({} > {}).\n",
                    (void*)this, this->c_str(), size, Network::g_receiveWindowMessagesOverflowCritical));
            }
        }
        else
        {
            if(Network::g_intReceiveWindowMessagesOverflow > 0 &&
                size > Network::g_intReceiveWindowMessagesOverflow)
            {
                WARNING_MSG(fmt::format("Channel::addReceiveWindow[{:p}]: internal channel({}), receive window has overflowed({} > {}).\n",
                    (void*)this, this->c_str(), size, Network::g_intReceiveWindowMessagesOverflow));
            }
        }
    }
}

As you can see, in the normal case a packet (the payload on a valid connection between client and server) is first pushed onto the Channel's bufferedReceives queue after being received. So we still need to find where this packet is actually processed.

vi. Processing received data:

We can find the handling of bufferedReceives in the Channel's processPackets interface (channel.cpp):

void Channel::processPackets(KBEngine::Network::MessageHandlers* pMsgHandlers)
{
    lastTickBytesReceived_ = 0;
    lastTickBytesSent_ = 0;

    if(pMsgHandlers_ != NULL)
    {
        pMsgHandlers = pMsgHandlers_;
    }

    if (this->isDestroyed())
    {
        ERROR_MSG(fmt::format("Channel::processPackets({}): channel[{:p}] is destroyed.\n",
            this->c_str(), (void*)this));
        return;
    }

    if(this->isCondemn())
    {
        ERROR_MSG(fmt::format("Channel::processPackets({}): channel[{:p}] is condemn.\n",
            this->c_str(), (void*)this));
        //this->destroy();
        return;
    }

    if(pPacketReader_ == NULL)
    {
        handshake();
    }

    try
    {
        BufferedReceives::iterator packetIter = bufferedReceives_.begin();
        for(; packetIter != bufferedReceives_.end(); ++packetIter)
        {
            Packet* pPacket = (*packetIter);
            pPacketReader_->processMessages(pMsgHandlers, pPacket);
            RECLAIM_PACKET(pPacket->isTCPPacket(), pPacket);
        }
    }catch(MemoryStreamException &)
    {
        Network::MessageHandler* pMsgHandler = pMsgHandlers->find(pPacketReader_->currMsgID());

        WARNING_MSG(fmt::format("Channel::processPackets({}): packet invalid. currMsg=(name={}, id={}, len={}), currMsgLen={}\n",
            this->c_str()
            , (pMsgHandler == NULL ? "unknown" : pMsgHandler->name)
            , pPacketReader_->currMsgID()
            , (pMsgHandler == NULL ? -1 : pMsgHandler->msgLen)
            , pPacketReader_->currMsgLen()));

        pPacketReader_->currMsgID(0);
        pPacketReader_->currMsgLen(0);
        condemn();
    }

    bufferedReceives_.clear();
}

At this point we again have two questions to answer: when and by whom is this interface called, and what does the call do. To connect the whole flow together, let's first figure out where and by whom it is called.

Using VS's "Find All References" feature I easily found where this interface is called (network_interface.cpp):


void NetworkInterface::processChannels(KBEngine::Network::MessageHandlers* pMsgHandlers)
{
    ChannelMap::iterator iter = channelMap_.begin();
    for(; iter != channelMap_.end(); )
    {
        Network::Channel* pChannel = iter->second;

        if(pChannel->isDestroyed())
        {
            ++iter;
        }
        else if(pChannel->isCondemn())
        {
            ++iter;

            deregisterChannel(pChannel);
            pChannel->destroy();
            Network::Channel::ObjPool().reclaimObject(pChannel);
        }
        else
        {
            pChannel->processPackets(pMsgHandlers);
            ++iter;
        }
    }
}

Likewise, we need to find where processChannels is called. Code similar to the following shows up in the handleCheckStatusTick interface of almost every XXXApp that inherits from ServerApp (loginapp.cpp):

void Loginapp::handleCheckStatusTick()
{
    threadPool_.onMainThreadTick();
    networkInterface().processChannels(&LoginappInterface::messageHandlers);
    pendingLoginMgr_.process();
    pendingCreateMgr_.process();
}

vii. The component's timer timeout handling:

Inside XXXApp's handleTimeout interface we find the following code (loginapp.cpp):

void Loginapp::handleTimeout(TimerHandle handle, void * arg)
{
    switch (reinterpret_cast<uintptr>(arg))
    {
        case TIMEOUT_CHECK_STATUS:
            this->handleCheckStatusTick();
            return;
        default:
            break;
    }

    ServerApp::handleTimeout(handle, arg);
}

Following the handleTimeout interface of ServerApp's parent class TimerHandler, we easily find the following code (timer.inl):

template <class TIME_STAMP>
void TimersT< TIME_STAMP >::Time::triggerTimer()
{
    if (!this->isCancelled())
    {
        state_ = TIME_EXECUTING;
        pHandler_->handleTimeout( TimerHandle( this ), pUserData_ );

        if ((interval_ == 0) && !this->isCancelled())
        {
            this->cancel();
        }
    }

    if (!this->isCancelled())
    {
        time_ += interval_;
        state_ = TIME_PENDING;
    }
}

Then find where triggerTimer is called (timer.inl):

template <class TIME_STAMP>
int TimersT< TIME_STAMP >::process(TimeStamp now)
{
    int numFired = 0;

    while ((!timeQueue_.empty()) && (
        timeQueue_.top()->time() <= now ||
        timeQueue_.top()->isCancelled()))
    {
        Time * pTime = pProcessingNode_ = timeQueue_.top();
        timeQueue_.pop();

        if (!pTime->isCancelled())
        {
            ++numFired;
            pTime->triggerTimer();
        }

        if (!pTime->isCancelled())
        {
            timeQueue_.push( pTime );
        }
        else
        {
            delete pTime;

            KBE_ASSERT( numCancelled_ > 0 );
            --numCancelled_;
        }
    }

    pProcessingNode_ = NULL;
    lastProcessTime_ = now;
    return numFired;
}

And then find where TimersT's process interface is called (event_dispatcher.cpp; doesn't this file feel familiar...):

void EventDispatcher::processTimers()
{
    numTimerCalls_ += pTimers_->process(timestamp());
}

**NEW in version 0.0.2**

Why does EventDispatcher::processTimers end up calling XXXapp's handleTimeout interface? For that we have to look in an interface we skipped earlier (XXXapp::initialize). (My reasoning for finding this was: since handleTimeout is the interface called after a timeout, a timer must have been registered beforehand; EventDispatcher::addTimer is clearly the interface that registers a timer with EventDispatcher::pTimers_, and sure enough we can see it called in XXXapp's initialize/initializeBegin/initializeEnd. The snippet below is from loginapp.cpp.) (There is plenty more that could be written about kbe's Timer/Time, but we won't dig into it now.)

bool Loginapp::initializeEnd()
{
    // add a timer; check some status every second
    loopCheckTimerHandle_ = this->dispatcher().addTimer(1000000 / 50, this,
        reinterpret_cast<void *>(TIMEOUT_CHECK_STATUS));

    return true;
}

And at last we find our long-lost overall flow (event_dispatcher.cpp):

int EventDispatcher::processOnce(bool shouldIdle)
{
    if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING)
        breakProcessing_ = EVENT_DISPATCHER_STATUS_RUNNING;

    this->processTasks();

    if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING){
        this->processTimers();
    }

    this->processStats();

    if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING){
        return this->processNetwork(shouldIdle);
    }

    return 0;
}

Honestly, the process above was a bit of a slog and took several hours to finish... In fact, I should have guessed it the moment I saw Channel::addReceiveWindow: the push_back into bufferedReceives there is not protected by a lock (the STL vector is not thread-safe), so it must sit on the same synchronous call path, in the same thread, as the EventDispatcher::processNetwork interface, which means it could only be reached from one of EventDispatcher::processTasks, EventDispatcher::processTimers, or EventDispatcher::processStats.

Now let's return to the packet-processing branch we left earlier (the Channel's processPackets interface).

In packet_reader.cpp we can find the PacketReader::processMessages interface (the code is too long to paste here). In the end we can see the call to the handler interface for the message:

pMsgHandler->handle(pChannel_, *pFragmentStream_);
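
Since the real processMessages is not pasted here, the following is only a rough sketch of its shape: read a message id (and, for variable-length messages, a length) from the stream, look the handler up in the MessageHandlers table, and hand the payload over. The real code additionally deals with messages split across packets (pFragmentStream_), exposed-message checks, and so on.

// Illustrative only: the rough shape of PacketReader::processMessages.
// (The real signature and members are in packet_reader.h/.cpp and differ in detail.)
void PacketReader::processMessages(Network::MessageHandlers* pMsgHandlers, Packet* pPacket)
{
    while(pPacket->length() > 0)
    {
        MessageID msgID = 0;
        (*pPacket) >> msgID;                                  // 1. read the message id

        Network::MessageHandler* pMsgHandler = pMsgHandlers->find(msgID);
        if(pMsgHandler == NULL)                               // unknown message: give up on this packet
            break;

        MessageLength msgLen = 0;
        if(pMsgHandler->msgLen == NETWORK_VARIABLE_MESSAGE)   // 2. variable-length messages carry a length field
            (*pPacket) >> msgLen;
        else
            msgLen = (MessageLength)pMsgHandler->msgLen;

        // 3. hand the payload to the handler generated by the *_MESSAGE_DECLARE_* macros
        pMsgHandler->handle(pChannel_, *pPacket);
    }
}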

The mapping from a message to its handler is established in XXXapp_interface.h (a pile of macros that makes your head spin; we'll leave it for the per-component message analysis later. Note: version 0.0.2 walks through it using loginapp).

At this point we can sketch the outline of kbengine's low-level overall flow.

There are actually many more "wheels" inside kbe worth discussing in detail: the singleton, the timers, the memory pool, the thread pool, and the serialization/deserialization mechanism used for communication (MemoryStream)......
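
As a taste of the last of those, MemoryStream is used roughly like this (a usage sketch from memory; see common/memorystream.h for the real class):

// Usage sketch of kbe's MemoryStream serialization (assumes kbe headers).
#include "common/memorystream.h"
#include <string>

void memoryStreamExample()
{
    KBEngine::MemoryStream s;

    // Serialize: values are appended to the stream in binary form.
    KBEngine::uint32 entityID = 1001;
    std::string name = "bot_01";
    s << entityID << name;

    // Deserialize: values are read back in the same order they were written.
    KBEngine::uint32 outID = 0;
    std::string outName;
    s >> outID >> outName;
}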

On the whole, largely because of the asynchronous callback mechanism of the low-level network layer (select, epoll), the overall flow feels a bit tangled. There are quite a few home-grown "wheels", which makes it harder to apply knowledge you already have. A high-performance C/C++ project almost always uses some language tricks, but for a distributed project, since we have already made peace with not chasing single-node performance, we shouldn't sacrifice readability for performance either. Improving code readability while keeping single-node performance acceptable, and improving horizontal scalability overall, is the right path for a distributed project.

**NEW in version 0.0.2**

Establishing the mapping from payload messages to their handlers:

In 0.0.1 we got as far as processMessages calling the corresponding message handler, but we hadn't analyzed how that mapping is established; this time we'll get to the bottom of it. (Described directly in terms of loginapp.)

Components:

machine

loginapp

Establishing the message-to-handler mapping:

**NEW in version 0.0.2**

Each app component's interface definitions start in xxxapp_interface.cpp, with code like this:

#include "loginapp_interface.h"
#define DEFINE_IN_INTERFACE
#define LOGINAPP
#include "loginapp_interface.h"

namespace KBEngine{
namespace LoginappInterface{
//-------------------------------------------------------------------------------------
}
}

All the tricks are pulled off by defining DEFINE_IN_INTERFACE and LOGINAPP between the two inclusions of loginapp_interface.h. The first inclusion produces the declarations of the various variables and classes (of course, some classes are fully defined at declaration time using inline member functions, e.g. MESSAGE_ARGS0/1/2......).

Let's look at the code in loginapp_interface.h.

First there is this line:

NETWORK_INTERFACE_DECLARE_BEGIN(LoginappInterface)

Expanded, this line declares and defines Network::MessageHandlers messageHandlers; the code after macro expansion looks like this (yes, your eyes are fine, there is no closing }):

Declaration:

namespace LoginappInterface {
    extern Network::MessageHandlers messageHandlers;

Definition:

namespace LoginappInterface {
    Network::MessageHandlers messageHandlers;

Then there is this line:

LOGINAPP_MESSAGE_DECLARE_ARGS0(importClientMessages, NETWORK_FIXED_MESSAGE)

Expanded, this line declares and defines a class importClientMessagesLoginappMessagehandler0, which inherits from Network::MessageHandler and implements the handle virtual function; it declares and defines a global variable of that class named importClientMessages; and it declares and defines a class importClientMessagesArgs0, which inherits from Network::MessageArgs. Let's analyze these one by one.

First, expand the following macro:

LOGINAPP_MESSAGE_DECLARE_ARGS0(importClientMessages, NETWORK_FIXED_MESSAGE)

which gives:

#define LOGINAPP_MESSAGE_DECLARE_ARGS0(NAME, MSG_LENGTH) \
    LOGINAPP_MESSAGE_HANDLER_ARGS0(NAME) \
    NETWORK_MESSAGE_DECLARE_ARGS0(Loginapp, NAME, \
        NAME##LoginappMessagehandler0, MSG_LENGTH)

Expanding LOGINAPP_MESSAGE_HANDLER_ARGS0(NAME) gives us, respectively, the declaration and the definition of importClientMessagesLoginappMessagehandler0:

Declaration:

class importClientMessagesLoginappMessagehandler0 : public Network::MessageHandler
{
public:
    virtual void handle(Network::Channel* pChannel, KBEngine::MemoryStream& s);
};

Definition:

void importClientMessagesLoginappMessagehandler0::handle(Network::Channel* pChannel, KBEngine::MemoryStream& s)
{
    KBEngine::Loginapp::getSingleton().importClientMessages(pChannel);
}

The above amounts to the declaration and definition of the handler for the importClientMessages message; next, an instance of this class is created and added to messageHandlers:

#define NETWORK_MESSAGE_DECLARE_ARGS0(DOMAIN, NAME, MSGHANDLER, \
    MSG_LENGTH) \
    NETWORK_MESSAGE_HANDLER(DOMAIN, NAME, MSGHANDLER, MSG_LENGTH, 0) \
    MESSAGE_ARGS0(NAME) \

Expanding NETWORK_MESSAGE_HANDLER(DOMAIN, NAME, MSGHANDLER, MSG_LENGTH, 0) gives us the global variable named importClientMessages of the handler class for importClientMessages (importClientMessagesLoginappMessagehandler0).

Declaration:

extern const importClientMessagesLoginappMessagehandler0& importClientMessages;

Definition:

importClientMessagesLoginappMessagehandler0* pimportClientMessages = static_cast<importClientMessagesLoginappMessagehandler0*>(messageHandlers.add("Loginapp::importClientMessages", new importClientMessagesArgs0, NETWORK_FIXED_MESSAGE, new importClientMessagesLoginappMessagehandler0));

const importClientMessagesLoginappMessagehandler0& importClientMessages = *pimportClientMessages;

The MESSAGE_ARGS0(NAME) below declares and defines importClientMessagesArgs0 (in fact its entire definition is already completed at declaration time, so the other pass is effectively an empty statement):

Declaration and definition in one:

class importClientMessagesArgs0 : public Network::MessageArgs
{
public:
    importClientMessagesArgs0() : Network::MessageArgs() {}
    ~importClientMessagesArgs0() {}

    static void staticAddToBundle(Network::Bundle& s)
    {
    }

    static void staticAddToStream(MemoryStream& s)
    {
    }

    virtual int32 dataSize(void)
    {
        return 0;
    }

    virtual void addToStream(MemoryStream& s)
    {
    }

    virtual void createFromStream(MemoryStream& s)
    {
    }
};

The only thing to note is that the declaration (and definition) of importClientMessagesArgs0 is interleaved with the declaration and definition of the importClientMessagesLoginappMessagehandler0 instance, because when the latter is instantiated and added to messageHandlers it needs to new an instance of importClientMessagesArgs0.

After a bit of tidying up, the code that LOGINAPP_MESSAGE_DECLARE_ARGSn generates to establish a message-to-handler mapping looks roughly like this:

Declaration (the code produced by the first inclusion of loginapp_interface.h):

class importClientMessagesLoginappMessagehandler0 : public Network::MessageHandler
{
public:
    virtual void handle(Network::Channel* pChannel, KBEngine::MemoryStream& s);
};

extern const importClientMessagesLoginappMessagehandler0& importClientMessages;

class importClientMessagesArgs0 : public Network::MessageArgs
{
public:
    importClientMessagesArgs0() : Network::MessageArgs() {}
    ~importClientMessagesArgs0() {}

    static void staticAddToBundle(Network::Bundle& s)
    {
    }

    static void staticAddToStream(MemoryStream& s)
    {
    }

    virtual int32 dataSize(void)
    {
        return 0;
    }

    virtual void addToStream(MemoryStream& s)
    {
    }

    virtual void createFromStream(MemoryStream& s)
    {
    }
};

Definition (the code produced by the second inclusion of loginapp_interface.h, after DEFINE_IN_INTERFACE and LOGINAPP have been defined):

void importClientMessagesLoginappMessagehandler0::handle(Network::Channel* pChannel, KBEngine::MemoryStream& s)
{
    KBEngine::Loginapp::getSingleton().importClientMessages(pChannel);
}

importClientMessagesLoginappMessagehandler0* pimportClientMessages
    = static_cast<importClientMessagesLoginappMessagehandler0*>(messageHandlers.add("Loginapp::importClientMessages",
        new importClientMessagesArgs0,
        NETWORK_FIXED_MESSAGE,
        new importClientMessagesLoginappMessagehandler0));

const importClientMessagesLoginappMessagehandler0& importClientMessages = *pimportClientMessages;

dbmgr

baseapp

baseappmgr

cellapp

cellappmgr

Script logic layer:

Various tools:
