做者:Conygit
導語:微服務開源框架TARS的RPC調用包含客戶端與服務端,《微服務開源框架TARS的RPC源碼解析》系列文章將從初識客戶端、客戶端的同步及異步調用、初識服務端、服務端的工做流程四部分,以C++語言爲載體,深刻淺出地帶你瞭解TARS RPC調用的原理。github
TARS是騰訊使用十年的微服務開發框架,目前支持C++、Java、PHP、Node.js、Go語言。該開源項目爲用戶提供了涉及到開發、運維、以及測試的一整套微服務平臺PaaS解決方案,幫助一個產品或者服務快速開發、部署、測試、上線。目前該框架應用在騰訊各大核心業務,基於該框架部署運行的服務節點規模達到數十萬。web
TARS的通訊模型中包含客戶端和服務端。客戶端服務端之間主要是利用RPC進行通訊。本系列文章分上下兩篇,對RPC調用部分進行源碼解析。本文是下篇,咱們將以C++語言爲載體,帶你們瞭解一下TARS的服務端。緩存
在使用TARS構建RPC服務端的時候,TARS會幫你生成一個XXXServer類,這個類是繼承自Application類的,聲明變量XXXServer g_app,以及調用函數:安全
g_app.main(argc, argv); g_app.waitForShutdown();
即可以開啓TARS的RPC服務了。在開始剖析TARS的服務端代碼以前,先介紹幾個重要的類,讓你們有一個大體的認識。服務器
正如前面所言,一個服務端就是一個Application,Application幫助用戶讀取配置文件,根據配置文件初始化代理(假如這個服務端須要調用其餘服務,那麼就須要初始化代理了)與服務,新建以及啓動網絡線程與業務線程。網絡
TC_EpollServer纔是真正的服務端,若是把Application比做風扇,那麼TC_EpollServer就是那個馬達。TC_EpollServer掌管兩大模塊——網絡模塊與業務模塊,就是下面即將介紹的兩個類。多線程
表明着網絡模塊,內含TC_Epoller做爲IO複用,TC_Socket創建socket鏈接,ConnectionList記錄衆多對客戶端的socket鏈接。任何與網絡相關的數據收發都與NetThread有關。在配置文件中,利用/tars/application/server 下的netthread配置NetThread的個數app
表明着業務模塊,Handle是執行PRC服務的一個線程,而衆多Handle組成的HandleGroup就是同一個RPC服務的一組業務線程了。業務線程負責調用用戶定義好的服務代碼,並將處理結果放到發送緩存中等待網絡模塊發送,下文將會詳細講解業務線程如何調用用戶定義的代碼的,這裏用到了簡單的C++反射,這點在不少資料中都沒有被說起。在配置文件中,利用/tars/application/server/xxxAdapter 下的threads配置一個HandleGroup中的Handle(業務線程)的個數。框架
表明一個RPC服務實體,在配置文件中的/tars/application/server下面的xxxAdapter就是對BindAdapter的配置,一個BindAdapter表明一個服務實體,看其配置就知道BindAdapter的做用是什麼了,其表明一個RPC服務對外的監聽套接字,還聲明瞭鏈接的最大數量,接收隊列的大小,業務線程數,RPC服務名,所使用的協議等。
BindAdapter自己能夠認爲是一個服務的實例,能創建真實存在的監聽socket並對外服務,與網絡模塊NetThread以及業務模塊HandleGroup都有關聯,例如,多個NetThread的第一個線程負責對BindAdapter的listen socket進行監聽,有客戶鏈接到BindAdapter的listen socket就隨機在多個NetThread中選取一個,將鏈接放進被選中的NetThread的ConnectionList中。BindAdapter則一般會與一組HandleGroup進行關聯,該HandleGroup裏面的業務線程就執行BindAdapter對應的服務。可見,BindAdapter與網絡模塊以及業務模塊都有所關聯。
好了,介紹完這幾個類以後,經過類圖看看他們之間的關係:
服務端TC_EpollServer管理類圖中左側的網絡模塊與右側的業務模塊,前者負責創建與管理服務端的網絡關係,後者負責執行服務端的業務代碼,二者經過BindAdapter構成一個總體,對外進行RPC服務。
與客戶端同樣,服務端也須要進行初始化,來構建上面所說的總體,按照上面的介紹,能夠將初始化分爲兩模塊——網絡模塊的初始化與業務模塊的初始化。初始化的全部代碼在Application的void main()以及void waitForQuit()中,初始化包括屏蔽pipe信號,讀取配置文件等,這些將忽略不講,主要看看其如何經過epoll與創建listen socket來構建網絡部分,以及如何設置業務線程組構建業務部分。
在初始化網絡模塊與業務模塊以前,TC_EpollServer須要先初始化,主要代碼在:
void Application::main(int argc, char *argv[]) { ...... //初始化Server部分 initializeServer(); ...... }
在initializeServer()中會填充ServerConfig裏面的各個靜態成員變量,留待須要的時候取用。能夠看到有_epollServer = new TC_EpollServer(iNetThreadNum),服務端TC_EpollServer被建立出來,並且網絡線程NetThread也被創建出來了:
TC_EpollServer::TC_EpollServer(unsigned int iNetThreadNum) { if(_netThreadNum < 1) { _netThreadNum = 1; } //網絡線程的配置數目不能15個 if(_netThreadNum > 15) { _netThreadNum = 15; } for (size_t i = 0; i < _netThreadNum; ++i) { TC_EpollServer::NetThread* netThreads = new TC_EpollServer::NetThread(this); _netThreads.push_back(netThreads); } }
此後,其實有一個AdminAdapter被創建,但其與通常的RPC服務BindAdapter不一樣,這裏不展開介紹。
好了,TC_EpollServer被構建以後,如何給他安排左(網絡模塊)右(業務模塊)護法呢?
在講解網絡模塊以前,再認真地看看網絡模塊的相關類圖:
先看看Application中哪些代碼與網絡模塊的初始化有關吧:
void Application::main(int argc, char *argv[]) { ...... vector<TC_EpollServer::BindAdapterPtr> adapters; //綁定對象和端口 bindAdapter(adapters); ...... _epollServer->createEpoll(); } void Application::waitForShutdown() { waitForQuit(); ...... }
網絡部分的初始化,離不開創建各RPC服務的監聽端口(socket,bind,listen),接收客戶端的鏈接(accept),創建epoll等。那麼什麼時候何地調用這些函數呢?大體過程以下圖所示:
首先在Application::main()中,調用:
vector<TC_EpollServer::BindAdapterPtr> adapters; //綁定對象和端口 bindAdapter(adapters);
在Application::bindAdapter()創建一個個服務實體BindAdapter,經過讀取配置文件中的/tars/application/server下面的xxxAdapter來肯定服務實體BindAdapter的個數及不一樣服務實體的配置,而後再調用:
BindAdapterPtr bindAdapter = new BindAdapter(_epollServer.get()); _epollServer->bind(bindAdapter);
來肯定服務實體的listen socket。能夠看到,在TC_EpollServer::bind()中:
int TC_EpollServer::bind(TC_EpollServer::BindAdapterPtr &lsPtr) { int iRet = 0; for(size_t i = 0; i < _netThreads.size(); ++i) { if(i == 0) { iRet = _netThreads[i]->bind(lsPtr); } else { //當網絡線程中listeners沒有監聽socket時,list使用adapter中設置的最大鏈接數做爲初始化 _netThreads[i]->setListSize(lsPtr->getMaxConns()); } } return iRet; }
將上文TC_EpollServer的初始化時建立的網絡線程組中的第一條網絡線程負責建立並監聽服務實體的listen socket,那樣就能夠避免多線程監聽同一個fd的驚羣效應。
能夠看到,接下來繼續調用NetThread::bind(BindAdapterPtr &lsPtr),其負責作一些準備工做,實際建立socket的是在NetThread::bind(BindAdapterPtr &lsPtr)中執行的NetThread::bind(const TC_Endpoint &ep, TC_Socket &s):
void TC_EpollServer::NetThread::bind(const TC_Endpoint &ep, TC_Socket &s) { int type = ep.isUnixLocal()?AF_LOCAL:AF_INET; if(ep.isTcp()) { s.createSocket(SOCK_STREAM, type); } else { s.createSocket(SOCK_DGRAM, type); } if(ep.isUnixLocal()) { s.bind(ep.getHost().c_str()); } else { s.bind(ep.getHost(), ep.getPort()); } if(ep.isTcp() && !ep.isUnixLocal()) { s.listen(1024); s.setKeepAlive(); s.setTcpNoDelay(); //不要設置close wait不然http服務回包主動關閉鏈接會有問題 s.setNoCloseWait(); } s.setblock(false); }
執行到這裏,已經建立了服務實體BindAdapter的listen socket了,代碼退回到NetThread::bind(BindAdapterPtr &lsPtr)後,還能夠看到NetThread記錄fd其所負責監聽的BindAdapter:
_listeners[s.getfd()] = lsPtr;
下圖是對建立服務實體的listen socket的流程總結
代碼回到Application::main()中,經過執行:
_epollServer->createEpoll();
來讓TC_EpollServer在其掌管的網絡線程中創建epoll:
void TC_EpollServer::createEpoll() { for(size_t i = 0; i < _netThreads.size(); ++i) { _netThreads[i]->createEpoll(i+1); } //必須先等全部網絡線程調用createEpoll(),初始化list後,才能調用initUdp() for(size_t i = 0; i < _netThreads.size(); ++i) { _netThreads[i]->initUdp(); } }
代碼來到NetThread::createEpoll(uint32_t iIndex),這個函數能夠做爲網絡線程NetThread的初始化函數,在函數裏面創建了網絡線程的內存池,建立了epoll,還將上面建立的listen socket加入epoll中,固然只有第一條網絡線程纔有listen socket,此外還初始化了鏈接管理鏈表ConnectionList _list。看下圖對本流程的總結:
因爲NetThread是線程,須要執行其start()函數才能啓動線程。而這個工做不是在Application::main()中完成,而是在Application::waitForShutdown()中的Application::waitForQuit()完成,跟着下面的流程圖看代碼,就清楚明白了:
一樣,與網絡模塊同樣,在講解業務模塊以前,先認真地看看業務模塊的相關類圖:
在業務模塊初始化中,咱們須要理清楚兩個問題:業務模塊如何與用戶填充實現的XXXServantImp創建聯繫,從而使請求到來的時候,Handle可以調用用戶定義好的RPC方法?業務線程在什麼時候何地被啓動,如何等待着請求的到達?
看看Application中哪些代碼與業務模塊的初始化有關吧:
void Application::main(int argc, char *argv[]) { ...... vector<TC_EpollServer::BindAdapterPtr> adapters; bindAdapter(adapters); //業務應用的初始化 initialize(); //設置HandleGroup分組,啓動線程 for (size_t i = 0; i < adapters.size(); ++i) { string name = adapters[i]->getName(); string groupName = adapters[i]->getHandleGroupName(); if(name != groupName) { TC_EpollServer::BindAdapterPtr ptr = _epollServer->getBindAdapter(groupName); if (!ptr) { throw runtime_error("[TARS][adater `" + name + "` setHandle to group `" + groupName + "` fail!"); } } setHandle(adapters[i]); } //啓動業務處理線程 _epollServer->startHandle(); ...... }
在bindAdapter(adapters)與initialize()中解決了前面提到的第一個問題,剩下的代碼實現了handle業務線程組的建立與啓動。
如何進行關聯?先看看下面的代碼流程圖:
如何讓業務線程可以調用用戶自定義的代碼?這裏引入了ServantHelperManager,先簡單劇透一下,經過ServantHelperManager做爲橋樑,業務線程能夠經過BindAdapter的ID索引到服務ID,而後經過服務ID索引到用戶自定義的XXXServantImp類的生成器,有了生成器,業務線程就能夠生成XXXServantImp類並調用裏面的方法了。下面一步一步分析。
在Application::main()調用的Application::bindAdapter()中看到有下面的代碼:
for (size_t i = 0; i < adapterName.size(); i++) { …… string servant = _conf.get("/tars/application/server/" + adapterName[i] + "<servant>"); checkServantNameValid(servant, sPrefix); ServantHelperManager::getInstance()->setAdapterServant(adapterName[i], servant); …… }
舉個例子,adapterNamei爲MyDemo.StringServer.StringServantAdapter,而servant爲MyDemo.StringServer.StringServantObj,這些都是在配置文件中讀取的,前者是BindAdapter的ID,然後者是服務ID。在ServantHelperManager:: setAdapterServant()中,僅僅是執行:
void ServantHelperManager::setAdapterServant(const string &sAdapter, const string &sServant) { _adapter_servant[sAdapter] = sServant; _servant_adapter[sServant] = sAdapter; }
而這兩個成員變量僅僅是:
/** * Adapter包含的Servant(Adapter名稱:servant名稱) */ map<string, string> _adapter_servant; /** * Adapter包含的Servant(Servant名稱:Adapter名稱) */ map<string, string> _servant_adapter;
在這裏僅僅是做一個映射記錄,後續能夠經過BindAdapter的ID能夠索引到服務的ID,經過服務的ID能夠利用簡單的C++反射得出用戶實現的XXXServantImp類,從而獲得用戶實現的方法。
如何實現從服務ID到類的反射?一樣須要經過ServantHelperManager的幫助。在Application::main()中,執行完Application::bindAdapter()會執行initialize(),這是一個純虛函數,實際會執行派生類XXXServer的函數,相似:
void StringServer::initialize() { //initialize application here: //... addServant<StringServantImp>(ServerConfig::Application + "." + ServerConfig::ServerName + ".StringServantObj"); }
代碼最終會執行ServantHelperManager:: addServant<T>():
template<typename T> void addServant(const string &id,bool check = false) { if(check && _servant_adapter.end() == _servant_adapter.find(id)) { cerr<<"[TARS]ServantHelperManager::addServant "<< id <<" not find adapter.(maybe not conf in the web)"<<endl; throw runtime_error("[TARS]ServantHelperManager::addServant " + id + " not find adapter.(maybe not conf in the web)"); } _servant_creator[id] = new ServantCreation<T>(); }
其中參數const string& id是服務ID,例如上文的MyDemo.StringServer.StringServantObj,T是用戶填充實現的XXXServantImp類。
上面代碼的_servant_creatorid = new ServantCreation<T>()是函數的關鍵,_servant_creator是map<string, ServantHelperCreationPtr>,能夠經過服務ID索引到ServantHelperCreationPtr,而ServantHelperCreationPtr是什麼?是幫助咱們生成XXXServantImp實例的類生成器,這就是簡單的C++反射:
/** * Servant */ class ServantHelperCreation : public TC_HandleBase { public: virtual ServantPtr create(const string &s) = 0; }; typedef TC_AutoPtr<ServantHelperCreation> ServantHelperCreationPtr; ////////////////////////////////////////////////////////////////////////////// /** * Servant */ template<class T> struct ServantCreation : public ServantHelperCreation { ServantPtr create(const string &s) { T *p = new T; p->setName(s); return p; } };
以上就是經過服務ID生成相應XXXServantImp類的簡單反射技術,業務線程組裏面的業務線程只須要獲取到所需執行的業務的BindAdapter的ID,就能夠經過ServantHelperManager得到服務ID,有了服務ID就能夠獲取XXXServantImp類的生成器從而生成XXXServantImp類執行裏面由用戶定義好的RPC方法。如今從新看圖(2-8)就大體清楚整個流程了。
剩下的部分就是HandleGroup的建立,並將其與BindAdapter進行相互綁定關聯,同時也須要綁定到TC_EpollServer中,隨後建立/啓動HandleGroup下面的Handle業務線程,啓動Handle的過程涉及上文「將BindAdapter與用戶定義的方法關聯起來」提到的獲取服務類生成器。先看看大體的代碼流程圖:
在這裏分兩部分,第一部分是在Application::main()中執行下列代碼:
//設置HandleGroup分組,啓動線程 for (size_t i = 0; i < adapters.size(); ++i) { string name = adapters[i]->getName(); string groupName = adapters[i]->getHandleGroupName(); if(name != groupName) { TC_EpollServer::BindAdapterPtr ptr = _epollServer->getBindAdapter(groupName); if (!ptr) { throw runtime_error("[TARS][adater `" + name + "` setHandle to group `" + groupName + "` fail!"); } } setHandle(adapters[i]); }
遍歷在配置文件中定義好的每個BindAdapter(例如MyDemo.StringServer.StringServantAdapter),併爲其設置業務線程組HandleGroup,讓線程組的全部線程均可以執行該BindAdapter所對應的RPC方法。跟蹤代碼以下:
void Application::setHandle(TC_EpollServer::BindAdapterPtr& adapter) { adapter->setHandle<ServantHandle>(); }
注意,ServantHandle是Handle的派生類,就是業務處理線程類,隨後來到:
template<typename T> void setHandle() { _pEpollServer->setHandleGroup<T>(_handleGroupName, _iHandleNum, this); }
真正建立業務線程組HandleGroup以及組內的線程,並將線程組與BindAdapter,TC_EpollServer關聯起來的代碼在TC_EpollServer:: setHandleGroup()中:
/** * 建立一個handle對象組,若是已經存在則直接返回 * @param name * @return HandlePtr */ template<class T> void setHandleGroup(const string& groupName, int32_t handleNum, BindAdapterPtr adapter) { map<string, HandleGroupPtr>::iterator it = _handleGroups.find(groupName); if (it == _handleGroups.end()) { HandleGroupPtr hg = new HandleGroup(); hg->name = groupName; adapter->_handleGroup = hg; for (int32_t i = 0; i < handleNum; ++i) { HandlePtr handle = new T(); handle->setEpollServer(this); handle->setHandleGroup(hg); hg->handles.push_back(handle); } _handleGroups[groupName] = hg; it = _handleGroups.find(groupName); } it->second->adapters[adapter->getName()] = adapter; adapter->_handleGroup = it->second; }
在這裏,能夠看到業務線程組的建立:HandleGroupPtr hg = new HandleGroup();業務線程的建立:HandlePtr handle = new T()(T是ServantHandle);創建關係,例如BindAdapter與HandleGroup的相互關聯:it->second->adaptersadapter->getName() = adapter和adapter->_handleGroup = it->second。執行完上面的代碼,就能夠獲得下面的類圖了:
這裏再經過函數流程圖簡單複習一下上述代碼的流程,主要內容均在TC_EpollServer:: setHandleGroup()中:
隨着函數的層層退出,代碼從新來到Application::main()中,隨後執行:
//啓動業務處理線程 _epollServer->startHandle();
在TC_EpollServer::startHandle()中,遍歷TC_EpollServer控制的業務模塊HandleGroup中的全部業務線程組,並遍歷組內的各個Handle,執行其start()方法進行線程的啓動:
void TC_EpollServer::startHandle() { if (!_handleStarted) { _handleStarted = true; for (auto& kv : _handleGroups) { auto& hds = kv.second->handles; for (auto& handle : hds) { if (!handle->isAlive()) handle->start(); } } } }
因爲Handle是繼承自TC_Thread的,在執行Handle::start()中,會執行虛函數Handle::run(),在Handle::run()中主要是執行兩個函數,一個是ServantHandle::initialize(),另外一個是Handle::handleImp():
void TC_EpollServer::Handle::run() { initialize(); handleImp(); }
ServantHandle::initialize()的主要做用是取得用戶實現的RPC方法,其實現原理與上文(「2.2.3業務模塊的初始化」中的第1小點「將BindAdapter與用戶定義的方法關聯起來」)說起的同樣,藉助與其關聯的BindAdapter的ID號,以及ServantHelpManager,來查找到用戶填充實現的XXXServantImp類的生成器並生成XXXServantImp類的實例,將這個實例與服務名構成pair <string, ServantPtr>變量,放進map<string, ServantPtr> ServantHandle:: _servants中,等待業務線程Handle須要執行用戶自定義方法的時候,從map<string, ServantPtr> ServantHandle:: _servants中查找:
void ServantHandle::initialize() { map<string, TC_EpollServer::BindAdapterPtr>::iterator adpit; // 獲取本Handle所關聯的BindAdapter map<string, TC_EpollServer::BindAdapterPtr>& adapters = _handleGroup->adapters; // 遍歷全部BindAdapter for (adpit = adapters.begin(); adpit != adapters.end(); ++adpit) { // 藉助ServantHelperManager來獲取服務指針——XXXServantImp類的指針 ServantPtr servant = ServantHelperManager::getInstance()->create(adpit->first); // 將指針放進map<string, ServantPtr> ServantHandle:: _servants中 if (servant) { _servants[servant->getName()] = servant; } else { TLOGERROR("[TARS]ServantHandle initialize createServant ret null, for adapter `" + adpit->first + "`" << endl); } } ...... }
而Handle::handleImp()的主要做用是使業務線程阻塞在等待在條件變量上,在這裏,能夠看到_handleGroup->monitor.timedWait(_iWaitTime)函數,阻塞等待在條件變量上:
void TC_EpollServer::Handle::handleImp() { ...... struct timespec ts; while (!getEpollServer()->isTerminate()) { { TC_ThreadLock::Lock lock(_handleGroup->monitor); if (allAdapterIsEmpty() && allFilterIsEmpty()) { _handleGroup->monitor.timedWait(_iWaitTime); } } } ...... }
Handle線程經過條件變量來讓全部業務線程阻塞等待被喚醒 ,由於本章是介紹初始化,所以代碼解讀到這裏先告一段落,稍後再詳解服務端中的業務線程Handle被喚醒後,如何經過map<string, ServantPtr> ServantHandle:: _servants查找並執行業務。如今經過函數流程圖複習一下上述的代碼流程:
通過了初始化工做後,服務端就進入工做狀態了,服務端的工做線程分爲兩類,正如前面所介紹的網絡線程與業務線程,網絡線程負責接受客戶端的鏈接與收發數據,而業務線程則只關注執行用戶所定義的PRC方法,兩種線程在初始化的時候都已經執行start()啓動了。
大部分服務器都是按照accept()->read()->write()->close()的流程執行的,大體工做流程圖以下圖所示:
TARS的服務端也不例外。
斷定邏輯採用Epoll IO複用模型實現,每一條網絡線程NetThread都有一個TC_Epoller來作事件的收集、偵聽、分發。
正如前面所介紹,只有第一條網絡線程會執行鏈接的監聽工做,接受新的鏈接以後,就會構造一個Connection實例,並選擇處理這個鏈接的網絡線程。
請求被讀入後,將暫存在接收隊列中,並通知業務線程進行處理,在這裏,業務線程終於登場了,處理完請求後,將結果放到發送隊列。
發送隊列有數據,天然須要通知網絡線程進行發送,接收到發送通知的網絡線程會將響應發往客戶端。
TARS服務器的工做流程大體就是如此,如上圖所示的普通服務器工做流程沒有多大的區別,下面將按着接受客戶端鏈接,讀入RPC請求,處理RPC請求,發送RPC響應四部分逐一介紹介紹服務端的工做。
討論服務器接受請求,很明顯是從網絡線程(並且是網絡線程組的第一條網絡線程)的NetThread::run()開始分析,在上面說到的建立TC_Epoller並將監聽fd放進TC_Epoller的時候,執行的是:
_epoller.add(kv.first, H64(ET_LISTEN) | kv.first, EPOLLIN);
那麼從epoll_wait()返回的時候,epoll_event中的聯合體epoll_data將會是(ET_LISTEN | listen socket’fd),從中獲取高32位,就是ET_LISTEN,而後執行下面switch中case ET_LISTEN的分支
try { const epoll_event &ev = _epoller.get(i); uint32_t h = ev.data.u64 >> 32; switch(h) { case ET_LISTEN: { //監聽端口有請求 auto it = _listeners.find(ev.data.u32); if( it != _listeners.end()) { if(ev.events & EPOLLIN) { bool ret; do { ret = accept(ev.data.u32); }while(ret); } } } break; case ET_CLOSE: //關閉請求 break; case ET_NOTIFY: //發送通知 ...... break; case ET_NET: //網絡請求 ...... break; default: assert(true); } }
而ret = accept(ev.data.u32)的整個函數流程以下圖所示(ev.data.u32就是被激活的BindAdapter對應的監聽socket的fd):
在講解以前,先複習一下網絡線程相關類圖,以及經過圖解對accept有個大體的印象:
好了,跟着圖(2-14),如今從NetThread::run()的NetThread::accept(int fd)講起。
進入NetThread::accept(int fd),能夠看到代碼執行了:
//接收鏈接 TC_Socket s; s.init(fd, false, AF_INET); int iRetCode = s.accept(cs, (struct sockaddr *) &stSockAddr, iSockAddrSize);
經過TC_Socket::accept(),調用系統函數accept()接受了客戶端的辛辛苦苦三次握手來的socket鏈接,而後對客戶端的IP與端口進行打印以及檢查,並分析對應的BindAdapter是否過載,過載則關閉鏈接。隨後對客戶端socket進行設置:
cs.setblock(false); cs.setKeepAlive(); cs.setTcpNoDelay(); cs.setCloseWaitDefault();
到此,對應圖(2-16)的第一步——接受客戶端鏈接(流程以下圖所示),已經完成。
接下來是爲新來的客戶端socket建立一個Connection,在NetThread::accept(int fd)中,建立Connection的代碼以下:
int timeout = _listeners[fd]->getEndpoint().getTimeout()/1000; Connection *cPtr = new Connection(_listeners[fd].get(), fd, (timeout < 2 ? 2 : timeout), cs.getfd(), ip, port);
構造函數中的參數依次是,此次新客戶端所對應的BindAdapter指針,BindAdapter對應的listen socket的fd,超時時間,客戶端socket的fd,客戶端的ip以及端口。在Connection的構造函數中,經過fd也關聯其TC_Socket:
// 服務鏈接 Connection::Connection(TC_EpollServer::BindAdapter *pBindAdapter, int lfd, int timeout, int fd, const string& ip, uint16_t port) { ...... _sock.init(fd, true, AF_INET); }
那麼關聯TC_Socket以後,經過Connection實例就能夠操做的客戶端socket了。至此,對應圖(2-16)的第二步——爲客戶端socket建立Connection就完成了(流程以下圖所示)。
最後,就是爲這個Connection選擇一個網絡線程,將其加入網絡線程對應的ConnectionList,在NetThread::accept(int fd)中,執行:
//addTcpConnection(cPtr); _epollServer->addConnection(cPtr, cs.getfd(), TCP_CONNECTION);
TC_EpollServer::addConnection()的代碼以下所示:
void TC_EpollServer::addConnection(TC_EpollServer::NetThread::Connection * cPtr, int fd, int iType) { TC_EpollServer::NetThread* netThread = getNetThreadOfFd(fd); if(iType == 0) { netThread->addTcpConnection(cPtr); } else { netThread->addUdpConnection(cPtr); } }
看到,先爲Connection* cPtr選擇網絡線程,在流程圖中,被選中的網絡線程稱爲Chosen_NetThread。選網絡線程的函數是TC_EpollServer::getNetThreadOfFd(int fd),根據客戶端socket的fd求餘數獲得,具體代碼以下:
NetThread* getNetThreadOfFd(int fd) { return _netThreads[fd % _netThreads.size()]; }
接着調用被選中線程的NetThread::addTcpConnection()方法(或者
NetThread::addUdpConnection(),這裏只介紹TCP的方法),將Connection加入被選中網絡線程的ConnectionList中,最後會執行_epoller.add(cPtr->getfd(), cPtr->getId(), EPOLLIN | EPOLLOUT)將客戶端socket的fd加入本網絡線程的TC_Epoller中,讓本網絡線程負責對本客戶端的數據收發。至此對應圖(28)的第三步就執行完畢了(具體流程以下圖所示)。
討論服務器接收RPC請求,一樣從網絡線程的NetThread::run()開始分析,上面是進入switch中的case ET_LISTEN分支來接受客戶端的鏈接,那麼如今就是進入case ET_NET分支了,爲何是case ET_NET分支呢?由於上面提到,將客戶端socket的fd加入TC_Epoller來監聽其讀寫,採用的是_epoller.add(cPtr->getfd(), cPtr->getId(), EPOLLIN | EPOLLOUT),傳遞給函數的第二個參數是32位的整形cPtr->getId(),而函數的第二個參數要求必須是64位的整型,所以,這個參數將會是高32位是0,低32位是cPtr->getId()的64位整形。而第二個參數的做用是當該註冊的事件引發epoll_wait()退出的時候,會做爲激活事件epoll_event 結構體中的64位聯合體epoll_data_t data返回給用戶。所以,看下面NetThread::run()代碼:
try { const epoll_event &ev = _epoller.get(i); uint32_t h = ev.data.u64 >> 32; switch(h) { case ET_LISTEN: …… break; case ET_CLOSE: //關閉請求 break; case ET_NOTIFY: //發送通知 ...... break; case ET_NET: //網絡請求 processNet(ev); break; default: assert(true); } }
代碼中的h是64位聯合體epoll_data_t data的高32位,通過上面分析,客戶端socket若由於接收到數據而引發epoll_wait()退出的話,epoll_data_t data的高32位是0,低32位是cPtr->getId(),所以h將會是0。而ET_NET就是0,所以客戶端socket有數據來到的話,會執行case ET_NET分支。下面看看執行case ET_NET分支的函數流程圖。
收到RPC請求,進入到NetThread::processNet(),服務器須要知道是哪個客戶端socket被激活了,所以在NetThread::processNet()中執行:
void TC_EpollServer::NetThread::processNet(const epoll_event &ev) { uint32_t uid = ev.data.u32; Connection *cPtr = getConnectionPtr(uid); ...... }
正如上面說的,epoll_data_t data的高32位是0,低32位是cPtr->getId(),那麼獲取到uid以後,經過NetThread::getConnectionPtr()就能夠從ConnectionList中返回此時此刻所須要讀取RPC請求的Connection了。以後對獲取的Connection進行簡單的檢查工做,並看看epoll_event::events是不是EPOLLERR或者EPOLLHUP(具體流程以下圖所示)。
接着,就須要接收客戶端的請求數據了,有數據接收意味着epoll_event::events是EPOLLIN,看下面代碼,主要是NetThread::recvBuffer()讀取RPC請求數據,以及以及Connection:: insertRecvQueue()喚醒業務線程發送數據。
if(ev.events & EPOLLIN) //有數據須要讀取 { recv_queue::queue_type vRecvData; int ret = recvBuffer(cPtr, vRecvData); if(ret < 0) { delConnection(cPtr,true,EM_CLIENT_CLOSE); return; } if(!vRecvData.empty()) { cPtr->insertRecvQueue(vRecvData); } }
先看看NetThread::recvBuffer(),首先服務端會先建立一個線程安全隊列來承載接收到的數據recv_queue::queue_type vRecvData,再將剛剛獲取的Connection cPtr以及recv_queue::queue_type vRecvData做爲參數調用NetThread::recvBuffer(cPtr, vRecvData)。
而NetThread::recvBuffer()進一步調用Connection::recv()函數:
int NetThread::recvBuffer(NetThread::Connection *cPtr, recv_queue::queue_type &v) { return cPtr->recv(v); }
Connection::recv()會依照不一樣的傳輸層協議(若UDP傳輸,lfd==-1),執行不一樣的接收方法,例如TCP會執行:
iBytesReceived = ::read(_sock.getfd(), (void*)buffer, sizeof(buffer))
根據數據接收狀況,如收到FIN分節,errno==EAGAIN等執行不一樣的動做。若收到真實的請求信息包,會將接收到的數據放在string Connection::_recvbuffer中,而後調用Connection:: parseProtocol()。
在Connection:: parseProtocol()中會回調協議解析函數對接收到的數據進行檢驗,檢驗經過後,會構造線程安全隊列中的元素tagRecvData* recv,並將其放進線程安全隊列中:
tagRecvData* recv = new tagRecvData(); recv->buffer = std::move(ro); recv->ip = _ip; recv->port = _port; recv->recvTimeStamp = TNOWMS; recv->uid = getId(); recv->isOverload = false; recv->isClosed = false; recv->fd = getfd(); //收到完整的包纔算 this->_bEmptyConn = false; //收到完整包 o.push_back(recv);
到此,RPC請求數據已經被徹底獲取並放置在線程安全隊列中(具體過程以下圖所示)。
代碼運行至此,線程安全隊列裏面終於有RPC請求包數據了,能夠喚醒業務線程Handle進行處理了,代碼回到NetThread::processNet(),只要線程安全隊列非空,就執行Connection:: insertRecvQueue():
void NetThread::processNet(const epoll_event &ev) { ...... if(ev.events & EPOLLIN) //有數據須要讀取 { ...... if(!vRecvData.empty()) { cPtr->insertRecvQueue(vRecvData); } } ...... }
在Connection:: insertRecvQueue()中,會先對BindAdapter進行過載判斷,分爲未過載,半過載以及全過載三種狀況。若全過載會丟棄線程安全隊列中的全部RPC請求數據,不然會執行BindAdapter::insertRecvQueue()。
在BindAdapter::insertRecvQueue()中,代碼主要有兩個動做,第一個是將獲取到的RPC請求包放進BindAdapter的接收隊列——recv_queue _rbuffer中:
_rbuffer.push_back(vtRecvData)
第二個是喚醒等待條件變量的HandleGroup線程組:
_handleGroup->monitor.notify()
如今,服務端的網絡線程在接收RPC請求數據後,終於喚醒了業務線程(具體流程看下圖所示),接下來輪到業務模塊登場,看看如何處理RPC請求了。
與前文接收到請求數據後,喚醒業務線程組HandleGroup(就是剛剛纔介紹完的_handleGroup->monitor.notify())遙相呼應的地方是在「2.2.3業務模塊的初始化」第2小點「Handle業務線程的啓動」中提到的,在Handle::handleImp()函數中的_handleGroup->monitor.timedWait(_iWaitTime)。經過條件變量,業務線程組HandleGroup裏面的業務線程一塊兒阻塞等待着網絡線程對其發起喚醒。如今,終於對條件變量發起通知了,接下來將會如何處理請求呢?在這裏,須要先對2.2.3節進行復習,瞭解到ServantHandle::_servants裏面究竟承載着什麼。
好了,處理RPC請求分爲三步:構造請求上下文,調用用戶實現的方法處理請求,將響應數據包push到線程安全隊列中並通知網絡線程,具體函數流程以下圖所示,如今進一步分析:
當業務線程從條件變量上被喚醒以後,從其負責的BindAdapter中獲取請求數據:adapter->waitForRecvQueue(recv, 0),在BindAdapter::waitForRecvQueue()中,將從線程安全隊列recv_queue BindAdapter::_ rbuffer中獲取數據:
bool BindAdapter::waitForRecvQueue(tagRecvData* &recv, uint32_t iWaitTime) { bool bRet = false; bRet = _rbuffer.pop_front(recv, iWaitTime); if(!bRet) { return bRet; } return bRet; }
還記得在哪裏將數據壓入線程安全隊列的嗎?對,就在「2.3.2接收RPC請求」的第3點「線程安全隊列非空,喚醒業務線程發送」中。
接着,調用ServantHandle::handle()對接收到的RPC請求數據進行處理。
處理的第一步正如本節小標題所示——構造請求上下文,用的是ServantHandle::createCurrent():
void ServantHandle::handle(const TC_EpollServer::tagRecvData &stRecvData) { TarsCurrentPtr current = createCurrent(stRecvData); ...... }
在ServantHandle::createCurrent()中,先new出TarsCurrent實例,而後調用其initialize()方法,在TarsCurrent::initialize(const TC_EpollServer::tagRecvData &stRecvData, int64_t beginTime)中,將RPC請求包的內容放進請求上下文TarsCurrentPtr current中,後續只需關注這個請求上下文便可。另外能夠稍微關注一下,若採用TARS協議會使用TarsCurrent::initialize(const string &sRecvBuffer)將請求包的內容放進請求上下文中,不然直接採用memcpy()系統調用來拷貝內容。下面稍微總結一下這小節的流程:
當獲取到請求上下文以後,就須要對其進行處理了。
void ServantHandle::handle(const TC_EpollServer::tagRecvData &stRecvData) { // 構造請求上下文 TarsCurrentPtr current = createCurrent(stRecvData); if (!current) return; // 處理請求 if (current->getBindAdapter()->isTarsProtocol()) { handleTarsProtocol(current); } else { handleNoTarsProtocol(current); } }
本RPC框架支持TARS協議與非TARS協議,下面只會介紹對TARS協議的處理,對於非TARS協議,分析流程也是差很少,對非TARS協議協議感興趣的讀者能夠對比着來分析非TARS協議部分。在介紹以前,先看看服務相關的繼承體系,下面不要混淆這三個類了:
好了,如今重點放在ServantHandle::handleTarsProtocol(const TarsCurrentPtr ¤t)函數上面。先貼代碼:
void ServantHandle::handleTarsProtocol(const TarsCurrentPtr ¤t) { // 1-對請求上下文current進行預處理 // 2-尋找合適的服務servant map<string, ServantPtr>::iterator sit = _servants.find(current->getServantName()); if (sit == _servants.end()) { current->sendResponse(TARSSERVERNOSERVANTERR); return; } int ret = TARSSERVERUNKNOWNERR; string sResultDesc = ""; vector<char> buffer; try { //3-業務邏輯處理 ret = sit->second->dispatch(current, buffer); } catch(TarsDecodeException &ex) { …… } catch(TarsEncodeException &ex) { …… } catch(exception &ex) { …… } catch(...) { …… } //回送響應,第3小點再分析吧 …… }
進入函數中,會先對請求上下文進行預處理,例如set調用合法性檢查,染色處理等。隨後,就依據上下文中的服務名來獲取服務對象:map<string, ServantPtr>::iterator sit = _servants.find(current->getServantName()),_servants在「2.2.3業務模塊的初始化」第2小點「Handle業務線程的啓動」中被賦予內容,其key是服務ID(或者叫服務名),value是用戶實現的服務XXXServantImp實例指針。
隨後就能夠利用XXXServantImp實例指針來執行RPC請求了:ret = sit->second->dispatch(current, buffer),在Servant:: dispatch()(如圖(2-26)由於XXXServantImp是繼承自XXXServant,而XXXServant繼承自Servant,因此實際是執行Servant的方法)中,使用不一樣的協議會有不一樣的處理方式,這裏只介紹TARS協議的,調用了XXXServant::onDispatch(tars::TarsCurrentPtr _current, vector<char> &_sResponseBuffer)方法:
int Servant::dispatch(TarsCurrentPtr current, vector<char> &buffer) { int ret = TARSSERVERUNKNOWNERR; if (current->getFuncName() == "tars_ping") { //略 } else if (!current->getBindAdapter()->isTarsProtocol()) { //略 } else { TC_LockT<TC_ThreadRecMutex> lock(*this); ret = onDispatch(current, buffer); } return ret; }
XXXServant類就是執行Tars2Cpp的時候生成的,會依據用戶定義的tars文件來生成相應的純虛函數,以及onDispatch()方法,該方法的動做有:
上述步驟是按照默認的服務端自動回覆的思路去闡述,在實際中,用戶能夠關閉自動回覆功能(如:current->setResponse(false)),並自行發送回覆(如:servant->async_response_XXXAsync(current, ret, rStr))。到此,服務端已經執行了RPC方法,下面稍微總結一下本小節的內容:
處理完RPC請求,執行完RPC方法以後,須要將結果(下面代碼中的buffer)回送給客戶端:
void ServantHandle::handleTarsProtocol(const TarsCurrentPtr ¤t) { // 1-對請求上下文current進行預處理 // 2-尋找合適的服務servant //3-業務邏輯處理 //回送響應,本節分析 if (current->isResponse()) { current->sendResponse(ret, buffer, TarsCurrent::TARS_STATUS(), sResultDesc); } }
因爲業務與網絡是獨立開來的,網絡線程收到請求包以後利用條件變量來通知業務線程,而業務線程纔有什麼方式來通知網絡線程呢?由前面可知,網絡線程是阻塞在epoll中的,所以須要利用epoll來通知網絡線程。此次先看圖解總結,再分析代碼:
在ServantHandle::handleTarsProtocol()中,最後的一步就是回送響應包。數據包的回送經歷的步驟是:編碼響應信息——找出與接收請求信息的網絡線程,由於咱們須要通知他來幹活——將響應包放進該網絡線程的發送隊列——利用epoll的特性喚醒網絡線程,咱們重點看看NetThread::send():
void TC_EpollServer::NetThread::send(uint32_t uid, const string &s, const string &ip, uint16_t port) { if(_bTerminate) { return; } tagSendData* send = new tagSendData(); send->uid = uid; send->cmd = 's'; send->buffer = s; send->ip = ip; send->port = port; _sbuffer.push_back(send); //通知epoll響應, 有數據要發送 _epoller.mod(_notify.getfd(), H64(ET_NOTIFY), EPOLLOUT); }
到此,服務器中的業務模塊已經完成他的使命,後續將響應數據發給客戶端是網絡模塊的工做了。
獲取了請求,固然須要回覆響應,從上面知道業務模塊經過_epoller.mod(_notify.getfd(), H64(ET_NOTIFY), EPOLLOUT)通知網絡線程的,再加上以前分析「2.3.1接受客戶端請鏈接」以及「2.3.2接收RPC請求」的經驗,咱們知道,這裏必須從NetThread::run()開始講起,並且是進入case ET_NOTIFY分支:
try { const epoll_event &ev = _epoller.get(i); uint32_t h = ev.data.u64 >> 32; switch(h) { case ET_LISTEN: …… break; case ET_CLOSE: //關閉請求 break; case ET_NOTIFY: //發送通知 processPipe(); break; case ET_NET: //網絡請求 …… break; default: assert(true); } }
在NetThread::processPipe()中,先從線程安全隊列中取響應信息包:_sBufQueue.dequeue(sendp, false),這裏與「2.3.3處理RPC請求」的第3小點「將響應數據包push到線程安全隊列中並通知網絡線程」遙相呼應。而後從響應信息中取得與請求信息相對應的那個Connection的uid,利用uid獲取Connection:Connection *cPtr = getConnectionPtr(sendp->uid)。因爲Connection是聚合了TC_Socket的,後續經過Connection將響應數據回送給客戶端,具體流程以下圖所示:
這裏用圖解總結一下服務端的工做過程:
TARS能夠在考慮到易用性和高性能的同時快速構建系統並自動生成代碼,幫助開發人員和企業以微服務的方式快速構建本身穩定可靠的分佈式應用,從而令開發人員只關注業務邏輯,提升運營效率。多語言、敏捷研發、高可用和高效運營的特性使 TARS 成爲企業級產品。
《微服務開源框架TARS的RPC源碼解析》系列文章分上下兩篇,對RPC調用部分進行源碼解析。本文是下篇,咱們帶你們瞭解了一下TARS的服務端。歡迎閱讀上篇《初識TARS C++客戶端》
TARS微服務助您數字化轉型,歡迎訪問:
TARS官網:https://TarsCloud.org
TARS源碼:https://github.com/TarsCloud
獲取《TARS官方培訓電子書》:https://wj.qq.com/s2/6570357/3adb/
或掃碼獲取: