基於kbengine 0.4.20 解讀

【如下文章轉自kbe論壇】c++

MMOG服務端是一種高品質的工程項目,品讀開源的kbe是一種樂趣。本文檔我帶童鞋們一塊兒領略一下。囿於我知識面和經驗方面所限,文中所述之處不免有錯誤存在,還請讀童鞋們睜大慧眼,若是你發現錯誤,能夠 電郵至shawhen2012@hotmail.com。(由於我我的懶散或者時間倉促的關係,這個文檔的排版有點小亂。。。)redis

其餘牛逼哄哄的前言就不說了。api

從理論上來說,咱們閱讀一份源代碼,首先應該基於現有的文檔從總體上把握項目的架構以後再庖丁解牛通常地細分閱讀,不過在我寫這個文檔的如今,我暫時沒發現這樣的文檔,因此我就按照我本身的閱讀順序從而編排這個文檔的內容。安全

從已有的文檔可知(我得假設你已經大體看完了kbe官網的現有文檔),kbe由幾個組件共同協做,因此咱們先看看組件們:網絡

各個組件被設計爲獨立的app,使用網絡通訊進行協做。C++程序天然是從main函數開始。架構

看起來彷佛全部的組件都有一個這樣的宏(KBENGINE_MAIN)來包裹main函數app

int KBENGINE_MAIN(int argc, char* argv[])
{
    ENGINE_COMPONENT_INFO& info = g_kbeSrvConfig.getXXX();
    return kbeMainT<XXX>(argc, argv, YYY, info.externalPorts_min, 
        info.externalPorts_max, info.externalInterface, 0, info.internalInterface);
}

這個宏展開是這樣子:異步

#if KBE_PLATFORM == PLATFORM_WIN32
#define KBENGINE_MAIN                                                                                                    \
kbeMain(int argc, char* argv[]);                                                                                        \
int main(int argc, char* argv[])                                                                                        \
{                                                                                                                        \
    loadConfig();                                                                                                        \
    g_componentID = genUUID64();                                                                                        \
    parseMainCommandArgs(argc, argv);                                                                                    \
    char dumpname[MAX_BUF] = {0};                                                                                        \
    kbe_snprintf(dumpname, MAX_BUF, "%"PRAppID, g_componentID);                                                            \
    KBEngine::exception::installCrashHandler(1, dumpname);                                                                \
    int retcode = -1;                                                                                                    \
    THREAD_TRY_EXECUTION;                                                                                                \
    retcode = kbeMain(argc, argv);                                                                                        \
    THREAD_HANDLE_CRASH;                                                                                                \
    return retcode;                                                                                                        \
}                                                                                                                        \
int kbeMain
#else
#define KBENGINE_MAIN                                                                                                    \
kbeMain(int argc, char* argv[]);                                                                                        \
int main(int argc, char* argv[])                                                                                        \
{                                                                                                                        \
    loadConfig();                                                                                                        \
    g_componentID = genUUID64();                                                                                        \
    parseMainCommandArgs(argc, argv);                                                                                    \
    return kbeMain(argc, argv);                                                                                            \
}                                                                                                                        \
int kbeMain
#endif
}

將上面的代碼整理以後,很像下面的樣子socket

int kbeMain(int argc, char* argv[]);
int main(int argc, char* argv[])
{
    loadConfig();
    g_componentID = genUUID64();
    parseMainCommandArgs(argc, argv);
    char dumpname[MAX_BUF] = {0};
    kbe_snprintf(dumpname, MAX_BUF, "%"PRAppID, g_componentID);
    KBEngine::exception::installCrashHandler(1, dumpname);
    int retcode = -1;
    THREAD_TRY_EXECUTION;
    retcode = kbeMain(argc, argv);
    THREAD_HANDLE_CRASH;
    return (retcode);
}
int kbeMain(int argc, char* argv[])
{
            ENGINE_COMPONENT_INFO& info = g_kbeSrvConfig.getXXX();
            return kbeMainT<XXX>(argc, argv, YYY, info.externalPorts_min, info.externalPorts_max, info.externalInterface, 0, info.internalInterface);
}

基本能夠理解爲每一個組件的main函數流程都是同樣的,只是在特化kbeMainT時所給參數不同。tcp

咱們跟着main函數的loadConfig進去看看(kbemain.h)

inline void loadConfig()
{
    Resmgr::getSingleton().initialize();

    // "../../res/server/kbengine_defs.xml"
    g_kbeSrvConfig.loadConfig("server/kbengine_defs.xml");

    // "../../../assets/res/server/kbengine.xml"
    g_kbeSrvConfig.loadConfig("server/kbengine.xml");
}

