做者 hermanmysql
導語ios
本文源自herman的系列文章之一《鵝廠開源框架TARS之基礎組件》。相關代碼已按TARS開源社區最新版本更新。git
TARS開源框架庫裏面用C++實現了比較多的公用組件,這些組件通常統一放在 util
文件夾,在應用層也能夠自由使用,工欲善其事必先利其器,因此有必要把這些工具組件作了解,更好的使用,提升效率。接下來,本文將對以下TarsCpp組件進行分析:github
- 線程操做
- 智能指針
- DB操做
- 網絡操做
- 服務配置
- 仿函數
- Hash
- 異常處理
線程安全隊列: TC_ThreadQueue
先看下框架對TC_ThreadQueue
類的使用以下:算法
typedef TC_ThreadQueue<tagRecvData*, deque<tagRecvData*> > recv_queue; // 接收隊列 typedef TC_ThreadQueue<tagSendData*, deque<tagSendData*> > send_queue; // 發送隊列
TC_ThreadQueue
的實現比較簡單,在TARS的網絡層實現中能夠發現這個類比較重要,由於從框架中收到的網絡包都會加入到這個緩存隊列裏面,而後多業務線程 ServantHandle
會調用 waitForRecvQueue
從該隊列裏面取網絡數據包,而後調用 dispatch
調用協議消息對應的處理函數,先看下框架對 TC_ThreadQueue
的實現:sql
/** * @brief 線程安全隊列 */ template<typename T, typename D = deque<T> > class TC_ThreadQueue { public: TC_ThreadQueue():_size(0){}; public: typedef D queue_type; /** * @brief 從頭部獲取數據, 沒有數據拋異常 * * @param t * @return bool: true, 獲取了數據, false, 無數據 */ T front(); /** * @brief 從頭部獲取數據, 沒有數據則等待. * * @param t * @param millsecond(wait = true時才生效) 阻塞等待時間(ms) * 0 表示不阻塞 * -1 永久等待 * @param wait, 是否wait * @return bool: true, 獲取了數據, false, 無數據 */ bool pop_front(T& t, size_t millsecond = 0, bool wait = true); ... ... }
TC_ThreadQueue
使用了C++11標準庫中的<mutex>
和<condition_variable>
用於實現線程鎖和 wait,以下,看下隊列的成員函數:push_front
在隊列前面加入數據,數據庫
template<typename T, typename D> void TC_ThreadQueue<T, D>::push_front(const T& t, bool notify) { if(notify) { std::unique_lock<std::mutex> lock(_mutex); _cond.notify_one(); _queue.push_front(t); ++_size; } else { std::lock_guard<std::mutex> lock (_mutex); _queue.push_front(t); ++_size; } }
如上圖調用push_front
函數的時候調用 std::unique_lock<std::mutex> lock(_mutex)
加鎖 ,避免網絡層接收數據和業務層取同一隊列的數據衝突,_cond.notify_one()
通知等待在該鎖上某一個線程醒過來,調用該函數以前必須加鎖,由於有數據過來了,例如網絡層有線程須要取包並進行分發處理。緩存
再看一個成員函數pop_front
,從頭部獲取數據,沒有數據則等待。millisecond
阻塞等待時間(ms)安全
0
表示不阻塞-1
永久等待
template<typename T, typename D> bool TC_ThreadQueue<T, D>::pop_front(T& t, size_t millsecond, bool wait) { if(wait) { std::unique_lock<std::mutex> lock(_mutex); if (_queue.empty()) { if (millsecond == 0) { return false; } if (millsecond == (size_t) -1) { _cond.wait(lock); } else { //超時了 if (_cond.wait_for(lock, std::chrono::milliseconds(millsecond)) == std::cv_status::timeout) { return false; } } } if (_queue.empty()) { return false; } t = _queue.front(); _queue.pop_front(); assert(_size > 0); --_size; return true; } else { std::lock_guard<std::mutex> lock (_mutex); if (_queue.empty()) { return false; } t = _queue.front(); _queue.pop_front(); assert(_size > 0); --_size; return true; } }
BindAdapter::waitForRecvQueue
的函數就是調用了pop_front
函數,用於等待接收隊列,函數原型以下:服務器
bool TC_EpollServer::BindAdapter::waitForRecvQueue(uint32_t handleIndex, shared_ptr<RecvContext> &data) { bool bRet = getRecvQueue(handleIndex).pop_front(data); if (!bRet) { return bRet; } --_iRecvBufferSize; return bRet; }
這裏BindAdapter::waitForRecvQueue
用於業務線程在等待服務器監聽的適配器收到網絡包後進行業務包的處理,這裏傳入的handleIndex
表示接收隊列索引,獲取對應的_rbuffer
。
普通線程鎖: TC_ThreadLock
TC_ThreadLock
類的定義以下
typedef TC_Monitor<TC_ThreadMutex, TC_ThreadCond> TC_ThreadLock;
TC_Monitor
線程鎖監控模板類。一般線程鎖,都經過該類來使用,而不是直接用TC_ThreadMutex
、TC_ThreadRecMutex
。
類的定義template <class T, class P> class TC_Monitor
須要傳入兩個模板參數,TC_Monitor
包括如下成員變量:
mutable int _nnotify; // 上鎖的次數 mutable P _cond; // 條件變量 T _mutex; // 互斥鎖 /** * @brief 定義鎖控制對象 */ typedef TC_LockT<TC_Monitor<T, P> > Lock; typedef TC_TryLockT<TC_Monitor<T, P> > TryLock;
第一個參數 TC_ThreadMutex
表明線程鎖:同一個線程不能夠重複加鎖 ,包含成員變量
mutable std::mutex _mutex
延伸閱讀,這裏
tc_thread_mutex.h
還包括另一個循環鎖類TC_ThreadRecMutex
,即一個線程能夠加屢次鎖,定義以下:
// 定義於tc_monitor.h中 typedef TC_Monitor<TC_ThreadRecMutex, TC_ThreadCond> TC_ThreadRecLock;
第二個參數 TC_ThreadCond
表明線程信號條件類:全部鎖能夠在上面等待信號發生,包含線程條件成員變量:
mutable std::condition_variable_any _cond
結合實際的使用場景,TC_Monitor::timedWait()
會調用 TC_ThreadCond
對象的 timedWait
函數,下一步調用 chrono
庫的 milliseconds
;TC_ThreadCond::signal()
實現發送信號,等待在該條件上的一個線程會醒。
TC_LockT
類定義: template <typename T> class TC_LockT
鎖模板類,與其餘具體鎖配合使用,構造時候加鎖,析夠的時候解鎖。
TC_LockT
構造函數,傳入互斥量初始化成員變量 _mutex
,TC_LockT
構造函數實現:
TC_LockT(const T& mutex) : _mutex(mutex) { _mutex.lock(); _acquired = true; }
到這裏就能夠看出 TC_Monitor
定義的 typedef TC_LockT<TC_Monitor<T, P> > Lock
,這裏 Lock
類型的模板參數用的是 TC_Monitor
類。
實際使用場景以下:
Lock lock(*this);
TC_LockT
的構造函數,傳入參數 this
爲 TC_Monitor
的子類對象,TC_LockT
的構造函數調用_mutex.lock()
;實際就是調用了 TC_Monitor
對象的 lock
函數,TC_Monitor
的 lock
函數實現:
void lock() const { _mutex.lock(); _nnotify = 0; }
這裏 _mutex
爲 TC_ThreadMutex
對象,進一步調用了 TC_ThreadRecMutex::lock()
成員函數,實現以下:
void TC_ThreadMutex::lock() const { _mutex.lock(); }
而後上面定義的lock棧變量退出函數的時候調用 TC_LockT
的析構函數:實現以下:
virtual ~TC_LockT() { if (_acquired) { _mutex.unlock(); //這裏會調用TC_Monitor的unlock函數 } }
TC_Monitor
的 unlock
函數實現:
void unlock() const { notifyImpl(_nnotify); _mutex.unlock(); //這裏會調用C++標準庫<mutex>中的unlock }
這裏調用 notifyImpl
函數是由於 TC_Monitor
類不僅能夠實現簡單的互斥鎖功能,還能夠實現條件變量Condition功能,其中 notifyImpl
的實現爲
void notifyImpl(int nnotify) const { if(nnotify != 0) { if(nnotify == -1) { _cond.broadcast(); return; } else { while(nnotify > 0) { _cond.signal(); --nnotify; } } } }
線程基類: TC_Thread
仍是老樣子,先看下項目實際對線程基類的使用。實際項目使用中,咱們對 TC_Thread
又封裝了一下,實現了一個BasicThread
類,下面看下 BasicThread
的定義:
class BasicThread : public tars::TC_Thread, public tars::TC_ThreadLock { ... void terminate() { _bTerm = true; { Lock lock(*this); notifyAll(); } getThreadControl().join(); } }
BasicThread
類,繼承了 TC_Thread
和 TC_ThreadLock
,其中 TC_ThreadLock
第二點已經說明過了,因此這裏重點看下 TC_Thread
類的使用,TC_Thread
的定義
class TC_Thread : public TC_Runable { ... /** * 使用了C++11標準線程庫std::thread, 構造函數傳參數threadEntry線程函數, * 返回 TC_ThreadControl(_th),其中_th爲std::thread對象 */ TC_ThreadControl start(); static void threadEntry(TC_Thread *pThread); //靜態函數, 線程入口 virtual void run() = 0; ... }
下一步看下線程控制類 TC_ThreadControl
的定義:
class TC_ThreadControl { ... explicit TC_ThreadControl(std::thread *th); // 構造,傳入std::thread對象 void join(); // 調用std::thread的join()阻塞當前的線程,直到另一個線程運行結束 static void sleep(); // 調用std::this_thread::sleep函數線程將暫停執行 ... }
下一步看下 TC_Runable
的定義:
class TC_Runable { public: virtual ~TC_Runable(){}; virtual void run() = 0; //定義了run純虛函數 };
最後看下實際項目中對線程類的使用
class AntiSdkSyncThread : public BasicThread //這裏等於多繼承了TC_Thread和TC_ThreadLock兩個類 { void run() //實現基類的純虛函數 { Lock lock(*this); timedWait(10 * 1000); (間隔執行時間,實現了線程的定時執行功能) if(NULL != g_busi_interf) { Int32 ret = g_busi_interf->proc_(); //須要按期執行的函數 } } }
定義好了 AntiSdkSyncThread g_antiSdkSyncThread;
類,那麼須要啓動線程的時候執行g_antiSdkSyncThread.start();
就會天然建立線程,而且 threadEntry
線程函數會調用 pThread->run()
多態函數,進程退出的時候調用 g_antiSdkSyncThread.terminate();
。
智能指針類: TC_AutoPtr
這裏的智能指針能夠放在容器中,且線程安全的智能指針,CPP11標準庫的auto_ptr
是不能放在容器中的,貌似已經被淘汰了,目前多數使用CPP11標準庫的shared_ptr
,不過須要編譯器支持CPP11。
TC_HandleBase
智能指針基類的定義以下,全部須要智能指針的類都須要從該對象繼承,其中使用了C++11標準庫中的<atomic>
進行原子計數。
class UTIL_DLL_API TC_HandleBase { public: /** * @brief 複製 * * @return TC_HandleBase& */ TC_HandleBase& operator=(const TC_HandleBase&) { return *this; } /** * @brief 增長計數 */ void incRef() { ++_atomic; } /** * @brief 減小計數 */ void decRef() { if((--_atomic) == 0 && !_bNoDelete) { _bNoDelete = true; delete this; } } /** * @brief 獲取計數. * * @return int 計數值 */ int getRef() const { return _atomic; } /** * @brief 設置不自動釋放. * * @param b 是否自動刪除,true or false */ void setNoDelete(bool b) { _bNoDelete = b; } protected: /** * @brief 構造函數 */ TC_HandleBase() : _atomic(0), _bNoDelete(false) { } /** * @brief 拷貝構造 */ TC_HandleBase(const TC_HandleBase&) : _atomic(0), _bNoDelete(false) { } /** * @brief 析構 */ virtual ~TC_HandleBase() { } protected: std::atomic<int> _atomic; // 引用計數 bool _bNoDelete; // 是否自動刪除 };
下一步看 TC_AutoPtr
智能指針模板類,能夠放在容器中,且線程安全的智能指針,該智能指針經過引用計數實現,其構造函數和析構函數定義以下:
template<typename T> class TC_AutoPtr { TC_AutoPtr(T* p = 0) { _ptr = p; if(_ptr) { _ptr->incRef(); //構造函數 引用計算加1 } } ... ~TC_AutoPtr() { if(_ptr) { _ptr->decRef(); //析構函數 引用計算減1 } } }
例子:實戰項目使用
struct ConnStruct : public TC_HandleBase{...} typedef TC_AutoPtr<ConnStruct> ConnStructPtr;
TC_AutoPtr
拷貝構造調用 _ptr->incRef();
這裏 ptr
爲 ConnStruct
,ConnStruct
繼承於TC_HandleBase
,等於調用了TC_HandleBaseT<int>::incRef() {++_atomic;}
引用計數原子操做加一、析構引用計數原子操做減1,當引用計數減小到0時根據設置的開關是否要進行刪除來決定是否觸發delete。
例子:這是TARS使用異步rpc回調的典型例子,這裏回調類使用了智能指針
// 定義回調函數智能指針,其中SessionCallback父類繼承於TC_HandleBase typedef TC_AutoPtr<SessionCallback> SessionCallbackPtr; //建立回調類SessionCallbackPtr,並傳入初始化參數uin gameid等; SessionCallbackPtr cb = new SessionCallback(iUin, iGameId, iSeqID, iCmd,sSessionID, theServant, current, cs, this); //異步調用sessionserver遠程接口 getSessionPrx()->async_getSession(cb, iUin, iGameId);
接口返回完成,回調SessionCallback::callback_getSession(tars::Int32 ret, const MGComm::SessionValue& retValue)
函數,接收sessionserver
接口的返回的SessionValue
結構。
由於 SessionCallbackPtr
使用了智能指針,因此業務不須要去手動釋放前面 new
出來的 SessionCallbackPtr
,仍是比較方便的。
MySQL操做類: TC_Mysql
TC_Mysql封裝好的mysql操做類,非線程安全,對於 insert/update 能夠有更好的函數封裝,防止SQL注入
使用方式:
TC_Mysql mysql; //初始化mysql,init時不連接,請求時自動創建連接; //數據庫能夠爲空; //端口默認爲3306 mysql.init("192.168.1.2", "pc", "pc@sn", "db_tars_demo");
一般用:void init(const TC_DBConf& tcDBConf);
直接初始化數據庫。例如:stDirectMysql.init(_stZoneDirectDBConf);
看下TC_DBConf
的定義
struct TC_DBConf { string _host; string _user; string _password; string _database; string _charset; int _port; int _flag; //客戶端標識 TC_DBConf() : _port(0) , _flag(0) {} /** * @brief 讀取數據庫配置. * * @param mpParam 存放數據庫配置的map * dbhost: 主機地址 * dbuser:用戶名 * dbpass:密碼 * dbname:數據庫名稱 * dbport:端口 */ void loadFromMap(const map<string, string> &mpParam) { map<string, string> mpTmp = mpParam; _host = mpTmp["dbhost"]; _user = mpTmp["dbuser"]; _password = mpTmp["dbpass"]; _database = mpTmp["dbname"]; _charset = mpTmp["charset"]; _port = atoi(mpTmp["dbport"].c_str()); _flag = 0; if(mpTmp["dbport"] == "") { _port = 3306; } } };
//進一步看下獲取數據的使用 TC_Mysql::MysqlData data; data = mysql.queryRecord("select * from t_app_users"); for(size_t i = 0; i < data.size(); i++) { //若是不存在ID字段,則拋出異常 cout << data[i]["ID"] << endl; }
查詢出來的mysql數據用MysqlData
封裝
class MysqlData { ... vector<map<string, string> >& data(); ... }
//插入數據,指定數據的類型:數值 或 字符串,對於字符串會自動轉義 map<string, pair<TC_Mysql::FT, string> > m; m["ID"] = make_pair(TC_Mysql::DB_INT, "2334"); m["USERID"] = make_pair(TC_Mysql::DB_STR, "abcttt"); m["APP"] = make_pair(TC_Mysql::DB_STR, "abcapbbp"); m["LASTTIME"] = make_pair(TC_Mysql::DB_INT, "now()"); mysql.replaceRecord("t_user_logs", m);
網絡組件
整個TARS核心就提供一個很完善的網絡框架,包括RPC功能,這裏只介紹幾個經常使用的網絡組件。
TC_Socket : 封裝了socket的基本方法
提供socket的操做類;支持tcp/udp socket;支持本地域套接字。
再下一層TARS封裝了TC_TCPClient
和TC_UDPClient
兩個類用於實際操做tcp和udp應用。
使用方式:
例如:tcp客戶端
TC_TCPClient stRouterClient; stRouterClient.init(sIP, iPort, iTimeOut); // 這裏傳入ip和端口而後調用sendRecv進行消息的收發 Int32 ret = stRouterClient.sendRecv(request.c_str(), request.length(), recvBuf, iRecvLen);
注意多線程使用的時候,不能多線程同時send/recv,當心串包。
TC_Epoller
提供網絡epoll的操做類,默認是ET模式,當狀態發生變化的時候纔得到通知,提供add、mod、del、wait等基礎操做。
TC_ClientSocket : 客戶端socket相關操做基類
提供關鍵成員函數init(const string &sIp, int iPort, int iTimeout)
,傳入 IP 端口 和 超時時間
TC_TCPClient
繼承於 TC_ClientSocket
提供成員函數:
sendRecv
(發送到服務器, 從服務器返回不超過iRecvLen的字節)sendRecvBySep
( 發送倒服務器, 並等待服務器直到結尾字符, 包含結尾字符)
例子:
stRouterClient.init(sIP, iPort, iTimeOut); size_t iRecvLen = sizeof(recvBuf)-1; Int32 ret = stRouterClient.sendRecv(request.c_str(), request.length(), recvBuf, iRecvLen);
同理還有TC_UDPClient
實現UDP客戶端。
命令解析類: TC_Option
- 命令解析類;
- 一般用於解析命令行參數;
- 只支持雙—的參數形式
- 分析
main
的輸入參數,支持如下形式的參數:
./main.exe --name=value --param1 param2 param3
TC_Option op; //解析命令行 op.decode(argc, argv); //獲取成對的參數,即獲取 - - 表示的全部參數對 map<string, string> mp = op.getMulti(); //表示非 – 的參數:即 param2, param3 vector<string> d = op.getSingle();
若是value,param有空格或者 --
,用引號括起來就能夠了。
配置文件類: TC_Config
- 配置文件解析類(兼容wbl模式);
- 支持從string中解析配置文件;
- 支持生成配置文件;
- 解析出錯拋出異常;
- 採用[]獲取配置,若是無配置則拋出異常;
- 採用get獲取配置,不存在則返回空;
- 讀取配置文件是線程安全的,insert域等函數非線程安全
例子:
TC_Config config; config.parseFile(ServerConfig::BasePath + ServerConfig::ServerName + ".conf"); stTmpGameServerConfig.iGameId = TC_Common::strto<UInt32>(config["/Main/<GameId>"]);
配置文件樣例
<Main> GameId = 3001 ZoneId = 102 AsyncThreadCheckInterval = 1000 ... </Main>
使用get
方法例子:若是讀不到該配置,則返回默認值 sDefault
,即下面例子中的 20000000
stTmpGameServerConfig.iMaxRegNum = TC_Common::strto<Int32>(config.get("/Main/<MaxRegNum>", "20000000"));
通用仿函數類: TC_Functor
TC_Functor
參考loki
庫的設計
-
仿函數對象調用方式, 即對上述的幾種方式均可以在右側添加一對圓括號,並在括號內部放一組合適的參數來調用,例如
a(p1,p2);
-
把整個調用(包括參數)封裝一個函數對象, 調用對象創建時就傳入了參數,調用的時候不用傳入參數,例如
A a(p1, p2); a();
簡單又好用的封裝,具體見下面使用例子天然明白:
C函數調用
void TestFunction3(const string &s, int i){ cout << "TestFunction3('" << s << "', '" << i << "')" << endl; } //採用函數指針構造對象 TC_Functor<void, TL::TLMaker<const string&, int>::Result > cmd3(TestFunction3); string s3("s3"); cmd3(s3, 10);
C函數調用用wrapper封裝:
//調用封裝,構造的時候傳入參數 TC_Functor<void,TL::TLMaker<const string&, int>::Result>::wrapper_type fwrapper3(cmd3, s3, 10); fwrapper3(); //參數已經在構造的時候傳入,調用的時候不用傳參數了
說明:
void
: 函數的返回值TL::TLMaker<const string&, int>::Result
: 表明參數類型
對於調用的封裝,注意對於傳引用類型,具體的調用時候要保證引用的對象存在。
C++指向類成員函數的調用
struct TestMember { void mem3(const string &s, int i) { cout << "TestMember::mem3(" << s << "," << i << ") called" << endl; } } TC_Functor<void, TL::TLMaker<const string&, int>::Result > cmd3(&tm, &TestMember::mem3); cmd3("a", 33);
指向類成員函數的調用用wrapper封裝:
TC_Functor<void, TL::TLMaker<const string&, int>::Result >::wrapper_type fwrapper3(cmd3, "a", 10); fwrapper3();
實際例子:註冊協議解析器
服務初始化initialize的時候,通常會調用
addServantProtocol(sRouterObj, AppProtocol::parseStream<0, uint16_t, false>,iHeaderLen);
這裏設置BindAdapter
的協議解析函數 protocol_functor _pf
爲 parseStream
函數,以下:
/** * @param T * @param offset * @param netorder * @param in * @param out * @return int */ template<size_t offset, typename T, bool netorder> static TC_NetWorkBuffer::PACKET_TYPE parseStream(TC_NetWorkBuffer& in,vector<char>& out) { size_t len = offset + sizeof(T); if (in.getBufferLength() < len) { return TC_NetWorkBuffer::PACKET_LESS; } string header; in.getHeader(len, header); assert(header.size() == len); T iHeaderLen = 0; ::memcpy(&iHeaderLen, header.c_str() + offset, sizeof(T)); if (netorder) { iHeaderLen = net2host<T>(iHeaderLen); } //長度保護一下 if (iHeaderLen < (T)(len) || (uint32_t)iHeaderLen > TARS_NET_MAX_PACKAGE_SIZE) { return TC_NetWorkBuffer::PACKET_ERR; } if (in.getBufferLength() < (uint32_t)iHeaderLen) { return TC_NetWorkBuffer::PACKET_LESS; } in.getHeader(iHeaderLen, out); assert(out.size() == iHeaderLen); in.moveHeader(iHeaderLen); return TC_NetWorkBuffer::PACKET_FULL; }
註冊好解析函數以後,網絡層收包調用parseProtocol
函數
int TC_EpollServer::Connection::parseProtocol(TC_NetWorkBuffer &rbuf) { ... TC_NetWorkBuffer::PACKET_TYPE b = _pBindAdapter->getProtocol()(rbuf, ro); //這裏回調前面設置好的協議解析函數,從而實現協議解析 ... }
hash算法
util/tc_hash_fun.h
中包含了對hash算法的實現,使用 hash_new
,能夠對輸入的字節流進行hash獲得至關均勻的hash值,使用方式以下
#include "util/tc_hash_fun.h" #include <iterator> #include <iostream> #include <sys/time.h> using namespace tars; using namespace std; int main(int argc, char* *argv[]) { unsigned int i = tars::hash_new<string>()("abcd"); cout << i << endl; return 0; }
異常類: TC_Exception
class TC_Exception : public exception { /** * @brief 構造函數,提供了一個能夠傳入errno的構造函數, * 異常拋出時直接獲取的錯誤信息 * * @param buffer 異常的告警信息 * @param err 錯誤碼, 可用strerror獲取錯誤信息 */ TC_Exception(const string &buffer, int err); }
總結
本文介紹分析了TARS框架中用C++實現的公用基礎組件,加深對這些工具類基礎組件的理解,減小在使用這些組件過程當中產生的問題,提升開發效率。
TARS能夠在考慮到易用性和高性能的同時快速構建系統並自動生成代碼,幫助開發人員和企業以微服務的方式快速構建本身穩定可靠的分佈式應用,從而令開發人員只關注業務邏輯,提升運營效率。多語言、敏捷研發、高可用和高效運營的特性使 TARS 成爲企業級產品。
TARS微服務助您數字化轉型,歡迎訪問:
TARS官網:https://TarsCloud.org
TARS源碼:https://github.com/TarsCloud
獲取《TARS官方培訓電子書》:https://wj.qq.com/s2/6570357/3adb/
或掃碼獲取: