在分析 鏈接的創建過程 以前,先來看一下UDT API的用法。在UDT網絡中,一般要有一個UDT Server監聽在某臺機器的某個UDP端口上,等待客戶端的鏈接;有一個或多個客戶端鏈接UDT Server;UDT Server接收到來自客戶端的鏈接請求後,建立另一個單獨的UDT Socket用於與該客戶端進行通訊。ios
先來看一下UDT Server的簡單的實現,UDT的開發者已經提供了一些demo程序可供參考,位於app/目錄下。編程
#include <unistd.h> #include <cstdlib> #include <cstring> #include <netdb.h> #include <iostream> #include <udt.h> using namespace std; void* recvdata(void*); struct UDTUpDown { UDTUpDown() { // use this function to initialize the UDT library UDT::startup(); } ~UDTUpDown() { // use this function to release the UDT library UDT::cleanup(); } }; int main(int argc, char* argv[]) { if ((1 != argc) && ((2 != argc) || (0 == atoi(argv[1])))) { cout << "usage: appserver [server_port]" << endl; return 0; } // Automatically start up and clean up UDT module. UDTUpDown _udt_; addrinfo hints; addrinfo* res; memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_flags = AI_PASSIVE; hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; //hints.ai_socktype = SOCK_DGRAM; string service("9000"); if (2 == argc) service = argv[1]; if (0 != getaddrinfo(NULL, service.c_str(), &hints, &res)) { cout << "illegal port number or port is busy.\n" << endl; return 0; } UDTSOCKET serv = UDT::socket(res->ai_family, res->ai_socktype, res->ai_protocol); // UDT Options //UDT::setsockopt(serv, 0, UDT_CC, new CCCFactory<CUDPBlast>, sizeof(CCCFactory<CUDPBlast>)); //UDT::setsockopt(serv, 0, UDT_MSS, new int(9000), sizeof(int)); //UDT::setsockopt(serv, 0, UDT_RCVBUF, new int(10000000), sizeof(int)); //UDT::setsockopt(serv, 0, UDP_RCVBUF, new int(10000000), sizeof(int)); if (UDT::ERROR == UDT::bind(serv, res->ai_addr, res->ai_addrlen)) { cout << "bind: " << UDT::getlasterror().getErrorMessage() << endl; return 0; } freeaddrinfo(res); cout << "server is ready at port: " << service << endl; if (UDT::ERROR == UDT::listen(serv, 10)) { cout << "listen: " << UDT::getlasterror().getErrorMessage() << endl; return 0; } sockaddr_storage clientaddr; int addrlen = sizeof(clientaddr); UDTSOCKET recver; while (true) { if (UDT::INVALID_SOCK == (recver = UDT::accept(serv, (sockaddr*) &clientaddr, &addrlen))) { cout << "accept: " << UDT::getlasterror().getErrorMessage() << endl; return 0; } char clienthost[NI_MAXHOST]; char clientservice[NI_MAXSERV]; getnameinfo((sockaddr *) &clientaddr, addrlen, clienthost, sizeof(clienthost), clientservice, sizeof(clientservice), NI_NUMERICHOST | NI_NUMERICSERV); cout << "new connection: " << clienthost << ":" << clientservice << endl; pthread_t rcvthread; pthread_create(&rcvthread, NULL, recvdata, new UDTSOCKET(recver)); pthread_detach(rcvthread); } UDT::close(serv); return 0; } void* recvdata(void* usocket) { UDTSOCKET recver = *(UDTSOCKET*) usocket; delete (UDTSOCKET*) usocket; char* data; int size = 100000; data = new char[size]; while (true) { int rsize = 0; int rs; while (rsize < size) { int rcv_size; int var_size = sizeof(int); UDT::getsockopt(recver, 0, UDT_RCVDATA, &rcv_size, &var_size); if (UDT::ERROR == (rs = UDT::recv(recver, data + rsize, size - rsize, 0))) { cout << "recv:" << UDT::getlasterror().getErrorMessage() << endl; break; } rsize += rs; } if (rsize < size) break; } delete[] data; UDT::close(recver); return NULL; }
1. 如在 UDT協議實現分析——UDT初始化和銷燬 一文中提到的, 在調用任何UDT API以前,須要首先調用UDT::startup()來對庫作一個初始化,並在結束以後執行UDT::cleanup()作最後的清理。在這個示例中,是建立了一個helper類UDTUpDown來幫助作這些事情的。利用編程語言自己提供的構造-析夠機制來對UDT進行初始化和銷燬要比手動調用這些函數要可靠得多。api
2. 得到本地UDP端口的網絡地址。網絡
3. 調用UDT::socket()函數建立一個UDT Socket。這個Socket是UDT抽象出來的一個邏輯的Socket,咱們並不能利用這個Socket自己來收發數據。併發
4. 調用UDT::bind()將建立的UDT Socket綁定到本地端口的網絡地址。這一步將會把UDT的邏輯Socket與可以進行數據收發的系統UDP socket進行關聯,自此咱們就能夠利用UDT Socket進行收發數據了。app
5. 調用UDT::listen()告訴UDT,把這個UDT Socket作爲這個端口上的Listening Socket。一個UDP端口能夠被多個UDT Socket複用,在調用UDT::listen()以後,UDT就知道,在有其它節點鏈接這個UDP端口時,須要把相關的鏈接請求消息發送到哪一個UDT Socket的接收緩衝區了。less
對綁定到相同UDP端口的多個不一樣UDT Socket調用UDT::listen()時,UDT是如何處理的?咱們知道,一個UDP端口上最多隻能有一個listening Socket。dom
6. 調用UDT::accept()函數等待其它節點的鏈接。其它節點鏈接時,這個函數返回另一個單獨的UDT Socket,以用於與發起鏈接的節點進行通訊。socket
UDT::accept()函數返回的UDT Socket會被綁定到另外的一個不一樣的UDP端口,仍是會被綁定到listening Socket所綁定的UDP端口?編程語言
7. 使用UDT::accept()返回的UDT Socket,利用UDT::recv()與UDT::send()等函數同發起鏈接的節點進行數據傳輸。
8. 在再也不須要UDT Socket時,調用UDT::close()函數關掉它,以釋放資源。
而後再來看下UDT Client的實現:
#include <unistd.h> #include <cstdlib> #include <cstring> #include <netdb.h> #include <iostream> #include <udt.h> using namespace std; void* monitor(void*); struct UDTUpDown { UDTUpDown() { // use this function to initialize the UDT library UDT::startup(); } ~UDTUpDown() { // use this function to release the UDT library UDT::cleanup(); } }; int main(int argc, char* argv[]) { if ((3 != argc) || (0 == atoi(argv[2]))) { cout << "usage: appclient server_ip server_port" << endl; return 0; } // Automatically start up and clean up UDT module. UDTUpDown _udt_; struct addrinfo hints, *local, *peer; memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_flags = AI_PASSIVE; hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; //hints.ai_socktype = SOCK_DGRAM; if (0 != getaddrinfo(NULL, "9000", &hints, &local)) { cout << "incorrect network address.\n" << endl; return 0; } UDTSOCKET client = UDT::socket(local->ai_family, local->ai_socktype, local->ai_protocol); // UDT Options //UDT::setsockopt(client, 0, UDT_CC, new CCCFactory<CUDPBlast>, sizeof(CCCFactory<CUDPBlast>)); //UDT::setsockopt(client, 0, UDT_MSS, new int(9000), sizeof(int)); //UDT::setsockopt(client, 0, UDT_SNDBUF, new int(10000000), sizeof(int)); //UDT::setsockopt(client, 0, UDP_SNDBUF, new int(10000000), sizeof(int)); //UDT::setsockopt(client, 0, UDT_MAXBW, new int64_t(12500000), sizeof(int)); // for rendezvous connection, enable the code below /* UDT::setsockopt(client, 0, UDT_RENDEZVOUS, new bool(true), sizeof(bool)); if (UDT::ERROR == UDT::bind(client, local->ai_addr, local->ai_addrlen)) { cout << "bind: " << UDT::getlasterror().getErrorMessage() << endl; return 0; } */ freeaddrinfo(local); if (0 != getaddrinfo(argv[1], argv[2], &hints, &peer)) { cout << "incorrect server/peer address. " << argv[1] << ":" << argv[2] << endl; return 0; } // connect to the server, implict bind if (UDT::ERROR == UDT::connect(client, peer->ai_addr, peer->ai_addrlen)) { cout << "connect: " << UDT::getlasterror().getErrorMessage() << endl; return 0; } freeaddrinfo(peer); int size = 100000; char* data = new char[size]; pthread_create(new pthread_t, NULL, monitor, &client); for (int i = 0; i < 1000000; i++) { int ssize = 0; int ss; while (ssize < size) { if (UDT::ERROR == (ss = UDT::send(client, data + ssize, size - ssize, 0))) { cout << "send:" << UDT::getlasterror().getErrorMessage() << endl; break; } ssize += ss; } if (ssize < size) break; } UDT::close(client); delete[] data; return 0; } void* monitor(void* s) { UDTSOCKET u = *(UDTSOCKET*) s; UDT::TRACEINFO perf; cout << "SendRate(Mb/s)\tRTT(ms)\tCWnd\tPktSndPeriod(us)\tRecvACK\tRecvNAK" << endl; while (true) { sleep(1); if (UDT::ERROR == UDT::perfmon(u, &perf)) { cout << "perfmon: " << UDT::getlasterror().getErrorMessage() << endl; break; } cout << perf.mbpsSendRate << "\t\t" << perf.msRTT << "\t" << perf.pktCongestionWindow << "\t" << perf.usPktSndPeriod << "\t\t\t" << perf.pktRecvACK << "\t" << perf.pktRecvNAK << endl; } return NULL; }
能夠看到UDT Client鏈接UDT Server併發送數據的過程大致以下:
1. 一樣須要在調用任何UDT API以前,先調用UDT::startup()來對庫作初始化,並在結束以後執行UDT::cleanup()作最後的清理。這裏一樣使用helper類UDTUpDown來幫助作這些事情。
2. 得到UDT Server的網絡地址。
3. 調用UDT::socket()函數建立一個UDT Socket。這個Socket能夠與特定的本地地址綁定,也能夠不綁定。若是綁定,則發送數據時的出口地址就是該端口,若是不綁定,出口地址則是一個不肯定的值。
4. 調用UDT::connect()鏈接UDT Server。
5. 使用UDT Socket,利用UDT::recv()與UDT::send()等函數同UDT Server進行數據傳輸。
6. 在再也不須要UDT Socket時,調用UDT::close()函數關掉它,以釋放資源。
UDT基本的收發數據的API的用法大致如上面所示。接着咱們來看,這些函數是如何實現的。
不管是UDT Server要listening,仍是UDT client要鏈接UDT Server,在調用UDT::startup()初始化UDT以後,首先要作的事情都是調用UDT::socket()建立UDT Socket了。咱們就來看一下建立UDT Socket的過程:
UDTSOCKET CUDTUnited::newSocket(int af, int type) { if ((type != SOCK_STREAM) && (type != SOCK_DGRAM)) throw CUDTException(5, 3, 0); CUDTSocket* ns = NULL; try { ns = new CUDTSocket; ns->m_pUDT = new CUDT; if (AF_INET == af) { ns->m_pSelfAddr = (sockaddr*) (new sockaddr_in); ((sockaddr_in*) (ns->m_pSelfAddr))->sin_port = 0; } else { ns->m_pSelfAddr = (sockaddr*) (new sockaddr_in6); ((sockaddr_in6*) (ns->m_pSelfAddr))->sin6_port = 0; } } catch (...) { delete ns; throw CUDTException(3, 2, 0); } CGuard::enterCS(m_IDLock); ns->m_SocketID = --m_SocketID; CGuard::leaveCS(m_IDLock); ns->m_Status = INIT; ns->m_ListenSocket = 0; ns->m_pUDT->m_SocketID = ns->m_SocketID; ns->m_pUDT->m_iSockType = (SOCK_STREAM == type) ? UDT_STREAM : UDT_DGRAM; ns->m_pUDT->m_iIPversion = ns->m_iIPversion = af; ns->m_pUDT->m_pCache = m_pCache; // protect the m_Sockets structure. CGuard::enterCS(m_ControlLock); try { m_Sockets[ns->m_SocketID] = ns; } catch (...) { //failure and rollback CGuard::leaveCS(m_ControlLock); delete ns; ns = NULL; } CGuard::leaveCS(m_ControlLock); if (NULL == ns) throw CUDTException(3, 2, 0); return ns->m_SocketID; } UDTSOCKET CUDT::socket(int af, int type, int) { if (!s_UDTUnited.m_bGCStatus) s_UDTUnited.startup(); try { return s_UDTUnited.newSocket(af, type); } catch (CUDTException& e) { s_UDTUnited.setError(new CUDTException(e)); return INVALID_SOCK; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return INVALID_SOCK; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return INVALID_SOCK; } } UDTSOCKET socket(int af, int type, int protocol) { return CUDT::socket(af, type, protocol); }
調用流程爲,UDT::socket() -> CUDT::socket() -> CUDTUnited::newSocket()。
在CUDT::socket()中,咱們看到,它會首先檢查s_UDTUnited.m_bGCStatus,若發現UDT尚未初始化完成的話,則會調用s_UDTUnited.startup()進行初始化。這個地方對UDT狀態s_UDTUnited.m_bGCStatus的檢查沒有問題,但在發現UDT沒有初始化完成時調用s_UDTUnited.startup()彷佛並不恰當。這個地方調用了s_UDTUnited.startup(),那與此次調用相對應的cleanup()又在哪調用了呢?顯然在UDT內部是沒有。如果調用cleanup()的職責老是在UDT的使用者,那倒不如在這個地方返回錯誤給調用者,徹底讓調用者來管理UDT的生命週期,以儘量地避免資源泄漏。
建立UDT Socket的工做實際都在CUDTUnited::newSocket()函數中完成。能夠看到,在這個函數中主要作了以下的這樣一些事情:
1. 建立一個CUDTSocket對象ns,並建立一個CUDT對象被ns->m_pUDT引用。初始化ns對象的Self網絡地址,端口會被設置爲0。在UDT中使用CUDTSocket和CUDT共同來描述一個Socket。每一個UDT Socket都會有其相應的CUDTSocket對象和CUDT對象。
能夠看一下CUDTSocket類的定義,來了解它都描述了UDT Socket的哪些屬性(src/api.h):
class CUDTSocket { public: CUDTSocket(); ~CUDTSocket(); UDTSTATUS m_Status; // current socket state uint64_t m_TimeStamp; // time when the socket is closed int m_iIPversion; // IP version sockaddr* m_pSelfAddr; // pointer to the local address of the socket sockaddr* m_pPeerAddr; // pointer to the peer address of the socket UDTSOCKET m_SocketID; // socket ID UDTSOCKET m_ListenSocket; // ID of the listener socket; 0 means this is an independent socket UDTSOCKET m_PeerID; // peer socket ID int32_t m_iISN; // initial sequence number, used to tell different connection from same IP:port CUDT* m_pUDT; // pointer to the UDT entity std::set<UDTSOCKET>* m_pQueuedSockets; // set of connections waiting for accept() std::set<UDTSOCKET>* m_pAcceptSockets; // set of accept()ed connections pthread_cond_t m_AcceptCond; // used to block "accept" call pthread_mutex_t m_AcceptLock; // mutex associated to m_AcceptCond unsigned int m_uiBackLog; // maximum number of connections in queue int m_iMuxID; // multiplexer ID pthread_mutex_t m_ControlLock; // lock this socket exclusively for control APIs: bind/listen/connect private: CUDTSocket(const CUDTSocket&); CUDTSocket& operator=(const CUDTSocket&); };
這個類只提供了構造和析構兩個成員函數。還聲明瞭私有的copy構造函數和賦值操做符函數,但沒有定義它們,以免類對象的複製。
能夠再來看一下CUDTSocket的構造函數實現(src/api.h):
CUDTSocket::CUDTSocket() : m_Status(INIT), m_TimeStamp(0), m_iIPversion(0), m_pSelfAddr(NULL), m_pPeerAddr(NULL), m_SocketID(0), m_ListenSocket(0), m_PeerID(0), m_iISN(0), m_pUDT(NULL), m_pQueuedSockets(NULL), m_pAcceptSockets(NULL), m_AcceptCond(), m_AcceptLock(), m_uiBackLog(0), m_iMuxID(-1) { #ifndef WIN32 pthread_mutex_init(&m_AcceptLock, NULL); pthread_cond_init(&m_AcceptCond, NULL); pthread_mutex_init(&m_ControlLock, NULL); #else m_AcceptLock = CreateMutex(NULL, false, NULL); m_AcceptCond = CreateEvent(NULL, false, false, NULL); m_ControlLock = CreateMutex(NULL, false, NULL); #endif }
特別注意m_Status的初始化,該值被初始化爲了INIT。從狀態機的角度來看CUDTSocket,在它剛被new出來時,它處於INIT狀態。
CUDT類有兩個主要的職責,一是描述UDT Socket,包括全部的非靜態成員變量和非靜態成員函數,定義UDT Socket的大部分屬性和所能提供操做;二是提供API,包括絕大部分的static成員函數,這些函數將調用者與UDT內部的實現鏈接起來。CUDT類這樣的設計,明顯違背了OO的SRP單一職責原則,這多少仍是給代碼的閱讀帶來了必定的障礙。再來看一下CUDT的構造函數實現(src/core.cpp):
CUDT::CUDT() { m_pSndBuffer = NULL; m_pRcvBuffer = NULL; m_pSndLossList = NULL; m_pRcvLossList = NULL; m_pACKWindow = NULL; m_pSndTimeWindow = NULL; m_pRcvTimeWindow = NULL; m_pSndQueue = NULL; m_pRcvQueue = NULL; m_pPeerAddr = NULL; m_pSNode = NULL; m_pRNode = NULL; // Initilize mutex and condition variables initSynch(); // Default UDT configurations m_iMSS = 1500; m_bSynSending = true; m_bSynRecving = true; m_iFlightFlagSize = 25600; m_iSndBufSize = 8192; m_iRcvBufSize = 8192; //Rcv buffer MUST NOT be bigger than Flight Flag size m_Linger.l_onoff = 1; m_Linger.l_linger = 180; m_iUDPSndBufSize = 65536; m_iUDPRcvBufSize = m_iRcvBufSize * m_iMSS; m_iSockType = UDT_STREAM; m_iIPversion = AF_INET; m_bRendezvous = false; m_iSndTimeOut = -1; m_iRcvTimeOut = -1; m_bReuseAddr = true; m_llMaxBW = -1; m_pCCFactory = new CCCFactory<CUDTCC>; m_pCC = NULL; m_pCache = NULL; // Initial status m_bOpened = false; m_bListening = false; m_bConnecting = false; m_bConnected = false; m_bClosing = false; m_bShutdown = false; m_bBroken = false; m_bPeerHealth = true; m_ullLingerExpiration = 0; } void CUDT::initSynch() { #ifndef WIN32 pthread_mutex_init(&m_SendBlockLock, NULL); pthread_cond_init(&m_SendBlockCond, NULL); pthread_mutex_init(&m_RecvDataLock, NULL); pthread_cond_init(&m_RecvDataCond, NULL); pthread_mutex_init(&m_SendLock, NULL); pthread_mutex_init(&m_RecvLock, NULL); pthread_mutex_init(&m_AckLock, NULL); pthread_mutex_init(&m_ConnectionLock, NULL); #else m_SendBlockLock = CreateMutex(NULL, false, NULL); m_SendBlockCond = CreateEvent(NULL, false, false, NULL); m_RecvDataLock = CreateMutex(NULL, false, NULL); m_RecvDataCond = CreateEvent(NULL, false, false, NULL); m_SendLock = CreateMutex(NULL, false, NULL); m_RecvLock = CreateMutex(NULL, false, NULL); m_AckLock = CreateMutex(NULL, false, NULL); m_ConnectionLock = CreateMutex(NULL, false, NULL); #endif }
都是成員變量的初始化,後續再來詳細瞭解這些成員變量的做用。
2. 爲Socket分配SocketID,其值爲CUDTUnited的m_SocketID遞減的結果。m_SocketID在CUDTUnited的構造函數中初始化:
CUDTUnited::CUDTUnited() : m_Sockets(), m_ControlLock(), m_IDLock(), m_SocketID(0), m_TLSError(), m_mMultiplexer(), m_MultiplexerLock(), m_pCache(NULL), m_bClosing(false), m_GCStopLock(), m_GCStopCond(), m_InitLock(), m_iInstanceCount(0), m_bGCStatus(false), m_GCThread(), m_ClosedSockets() { // Socket ID MUST start from a random value srand((unsigned int) CTimer::getTime()); m_SocketID = 1 + (int) ((1 << 30) * (double(rand()) / RAND_MAX)); #ifndef WIN32 pthread_mutex_init(&m_ControlLock, NULL); pthread_mutex_init(&m_IDLock, NULL); pthread_mutex_init(&m_InitLock, NULL); #else m_ControlLock = CreateMutex(NULL, false, NULL); m_IDLock = CreateMutex(NULL, false, NULL); m_InitLock = CreateMutex(NULL, false, NULL); #endif #ifndef WIN32 pthread_key_create(&m_TLSError, TLSDestroy); #else m_TLSError = TlsAlloc(); m_TLSLock = CreateMutex(NULL, false, NULL); #endif m_pCache = new CCache<CInfoBlock>; }
m_SocketID的初始值是一個隨機數。
3. 初始化ns及它的CUDT對象的一些成員變量。特別注意ns->m_Status的賦值,這裏該值被賦爲了INIT。從狀態機的角度來看待CUDTSocket,在執行UDT Socket建立結束執行時,它處於INIT狀態下。
4. 將ns放在std::map<UDTSOCKET, CUDTSocket*> m_Sockets中。
5. 將UDT socket的SocketID返回給調用者。
這個接口直接返回一個表示UDT Socket的類對象,而不是一個handle,以便於調用者在調用UDT API時無需每次都輸入UDT Socket handle參數,這樣的API設計相對於UDT的這種API設計,有什麼樣的優缺點?
在前面UDT Server和UDT Client的demo程序中,咱們有看到,全部的UDT API都會在出錯時返回一個錯誤碼,好比UDT::bind()、UDT::bind2()、UDT::listen()和UDT::connect()等返回值類型爲int的函數,在出錯時返回UDT::ERROR,UDT::socket()和UDT::accept()等返回值類型爲UDTSOCKET的函數在出錯時返回UDT::INVALID_SOCK。
UDT API的調用者在檢測到它們返回了錯誤值時,經過調用UDT::getlasterror()函數來獲取關於異常更加詳細的信息。這裏咱們就來看一下這套機制的實現。
注意看CUDT::socket()函數的實現。在這個函數中,會將實際建立UDT Socket的任務委託給s_UDTUnited.newSocket(),但這個調用會被包在一個try-catch塊中。在s_UDTUnited.newSocket()函數執行的過程當中發生任何的異常,都會先被CUDT::socket()捕獲。CUDT::socket()函數在捕獲這些異常以後,會根據捕獲的異常的類型建立不一樣的CUDTException對象,並經過調用CUDTUnited::setError()函數將該CUDTException對象設置給s_UDTUnited。
咱們來看一下CUDTUnited::setError()函數的實現(src/api.cpp):
void CUDTUnited::setError(CUDTException* e) { #ifndef WIN32 delete (CUDTException*) pthread_getspecific(m_TLSError); pthread_setspecific(m_TLSError, e); #else CGuard tg(m_TLSLock); delete (CUDTException*)TlsGetValue(m_TLSError); TlsSetValue(m_TLSError, e); m_mTLSRecord[GetCurrentThreadId()] = e; #endif }
在這個函數中,會首先取出線程局部存儲變量m_TLSError中保存的本線程上一次建立的 CUDTException對象,將其delete掉,並將本次建立的CUDTException對象設置進去。CUDTUnited類定義中m_TLSError的聲明(src/api.h):
private: pthread_key_t m_TLSError; // thread local error record (last error) #ifndef WIN32 static void TLSDestroy(void* e) { if (NULL != e) delete (CUDTException*) e; } #else std::map<DWORD, CUDTException*> m_mTLSRecord; void checkTLSValue(); pthread_mutex_t m_TLSLock; #endif
在CUDTUnited構造函數中也能夠看到對這個對象的初始化。 CUDTUnited::TLSDestroy()函數是m_TLSError的析構函數,在m_TLSError最後被銷燬時,這個函數被調用,以便於釋放搜有還未釋放的資源。
再來看UDT API的調用者獲取上一次發生的異常的函數UDT::getlasterror():
CUDTException* CUDTUnited::getError() { #ifndef WIN32 if (NULL == pthread_getspecific(m_TLSError)) pthread_setspecific(m_TLSError, new CUDTException); return (CUDTException*) pthread_getspecific(m_TLSError); #else CGuard tg(m_TLSLock); if(NULL == TlsGetValue(m_TLSError)) { CUDTException* e = new CUDTException; TlsSetValue(m_TLSError, e); m_mTLSRecord[GetCurrentThreadId()] = e; } return (CUDTException*)TlsGetValue(m_TLSError); #endif } CUDTException& CUDT::getlasterror() { return *s_UDTUnited.getError(); } ERRORINFO& getlasterror() { return CUDT::getlasterror(); }
調用過程爲UDT::getlasterror() -> CUDT::getlasterror() -> CUDTUnited::getError()。
主要就是從s_UDTUnited的線程局部存儲變量m_TLSError中取出前面設置的本線程上次建立的CUDTException對象返回給調用者。
總結一下,UDT的使用者在調用UDT API時,UDT API會直接調用CUDT類對應的static API函數,在CUDT類的這些static API函數中會將作實際事情的工做委託給s_UDTUnited的相應函數,但這個委託調用會被包在一個try-catch block中。s_UDTUnited的函數在遇到異常狀況時拋出異常,CUDT類的static API函數捕獲異常,根據捕獲到的異常的具體類型,建立不一樣的CUDTException對象設置給s_UDTUnited的線程局部存儲變量m_TLSError中並向UDT API調用者返回錯誤碼,UDT API的調用者檢測到錯誤碼後,經過UDT::getlasterror()獲取存儲在m_TLSError中的異常。
此處能夠看到,CUDT提供的這一層API,一個比較重要的做用大概就是作異常處理了。
在UDT中,使用CUDTException來描述全部出現的異常。能夠看一下這個類的定義(src/udt.h):
class UDT_API CUDTException { public: CUDTException(int major = 0, int minor = 0, int err = -1); CUDTException(const CUDTException& e); virtual ~CUDTException(); // Functionality: // Get the description of the exception. // Parameters: // None. // Returned value: // Text message for the exception description. virtual const char* getErrorMessage(); // Functionality: // Get the system errno for the exception. // Parameters: // None. // Returned value: // errno. virtual int getErrorCode() const; // Functionality: // Clear the error code. // Parameters: // None. // Returned value: // None. virtual void clear(); private: int m_iMajor; // major exception categories // 0: correct condition // 1: network setup exception // 2: network connection broken // 3: memory exception // 4: file exception // 5: method not supported // 6+: undefined error int m_iMinor; // for specific error reasons int m_iErrno; // errno returned by the system if there is any std::string m_strMsg; // text error message std::string m_strAPI; // the name of UDT function that returns the error std::string m_strDebug; // debug information, set to the original place that causes the error public: // Error Code static const int SUCCESS; static const int ECONNSETUP; static const int ENOSERVER; static const int ECONNREJ; static const int ESOCKFAIL; static const int ESECFAIL; static const int ECONNFAIL; static const int ECONNLOST; static const int ENOCONN; static const int ERESOURCE; static const int ETHREAD; static const int ENOBUF; static const int EFILE; static const int EINVRDOFF; static const int ERDPERM; static const int EINVWROFF; static const int EWRPERM; static const int EINVOP; static const int EBOUNDSOCK; static const int ECONNSOCK; static const int EINVPARAM; static const int EINVSOCK; static const int EUNBOUNDSOCK; static const int ENOLISTEN; static const int ERDVNOSERV; static const int ERDVUNBOUND; static const int ESTREAMILL; static const int EDGRAMILL; static const int EDUPLISTEN; static const int ELARGEMSG; static const int EINVPOLLID; static const int EASYNCFAIL; static const int EASYNCSND; static const int EASYNCRCV; static const int ETIMEOUT; static const int EPEERERR; static const int EUNKNOWN; };
這個class主要經過Major錯誤碼和Minor錯誤碼來描述異常狀況,若是是調用系統調用出錯了,還會用Errno值。
具體看一下這個class的實現,特別是CUDTException::getErrorMessage()函數,來了解每個錯誤碼所表明的含義:
CUDTException::CUDTException(int major, int minor, int err) : m_iMajor(major), m_iMinor(minor) { if (-1 == err) #ifndef WIN32 m_iErrno = errno; #else m_iErrno = GetLastError(); #endif else m_iErrno = err; } CUDTException::CUDTException(const CUDTException& e) : m_iMajor(e.m_iMajor), m_iMinor(e.m_iMinor), m_iErrno(e.m_iErrno), m_strMsg() { } CUDTException::~CUDTException() { } const char* CUDTException::getErrorMessage() { // translate "Major:Minor" code into text message. switch (m_iMajor) { case 0: m_strMsg = "Success"; break; case 1: m_strMsg = "Connection setup failure"; switch (m_iMinor) { case 1: m_strMsg += ": connection time out"; break; case 2: m_strMsg += ": connection rejected"; break; case 3: m_strMsg += ": unable to create/configure UDP socket"; break; case 4: m_strMsg += ": abort for security reasons"; break; default: break; } break; case 2: switch (m_iMinor) { case 1: m_strMsg = "Connection was broken"; break; case 2: m_strMsg = "Connection does not exist"; break; default: break; } break; case 3: m_strMsg = "System resource failure"; switch (m_iMinor) { case 1: m_strMsg += ": unable to create new threads"; break; case 2: m_strMsg += ": unable to allocate buffers"; break; default: break; } break; case 4: m_strMsg = "File system failure"; switch (m_iMinor) { case 1: m_strMsg += ": cannot seek read position"; break; case 2: m_strMsg += ": failure in read"; break; case 3: m_strMsg += ": cannot seek write position"; break; case 4: m_strMsg += ": failure in write"; break; default: break; } break; case 5: m_strMsg = "Operation not supported"; switch (m_iMinor) { case 1: m_strMsg += ": Cannot do this operation on a BOUND socket"; break; case 2: m_strMsg += ": Cannot do this operation on a CONNECTED socket"; break; case 3: m_strMsg += ": Bad parameters"; break; case 4: m_strMsg += ": Invalid socket ID"; break; case 5: m_strMsg += ": Cannot do this operation on an UNBOUND socket"; break; case 6: m_strMsg += ": Socket is not in listening state"; break; case 7: m_strMsg += ": Listen/accept is not supported in rendezous connection setup"; break; case 8: m_strMsg += ": Cannot call connect on UNBOUND socket in rendezvous connection setup"; break; case 9: m_strMsg += ": This operation is not supported in SOCK_STREAM mode"; break; case 10: m_strMsg += ": This operation is not supported in SOCK_DGRAM mode"; break; case 11: m_strMsg += ": Another socket is already listening on the same port"; break; case 12: m_strMsg += ": Message is too large to send (it must be less than the UDT send buffer size)"; break; case 13: m_strMsg += ": Invalid epoll ID"; break; default: break; } break; case 6: m_strMsg = "Non-blocking call failure"; switch (m_iMinor) { case 1: m_strMsg += ": no buffer available for sending"; break; case 2: m_strMsg += ": no data available for reading"; break; default: break; } break; case 7: m_strMsg = "The peer side has signalled an error"; break; default: m_strMsg = "Unknown error"; } // Adding "errno" information if ((0 != m_iMajor) && (0 < m_iErrno)) { m_strMsg += ": "; #ifndef WIN32 char errmsg[1024]; if (strerror_r(m_iErrno, errmsg, 1024) == 0) m_strMsg += errmsg; #else LPVOID lpMsgBuf; FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, m_iErrno, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPTSTR)&lpMsgBuf, 0, NULL); m_strMsg += (char*)lpMsgBuf; LocalFree(lpMsgBuf); #endif } // period #ifndef WIN32 m_strMsg += "."; #endif return m_strMsg.c_str(); } int CUDTException::getErrorCode() const { return m_iMajor * 1000 + m_iMinor; } void CUDTException::clear() { m_iMajor = 0; m_iMinor = 0; m_iErrno = 0; } const int CUDTException::SUCCESS = 0; const int CUDTException::ECONNSETUP = 1000; const int CUDTException::ENOSERVER = 1001; const int CUDTException::ECONNREJ = 1002; const int CUDTException::ESOCKFAIL = 1003; const int CUDTException::ESECFAIL = 1004; const int CUDTException::ECONNFAIL = 2000; const int CUDTException::ECONNLOST = 2001; const int CUDTException::ENOCONN = 2002; const int CUDTException::ERESOURCE = 3000; const int CUDTException::ETHREAD = 3001; const int CUDTException::ENOBUF = 3002; const int CUDTException::EFILE = 4000; const int CUDTException::EINVRDOFF = 4001; const int CUDTException::ERDPERM = 4002; const int CUDTException::EINVWROFF = 4003; const int CUDTException::EWRPERM = 4004; const int CUDTException::EINVOP = 5000; const int CUDTException::EBOUNDSOCK = 5001; const int CUDTException::ECONNSOCK = 5002; const int CUDTException::EINVPARAM = 5003; const int CUDTException::EINVSOCK = 5004; const int CUDTException::EUNBOUNDSOCK = 5005; const int CUDTException::ENOLISTEN = 5006; const int CUDTException::ERDVNOSERV = 5007; const int CUDTException::ERDVUNBOUND = 5008; const int CUDTException::ESTREAMILL = 5009; const int CUDTException::EDGRAMILL = 5010; const int CUDTException::EDUPLISTEN = 5011; const int CUDTException::ELARGEMSG = 5012; const int CUDTException::EINVPOLLID = 5013; const int CUDTException::EASYNCFAIL = 6000; const int CUDTException::EASYNCSND = 6001; const int CUDTException::EASYNCRCV = 6002; const int CUDTException::ETIMEOUT = 6003; const int CUDTException::EPEERERR = 7000; const int CUDTException::EUNKNOWN = -1;
Done。