在serverconfig.h中能夠看到這樣的代碼:

#define g_kbeSrvConfig ServerConfig::getSingleton()

Resmgr和ServerConfig,這兩個類都是被搞成單例了的(kbe的單例不算太嚴格的單例,線程安全,編譯時阻斷都沒法知足,咱們先不細究),

Resmgr是資源管理器,在resmgr.h/.cpp中聲明和定義,在FixedMessages類(fixed_messages.h/.cpp)的構造函數中被new出來。(有點小隱晦,我是調試了一下才跟到的。。。)

過程是這樣,在各個組件的xxx_interface.cpp中,有這樣的代碼:(摘自loginapp)

#include "loginapp_interface.h"

#define DEFINE_IN_INTERFACE

#define LOGINAPP

#include "loginapp_interface.h"

xxx_interface.h中,有這樣的代碼:

#if defined(DEFINE_IN_INTERFACE)

#undef KBE_LOGINAPP_INTERFACE_H

#endif

#ifndef KBE_LOGINAPP_INTERFACE_H

#define KBE_LOGINAPP_INTERFACE_H

大意能夠理解爲在xxx_interface.cpp中經過在包含xxx_interface.h先後定義DEFINE_IN_INTERFACE和LOGINAPP宏,使得xxx_interface.h被包含了兩次(但產生的代碼確實不一樣的),從而對xxx_interface.h內的一些變量實現了聲明(第一次)和定義(第二次)。

在xxx_interface.h中有這樣一句:

NETWORK_INTERFACE_DECLARE_BEGIN(LoginappInterface)

展開就是:

// 定義接口域名稱

#ifndef DEFINE_IN_INTERFACE

#define NETWORK_INTERFACE_DECLARE_BEGIN(INAME) \

namespace INAME \

{ \

extern Network::MessageHandlers messageHandlers; \

#else

#define NETWORK_INTERFACE_DECLARE_BEGIN(INAME) \

namespace INAME \

{ \

Network::MessageHandlers messageHandlers; \

#endif

#define NETWORK_INTERFACE_DECLARE_END() }

在第一次包含xxx_interface.h的時候就是extern Network….這樣的外部全局變量引用聲明,第二次包含的時候就是Network::M…..這樣的全局變量的定義了。

在Network::MessageHandles的構造函數(message_handler.cpp)中,有:

MessageHandlers::MessageHandlers():

msgHandlers_(),

msgID_(1),

exposedMessages_()

{

g_fm = Network::FixedMessages::getSingletonPtr();

if(g_fm == NULL)

g_fm = new Network::FixedMessages;

Network::FixedMessages::getSingleton().loadConfig("server/messages_fixed.xml");

messageHandlers().push_back(this);

}

至此,Network::FixedMessages類被有機會實例化,構造函數中有:

FixedMessages::FixedMessages():

_infomap(),

_loaded(false)

{

new Resmgr();

Resmgr::getSingleton().initialize();

}

ServerConfig在各個組件(好比loginapp在loginapp.cpp)的類的定義文件中實例化。

ServerConfig g_serverConfig;

KBE_SINGLETON_INIT(Loginapp);

咱們再次回到loadConfig,裏面的函數小跟一下就能讀明白了,咱們繼續跟進主幹流程。

下面的語句給組件生成一個隨機id

g_componentID = genUUID64();

下面的語句解析主函數的參數:(好比設定指定的組件id,以及gus,我也還沒了解到gus搞啥用的。。。不影響咱們閱讀總體流程,不細究)

parseMainCommandArgs(argc, argv);

下面的語句進行crash處理:(不影響咱們閱讀總體流程,不細究)

char dumpname[MAX_BUF] = {0}; \

kbe_snprintf(dumpname, MAX_BUF, "%"PRAppID, g_componentID); \

KBEngine::exception::installCrashHandler(1, dumpname);

下面的語句就是一個標準的main函數轉向:

int retcode = -1; \

THREAD_TRY_EXECUTION; \

retcode = kbeMain(argc, argv); \

THREAD_HANDLE_CRASH; \

return retcode;

在kbemain.h中能夠看到KBENGINE_MAIN針對不一樣的平臺有不一樣的定義。。。其實就是非win32平臺沒有crash處理。

kbeMain是在各個組件的main.cpp中定義的:(摘自loginapp)

{

ENGINE_COMPONENT_INFO& info = g_kbeSrvConfig.getLoginApp();

return kbeMainT<Loginapp>(argc, argv, LOGINAPP_TYPE, info.externalPorts_min, 

info.externalPorts_max, info.externalInterface, 0, info.internalInterface);

}

第一句就是獲取到各個組件相關的信息,而後流程再次轉向,到了kbeMainT函數。

kbeMainT是一個模板函數,根據各個組件的主類的不一樣,產生不一樣的代碼(。。。我是否是有點過了)

前面的一些語句咱們暫且不看(後面會須要看的),有設置環境變量的,設置密鑰對的。能夠看到kbeMainT的流程以各個組件的實例的run接口而終止。

大體看了幾個組件的run函數,有的是調用ServerApp的run接口,有的是直接調用dispatcher的processXXX接口,總的來說,就是依靠事件派發器來進行工做了。因此咱們有必要去看看kbe的事件派發器了。

l loginapp組件的run接口:

bool Loginapp::run()

{

return ServerApp::run();

}

l machine組件的run接口:

bool Machine::run()

{

bool ret = true;

while(!this->dispatcher().hasBreakProcessing())

{

threadPool_.onMainThreadTick();

this->dispatcher().processOnce(false);

networkInterface().processChannels(&MachineInterface::messageHandlers);

KBEngine::sleep(100);

};

return ret;

}

l baseapp組件的run接口:

bool Baseapp::run()

{

return EntityApp<Base>::run();

}

實際上EntityApp也是繼承自ServerApp

在kbeMainT中能夠看到:

Network::EventDispatcher dispatcher;

這裏構造了事件派發器,咱們得看看它所謂的process到底幹了些什麼。

在event_dispatcher.cpp能夠看到:

int EventDispatcher::processOnce(bool shouldIdle)

{

if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING)

breakProcessing_ = EVENT_DISPATCHER_STATUS_RUNNING;

this->processTasks();

if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING){

this->processTimers();

}

this->processStats();

if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING){

return this->processNetwork(shouldIdle);

}

return 0;

}

這裏咱們有必要明白一些常識,對於從select,poll,epoll,iocp,kqueue的api,到boost的asio,ace,libevent,libev的庫,這些網絡i/o複用模型都是爲了讓咱們監聽nic上的事件,而後提交給相應的handler處理。他們除了工做方式致使的性能和編碼方式有區別外,還有回調的時機的區別,iocp是對於資源的指定動做完成後回調,其餘的unix族(kqueue是bsd的)接口都是資源對於指定動做準備好時回調。

因此咱們要解讀這個事件派發器得找到兩個重點,如何產生事件,如何交付到應用處理。

EventDispatcher的processTasks能夠稍微跟一下,發現它是處理全部「任務」的一個接口,但什麼是任務,咱們還不知道,不繼續跟了。

processTimers, processStats也暫時不跟了。。。

咱們跟到processNetwork裏面去:

int EventDispatcher::processNetwork(bool shouldIdle)

{

double maxWait = shouldIdle ? this->calculateWait() : 0.0;

return pPoller_->processPendingEvents(maxWait);

}

這個pPoller是一個EventPoller的實例。EventPoller是一個抽象類(event_poller.h/.cpp),它目前有兩個派生子類,SelectorPoller(poller_select.h/.cpp)和EpollPoller(poller_epoll.h/.cpp),顧名思義,它們分別是利用select和epoll系統api進行異步i/o工做的。(在win32上面若是使用iocp的話性能應該是能夠和epoll匹敵的,不過因爲iocp和epoll工做方式不同,我估計這是kbe裏面沒有在win32上使用iocp的緣由,若是要將這兩個工做方式抽象爲一種,工做量估計比kbe自己小不了多少,若是是個人話,我會直接使用asio或者libevent,不過kbe做者爲啥沒用,可能就像redis的做者的解釋同樣,雖然我以爲那是很操蛋的解釋,使用現有庫的好處是顯而易見的,若是碰巧像我這種對asio或者libevent有經驗的,那這裏kbe的網絡底層我能夠一掠而過,我也能夠把更多的精力放在這個項目自己要解決的問題上。繼續發牢騷可能)

select和epoll的工做方式一致,因此咱們任選一個閱讀都行,我傾向於使用epoll。

int EpollPoller::processPendingEvents(double maxWait)

{

const int MAX_EVENTS = 10;

struct epoll_event events[ MAX_EVENTS ];

int maxWaitInMilliseconds = int(ceil(maxWait * 1000));

#if ENABLE_WATCHERS

g_idleProfile.start();

#else

uint64 startTime = timestamp();

#endif

KBEConcurrency::onStartMainThreadIdling();

int nfds = epoll_wait(epfd_, events, MAX_EVENTS, maxWaitInMilliseconds);

KBEConcurrency::onEndMainThreadIdling();

#if ENABLE_WATCHERS

g_idleProfile.stop();

spareTime_ += g_idleProfile.lastTime_;

#else

spareTime_ += timestamp() - startTime;

#endif

for (int i = 0; i < nfds; ++i)

{

if (events[i].events & (EPOLLERR|EPOLLHUP))

{

this->triggerError(events[i].data.fd);

}

else

{

if (events[i].events & EPOLLIN)

{

this->triggerRead(events[i].data.fd);

}

if (events[i].events & EPOLLOUT)

{

this->triggerWrite(events[i].data.fd);

}

}

}

return nfds;

}

大意就是對Poller內註冊的文件描述符進行事件等待,而後對事件進行區分以後觸發讀(triggerRead)或者寫(triggerWrite)的接口。咱們跟一下這兩個接口:

bool EventPoller::triggerRead(int fd)

{

FDReadHandlers::iterator iter = fdReadHandlers_.find(fd);

if (iter == fdReadHandlers_.end())

{

return false;

}

iter->second->handleInputNotification(fd);

return true;

}

//-------------------------------------------------------------------------------------

bool EventPoller::triggerWrite(int fd)

{

FDWriteHandlers::iterator iter = fdWriteHandlers_.find(fd);

if (iter == fdWriteHandlers_.end())

{

return false;

}

iter->second->handleOutputNotification(fd);

return true;

}

能夠看到就是對註冊的文件描述符查找相應的輸入輸出處理接口(這裏也指導了咱們下一步的閱讀方向,找到註冊文件描述符的地方)。至此咱們找到了事件如何產生。(其實我一直不習慣poll流的模型,我比較喜歡iocp的模型,雖然理論上來說poll會給予應用層更多的變化點。打個不太形象的比喻,你讓poll或者iocp給你準備三輛車。你給poll說完,poll稍後會告訴你:老爺,車備好了,在車庫裏,但具體有幾輛我也不清楚,您自個去看看。你給iocp三把鑰匙,iocp稍後會告訴你:老爺,三輛車準備好了,就停在門外。)

爲了找到註冊文件描述符和事件處理接口的流程,咱們再次回到kbeMainT。映入眼簾的是這行代碼:

Network::NetworkInterface networkInterface(&dispatcher,

extlisteningPort_min, extlisteningPort_max, extlisteningInterface,

channelCommon.extReadBufferSize, channelCommon.extWriteBufferSize,

(intlisteningPort != -1) ? htons(intlisteningPort) : -1, intlisteningInterface,

channelCommon.intReadBufferSize, channelCommon.intWriteBufferSize);

Network::NetworkInterface的構造函數:

NetworkInterface::NetworkInterface(Network::EventDispatcher * pDispatcher,

int32 extlisteningPort_min, int32 extlisteningPort_max, const char * extlisteningInterface,

uint32 extrbuffer, uint32 extwbuffer,

int32 intlisteningPort, const char * intlisteningInterface,

uint32 intrbuffer, uint32 intwbuffer):

extEndpoint_(),

intEndpoint_(),

channelMap_(),

pDispatcher_(pDispatcher),

pExtensionData_(NULL),

pExtListenerReceiver_(NULL),

pIntListenerReceiver_(NULL),

pDelayedChannels_(new DelayedChannels()),

pChannelTimeOutHandler_(NULL),

pChannelDeregisterHandler_(NULL),

isExternal_(extlisteningPort_min != -1),

numExtChannels_(0)

{

if(isExternal())

{

pExtListenerReceiver_ = new ListenerReceiver(extEndpoint_, Channel::EXTERNAL, *this);

this->recreateListeningSocket("EXTERNAL", htons(extlisteningPort_min), htons(extlisteningPort_max),

extlisteningInterface, &extEndpoint_, pExtListenerReceiver_, extrbuffer, extwbuffer);

// 若是配置了對外端口範圍, 若是範圍太小這裏extEndpoint_可能沒有端口可用了

if(extlisteningPort_min != -1)

{

KBE_ASSERT(extEndpoint_.good() && "Channel::EXTERNAL: no available port, "

"please check for kbengine_defs.xml!\n");

}

}

if(intlisteningPort != -1)

{

pIntListenerReceiver_ = new ListenerReceiver(intEndpoint_, Channel::INTERNAL, *this);

this->recreateListeningSocket("INTERNAL", intlisteningPort, intlisteningPort,

intlisteningInterface, &intEndpoint_, pIntListenerReceiver_, intrbuffer, intwbuffer);

}

KBE_ASSERT(good() && "NetworkInterface::NetworkInterface: no available port, "

"please check for kbengine_defs.xml!\n");

pDelayedChannels_->init(this->dispatcher(), this);

}

在recreateListeningSocket接口的代碼中:

bool NetworkInterface::recreateListeningSocket(const char* pEndPointName, uint16 listeningPort_min, uint16 listeningPort_max,

const char * listeningInterface, EndPoint* pEP, ListenerReceiver* pLR, uint32 rbuffer,

uint32 wbuffer)

{

KBE_ASSERT(listeningInterface && pEP && pLR);

if (pEP->good())

{

this->dispatcher().deregisterReadFileDescriptor( *pEP );

pEP->close();

}

Address address;

address.ip = 0;

address.port = 0;

pEP->socket(SOCK_STREAM);

if (!pEP->good())

{

ERROR_MSG(fmt::format("NetworkInterface::recreateListeningSocket({}): couldn't create a socket\n",

pEndPointName));

return false;

}

/*

pEP->setreuseaddr(true);

*/

this->dispatcher().registerReadFileDescriptor(*pEP, pLR);

找到了事件派發器註冊文件描述符的地方,而註冊的事件處理接口也就是這個ListenerReceiver(listener_receiver.h/.cpp)。

跟到ListenerReceiver的handleInputNotification接口:

int ListenerReceiver::handleInputNotification(int fd)

{

int tickcount = 0;

while(tickcount ++ < 256)

{

EndPoint* pNewEndPoint = endpoint_.accept();

if(pNewEndPoint == NULL){

if(tickcount == 1)

{

WARNING_MSG(fmt::format("PacketReceiver::handleInputNotification: accept endpoint({}) {}!\n",

fd, kbe_strerror()));

this->dispatcher().errorReporter().reportException(

REASON_GENERAL_NETWORK);

}

break;

}

else

{

Channel* pChannel = Network::Channel::ObjPool().createObject();

bool ret = pChannel->initialize(networkInterface_, pNewEndPoint, traits_);

if(!ret)

{

ERROR_MSG(fmt::format("ListenerReceiver::handleInputNotification: initialize({}) is failed!\n",

pChannel->c_str()));

pChannel->destroy();

Network::Channel::ObjPool().reclaimObject(pChannel);

return 0;

}

if(!networkInterface_.registerChannel(pChannel))

{

ERROR_MSG(fmt::format("ListenerReceiver::handleInputNotification: registerChannel({}) is failed!\n",

pChannel->c_str()));

pChannel->destroy();

Network::Channel::ObjPool().reclaimObject(pChannel);

}

}

}

return 0;

}

若是你手工擼過epoll就會知道在監聽套接口上若是有可讀事件,則表明着有新的鏈接進來,

一般咱們就是使用accept來接收這個新鏈接,而後註冊到epoll上,可是kbe在這個ListenerReceiver中不是這麼作的,它只是關心監聽套接口的可讀事件,而後將新的鏈接封裝到它所謂的一個Channel中去了。(kbe用一個EndPoint來表徵一個socket終端)。因此咱們再跟進這個Channel看看。

bool Channel::initialize(NetworkInterface & networkInterface,

const EndPoint * pEndPoint,

Traits traits,

ProtocolType pt,

PacketFilterPtr pFilter,

ChannelID id)

{

id_ = id;

protocoltype_ = pt;

traits_ = traits;

pFilter_ = pFilter;

pNetworkInterface_ = &networkInterface;

this->pEndPoint(pEndPoint);

KBE_ASSERT(pNetworkInterface_ != NULL);

KBE_ASSERT(pEndPoint_ != NULL);

if(protocoltype_ == PROTOCOL_TCP)

{

if(pPacketReceiver_)

{

if(pPacketReceiver_->type() == PacketReceiver::UDP_PACKET_RECEIVER)

{

SAFE_RELEASE(pPacketReceiver_);

pPacketReceiver_ = new TCPPacketReceiver(*pEndPoint_, *pNetworkInterface_);

}

}

else

{

pPacketReceiver_ = new TCPPacketReceiver(*pEndPoint_, *pNetworkInterface_);

}

KBE_ASSERT(pPacketReceiver_->type() == PacketReceiver::TCP_PACKET_RECEIVER);

// UDP不須要註冊描述符

pNetworkInterface_->dispatcher().registerReadFileDescriptor(*pEndPoint_, pPacketReceiver_);

// 須要發送數據時再註冊

// pPacketSender_ = new TCPPacketSender(*pEndPoint_, *pNetworkInterface_);

// pNetworkInterface_->dispatcher().registerWriteFileDescriptor(*pEndPoint_, pPacketSender_);

}

else

{

if(pPacketReceiver_)

{

if(pPacketReceiver_->type() == PacketReceiver::TCP_PACKET_RECEIVER)

{

SAFE_RELEASE(pPacketReceiver_);

pPacketReceiver_ = new UDPPacketReceiver(*pEndPoint_, *pNetworkInterface_);

}

}

else

{

pPacketReceiver_ = new UDPPacketReceiver(*pEndPoint_, *pNetworkInterface_);

}

KBE_ASSERT(pPacketReceiver_->type() == PacketReceiver::UDP_PACKET_RECEIVER);

}

pPacketReceiver_->pEndPoint(pEndPoint_);

if(pPacketSender_)

pPacketSender_->pEndPoint(pEndPoint_);

startInactivityDetection((traits_ == INTERNAL) ? g_channelInternalTimeout :

g_channelExternalTimeout,

(traits_ == INTERNAL) ? g_channelInternalTimeout / 2.f:

g_channelExternalTimeout / 2.f);

return true;

}

能夠看到在這個initialize接口中,新的EndPoint仍是註冊到了事件派發器中,只是處理的方式變爲了PacketReceiver(雖然實現了UDP和TCP兩種PacketReceiver,不過目前彷佛只有TCPPacketReceiver被用到了)。

看看PacketReceiver的handleInputNotification:

int PacketReceiver::handleInputNotification(int fd)

{

if (this->processRecv(/*expectingPacket:*/true))

{

while (this->processRecv(/*expectingPacket:*/false))

{

/* pass */;

}

}

return 0;

}

咱們跟進processRecv(tcp_packet_receiver.cpp):

bool TCPPacketReceiver::processRecv(bool expectingPacket)

{

Channel* pChannel = getChannel();

KBE_ASSERT(pChannel != NULL);

if(pChannel->isCondemn())

{

return false;

}

TCPPacket* pReceiveWindow = TCPPacket::ObjPool().createObject();

int len = pReceiveWindow->recvFromEndPoint(*pEndpoint_);

if (len < 0)

{

TCPPacket::ObjPool().reclaimObject(pReceiveWindow);

PacketReceiver::RecvState rstate = this->checkSocketErrors(len, expectingPacket);

if(rstate == PacketReceiver::RECV_STATE_INTERRUPT)

{

onGetError(pChannel);

return false;

}

return rstate == PacketReceiver::RECV_STATE_CONTINUE;

}

else if(len == 0) // 客戶端正常退出

{

TCPPacket::ObjPool().reclaimObject(pReceiveWindow);

onGetError(pChannel);

return false;

}

Reason ret = this->processPacket(pChannel, pReceiveWindow);

if(ret != REASON_SUCCESS)

this->dispatcher().errorReporter().reportException(ret, pEndpoint_->addr());

return true;

}

不難發現正常狀況會調用processPacket,咱們跟進(packet_receiver.cpp):

Reason PacketReceiver::processPacket(Channel* pChannel, Packet * pPacket)

{

if (pChannel != NULL)

{

pChannel->onPacketReceived(pPacket->length());

if (pChannel->pFilter())

{

return pChannel->pFilter()->recv(pChannel, *this, pPacket);

}

}

return this->processFilteredPacket(pChannel, pPacket);

}

在沒有filter的狀況,流程會轉向processFilteredPacket(tcp_packet_receiver.cpp):

Reason TCPPacketReceiver::processFilteredPacket(Channel* pChannel, Packet * pPacket)

{

// 若是爲None, 則多是被過濾器過濾掉了(過濾器正在按照本身的規則組包解密)

if(pPacket)

{

pChannel->addReceiveWindow(pPacket);

}

return REASON_SUCCESS;

}

咱們跟進addReceiveWindow(channel.cpp):

void Channel::addReceiveWindow(Packet* pPacket)

{

bufferedReceives_.push_back(pPacket);

uint32 size = (uint32)bufferedReceives_.size();

if(Network::g_receiveWindowMessagesOverflowCritical > 0 && size > Network::g_receiveWindowMessagesOverflowCritical)

{

if(this->isExternal())

{

if(Network::g_extReceiveWindowMessagesOverflow > 0 &&

size > Network::g_extReceiveWindowMessagesOverflow)

{

ERROR_MSG(fmt::format("Channel::addReceiveWindow[{:p}]: external channel({}), receive window has overflowed({} > {}), Try adjusting the kbengine_defs.xml->receiveWindowOverflow.\n",

(void*)this, this->c_str(), size, Network::g_extReceiveWindowMessagesOverflow));

this->condemn();

}

else

{

WARNING_MSG(fmt::format("Channel::addReceiveWindow[{:p}]: external channel({}), receive window has overflowed({} > {}).\n",

(void*)this, this->c_str(), size, Network::g_receiveWindowMessagesOverflowCritical));

}

}

else

{

if(Network::g_intReceiveWindowMessagesOverflow > 0 &&

size > Network::g_intReceiveWindowMessagesOverflow)

{

WARNING_MSG(fmt::format("Channel::addReceiveWindow[{:p}]: internal channel({}), receive window has overflowed({} > {}).\n",

(void*)this, this->c_str(), size, Network::g_intReceiveWindowMessagesOverflow));

}

}

}

}

能夠看到,正常狀況下,一個包(客戶端與服務端的一個有效連接上的負載)接收以後先被放到Channel的bufferedReceives隊列。因而咱們還須要找到何到處理這個包。

咱們能夠在Channel的processPackets接口內找處處理bufferedReceives(channel.cpp):

void Channel::processPackets(KBEngine::Network::MessageHandlers* pMsgHandlers)

{

lastTickBytesReceived_ = 0;

lastTickBytesSent_ = 0;

if(pMsgHandlers_ != NULL)

{

pMsgHandlers = pMsgHandlers_;

}

if (this->isDestroyed())

{

ERROR_MSG(fmt::format("Channel::processPackets({}): channel[{:p}] is destroyed.\n",

this->c_str(), (void*)this));

return;

}

if(this->isCondemn())

{

ERROR_MSG(fmt::format("Channel::processPackets({}): channel[{:p}] is condemn.\n",

this->c_str(), (void*)this));

//this->destroy();

return;

}

if(pPacketReader_ == NULL)

{

handshake();

}

try

{

BufferedReceives::iterator packetIter = bufferedReceives_.begin();

for(; packetIter != bufferedReceives_.end(); ++packetIter)

{

Packet* pPacket = (*packetIter);

pPacketReader_->processMessages(pMsgHandlers, pPacket);

RECLAIM_PACKET(pPacket->isTCPPacket(), pPacket);

}

}catch(MemoryStreamException &)

{

Network::MessageHandler* pMsgHandler = pMsgHandlers->find(pPacketReader_->currMsgID());

WARNING_MSG(fmt::format("Channel::processPackets({}): packet invalid. currMsg=(name={}, id={}, len={}), currMsgLen={}\n",

this->c_str()

, (pMsgHandler == NULL ? "unknown" : pMsgHandler->name)

, pPacketReader_->currMsgID()

, (pMsgHandler == NULL ? -1 : pMsgHandler->msgLen)

, pPacketReader_->currMsgLen()));

pPacketReader_->currMsgID(0);

pPacketReader_->currMsgLen(0);

condemn();

}

bufferedReceives_.clear();

}

到這裏咱們又要弄清楚兩個問題,這個接口什麼時候被誰調用,調用又作了些什麼,爲了聯通整個流程,咱們仍是先弄清楚這個接口在哪被誰調用。

經過vs的「Find All References」功能我順利地找到了這個接口被調用的地方(network_interface.cpp):

clip_image002

void NetworkInterface::processChannels(KBEngine::Network::MessageHandlers* pMsgHandlers)

{

ChannelMap::iterator iter = channelMap_.begin();

for(; iter != channelMap_.end(); )

{

Network::Channel* pChannel = iter->second;

if(pChannel->isDestroyed())

{

++iter;

}

else if(pChannel->isCondemn())

{

++iter;

deregisterChannel(pChannel);

pChannel->destroy();

Network::Channel::ObjPool().reclaimObject(pChannel);

}

else

{

pChannel->processPackets(pMsgHandlers);

++iter;

}

}

}

同理,咱們得找到processChannels被調用的地方。(幾乎在每一個繼承自ServerApp的XXXApp的handleCheckStatusTick接口內都發現了相似下面的代碼):(loginapp.cpp)

void Loginapp::handleCheckStatusTick()

{

threadPool_.onMainThreadTick();

networkInterface().processChannels(&LoginappInterface::messageHandlers);

pendingLoginMgr_.process();

pendingCreateMgr_.process();

}

在XXXApp的handleTimeout的接口內咱們找到了下面的代碼:(loginapp.cpp)

void Loginapp::handleTimeout(TimerHandle handle, void * arg)

{

switch (reinterpret_cast<uintptr>(arg))

{

case TIMEOUT_CHECK_STATUS:

this->handleCheckStatusTick();

return;

default:

break;

}

ServerApp::handleTimeout(handle, arg);

}

根據ServerApp的父類TimerHandler的handleTimeout接口順利地找到了下面的代碼:(timer.inl)

template <class TIME_STAMP>

void TimersT< TIME_STAMP >::Time::triggerTimer()

{

if (!this->isCancelled())

{

state_ = TIME_EXECUTING;

pHandler_->handleTimeout( TimerHandle( this ), pUserData_ );

if ((interval_ == 0) && !this->isCancelled())

{

this->cancel();

}

}

if (!this->isCancelled())

{

time_ += interval_;

state_ = TIME_PENDING;

}

}

找到調用triggerTimer的地方:(timer.inl)

template <class TIME_STAMP>

int TimersT< TIME_STAMP >::process(TimeStamp now)

{

int numFired = 0;

while ((!timeQueue_.empty()) && (

timeQueue_.top()->time() <= now ||

timeQueue_.top()->isCancelled()))

{

Time * pTime = pProcessingNode_ = timeQueue_.top();

timeQueue_.pop();

if (!pTime->isCancelled())

{

++numFired;

pTime->triggerTimer();

}

if (!pTime->isCancelled())

{

timeQueue_.push( pTime );

}

else

{

delete pTime;

KBE_ASSERT( numCancelled_ > 0 );

--numCancelled_;

}

}

pProcessingNode_ = NULL;

lastProcessTime_ = now;

return numFired;

}

找到調用TimerHandler的process接口的地方:(event_dispatcher.cpp,是否是感受這個文件很熟悉。。。)

void EventDispatcher::processTimers()

{

numTimerCalls_ += pTimers_->process(timestamp());

}

最終終於找到了咱們失散多年的總體流程:(event_dispatcher.cpp)

int EventDispatcher::processOnce(bool shouldIdle)

{

if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING)

breakProcessing_ = EVENT_DISPATCHER_STATUS_RUNNING;

this->processTasks();

if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING){

this->processTimers();

}

this->processStats();

if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING){

return this->processNetwork(shouldIdle);

}

return 0;

}

其實上面的這個過程有點小艱辛,花了好幾個小時才完工。。。其實看到Channel::addReceiveWindow就應該想到這裏的,由於這裏的bufferedReceives的push_back操做沒有加鎖(stl的vector是非線程安全的),因此他應該是和EventDispatcher::processNetwork接口在同一個線程的同步調用流程上,如此一來就只多是EventDispatcher::processTasks,EventDispatcher::processTimers,EventDispatcher::processStats中的一個了。

咱們接着回到以前的包處理的分支流程(就是Channel的processPackets接口)。

在packet_reader.cpp內咱們能夠找到PacketReader::processMesages接口(代碼太長,不貼了)。最終咱們能夠看到消息對應的handler的處理接口調用:

pMsgHandler->handle(pChannel_, *pFragmentStream_);

創建消息對應的handler的映射在XXXapp_interface.h中(各類宏,看的頭都大了,留到後面各個組件的單獨消息分析中再說)

至此,咱們能夠得出一個kbengine底層的大體流程的輪廓。

其實kbe裏面還有不少「輪子」能夠拿出來細說,像是單例,定時器,內存池,線程池,以及用來通信的序列化和反序列化(MemoryStream)機制。。。。。。

總得來說,大致由於底層網絡(select,epoll)的異步回調機制,使得整個流程都有點小亂的感受。「輪子」有點多,不利於利用現有的知識。一個高性能的c/c++項目幾乎總會用到一些語言的 奇淫技巧,不過對於分佈式的項目,既然咱們已經作好了不追求單點高性能的心理準備,就不該該再爲了追求性能而損失可讀性。在單點性能可接受的程度上提升代碼的可讀性,提升總體的水平擴展能力,纔是分佈式項目的正道。

1. machine

2. loginapp

3. dbmgr

4. caseapp

5. baseappmgr

6. cellapp

7. cellappmgr

腳本邏輯層:

各類工具:

相關文章
相關標籤/搜索