UDT Server啓動以後,基於UDT協議的UDP數據可靠傳輸才成爲可能,於是接下來分析與UDT Server有關的幾個主要API的實現,來了解下UDT Server是如何listening在特定UDP端口上的。主要有UDT::bind(),UDT::listen()和UDT::accept()等幾個函數。 node
一般UDT Server在建立UDT Socket以後,首先就要調用UDT::bind(),與一個特定的本地UDP端口地址進行綁定,以即可以在但願的端口上監聽。這裏來看一下UDT::bind()的實現: api
int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen) { CUDTSocket* s = locate(u); if (NULL == s) throw CUDTException(5, 4, 0); CGuard cg(s->m_ControlLock); // cannot bind a socket more than once if (INIT != s->m_Status) throw CUDTException(5, 0, 0); // check the size of SOCKADDR structure if (AF_INET == s->m_iIPversion) { if (namelen != sizeof(sockaddr_in)) throw CUDTException(5, 3, 0); } else { if (namelen != sizeof(sockaddr_in6)) throw CUDTException(5, 3, 0); } s->m_pUDT->open(); updateMux(s, name); s->m_Status = OPENED; // copy address information of local node s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr); return 0; } int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock) { CUDTSocket* s = locate(u); if (NULL == s) throw CUDTException(5, 4, 0); CGuard cg(s->m_ControlLock); // cannot bind a socket more than once if (INIT != s->m_Status) throw CUDTException(5, 0, 0); sockaddr_in name4; sockaddr_in6 name6; sockaddr* name; socklen_t namelen; if (AF_INET == s->m_iIPversion) { namelen = sizeof(sockaddr_in); name = (sockaddr*) &name4; } else { namelen = sizeof(sockaddr_in6); name = (sockaddr*) &name6; } if (-1 == ::getsockname(udpsock, name, &namelen)) throw CUDTException(5, 3); s->m_pUDT->open(); updateMux(s, name, &udpsock); s->m_Status = OPENED; // copy address information of local node s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr); return 0; } int CUDT::bind(UDTSOCKET u, const sockaddr* name, int namelen) { try { return s_UDTUnited.bind(u, name, namelen); } catch (CUDTException& e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int CUDT::bind(UDTSOCKET u, UDPSOCKET udpsock) { try { return s_UDTUnited.bind(u, udpsock); } catch (CUDTException& e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int bind(UDTSOCKET u, const struct sockaddr* name, int namelen) { return CUDT::bind(u, name, namelen); } int bind2(UDTSOCKET u, UDPSOCKET udpsock) { return CUDT::bind(u, udpsock); }
UDT主要提供了兩個bind接口,分別是UDT::bind()和,UDT::bind2()。UDT::bind()將一個UDT Socket與一個struct sockaddr對象描述的地址進行綁定,這須要UDT本身先建立相應的系統UDP socket,並將該系統UDP socket綁定到地址,而後把UDT Socket綁定到該系統UDP socket;UDT::bind2()則將一個UDT Socket直接與一個已經建立好的系統UDP socket進行綁定。 網絡
這兩個API的實現結構與UDT::socket()的實現結構基本一致,同樣是分爲3層:UDT命名空間中提供了給應用程序調用的接口,可稱爲UDT API或User API;User API調用CUDT API,這一層主要用來作錯誤處理,也就是捕獲動做實際執行過程當中拋出的異常並保存起來,而後給應用程序使用;CUDT API調用CUDTUnited中API的實現。 多線程
這裏主要來看CUDTUnited中bind()函數的實現。先來看CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen)函數的實現: 異步
1. 調用CUDTUnited::locate(),根據SocketID,也就是UDT Socket handle在CUDTUnited的std::map<UDTSOCKET, CUDTSocket*> m_Sockets中找到對應的CUDTSocket結構(src/api.cpp): socket
CUDTSocket* CUDTUnited::locate(const UDTSOCKET u) { CGuard cg(m_ControlLock); map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u); if ((i == m_Sockets.end()) || (i->second->m_Status == CLOSED)) return NULL; return i->second; }
若找不到,則直接返回;不然,繼續執行。 函數
2. 檢查CUDTSocket對象的狀態,若是當前的狀態不爲INIT,直接拋異常退出;不然,繼續執行。 ui
3. 根據本地IP地址的版本,檢查綁定到的目標地址的長度的有效性。IP版本是在UDT Socket建立時指定的。若是無效,則直接拋異常退出;不然,繼續執行。 this
4. 執行相應的CUDT的open()操做(src/core.cpp): spa
void CUDT::open() { CGuard cg(m_ConnectionLock); // Initial sequence number, loss, acknowledgement, etc. m_iPktSize = m_iMSS - 28; m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize; m_iEXPCount = 1; m_iBandwidth = 1; m_iDeliveryRate = 16; m_iAckSeqNo = 0; m_ullLastAckTime = 0; // trace information m_StartTime = CTimer::getTime(); m_llSentTotal = m_llRecvTotal = m_iSndLossTotal = m_iRcvLossTotal = m_iRetransTotal = m_iSentACKTotal = m_iRecvACKTotal = m_iSentNAKTotal = m_iRecvNAKTotal = 0; m_LastSampleTime = CTimer::getTime(); m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0; m_llSndDuration = m_llSndDurationTotal = 0; // structures for queue if (NULL == m_pSNode) m_pSNode = new CSNode; m_pSNode->m_pUDT = this; m_pSNode->m_llTimeStamp = 1; m_pSNode->m_iHeapLoc = -1; if (NULL == m_pRNode) m_pRNode = new CRNode; m_pRNode->m_pUDT = this; m_pRNode->m_llTimeStamp = 1; m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL; m_pRNode->m_bOnList = false; m_iRTT = 10 * m_iSYNInterval; m_iRTTVar = m_iRTT >> 1; m_ullCPUFrequency = CTimer::getCPUFrequency(); // set up the timers m_ullSYNInt = m_iSYNInterval * m_ullCPUFrequency; // set minimum NAK and EXP timeout to 100ms m_ullMinNakInt = 300000 * m_ullCPUFrequency; m_ullMinExpInt = 300000 * m_ullCPUFrequency; m_ullACKInt = m_ullSYNInt; m_ullNAKInt = m_ullMinNakInt; uint64_t currtime; CTimer::rdtsc(currtime); m_ullLastRspTime = currtime; m_ullNextACKTime = currtime + m_ullSYNInt; m_ullNextNAKTime = currtime + m_ullNAKInt; m_iPktCount = 0; m_iLightACKCount = 1; m_ullTargetTime = 0; m_ullTimeDiff = 0; // Now UDT is opened. m_bOpened = true; }
在這個函數中,主要仍是對變量的初始化,後面會再結合UDT可靠傳輸的具體機制,來講明這些變量的具體含義。
5. 執行updateMux()函數更新UDT Socket的多路複用器的相關信息,後面咱們會再來詳細瞭解這個更新操做。
6. 將CUDTSocket對象的狀態更新爲OPENED。
7. 將發送隊列的Channel的地址信息拷貝到本節點的s->m_pSelfAddr,m_pSelfAddrde對象的內存空間是在建立UDT Socket的CUDTUnited::newSocket()函數中分配的。
後面會再來解釋UDT中Channel和多路複用器Multipexer的含義。
8. 返回0給調用者表示成功結束。
再來看CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock)函數將UDT Socket綁定到一個已經建立好的系統UDP socket的過程:
1. 調用CUDTUnited::locate(),根據SocketID,也就是UDT Socket handle在CUDTUnited的std::map<UDTSOCKET, CUDTSocket*> m_Sockets中找到對應的CUDTSocket結構。若找不到,則直接返回;不然,繼續執行。
2. 檢查CUDTSocket對象的狀態,若是當前的狀態不爲INIT,直接拋異常退出;不然,繼續執行。
3. 獲取系統UDP socket的網絡地址(含端口信息)。若獲取失敗則拋異常推出;不然,繼續執行。
4. 執行相應的CUDT的open()操做對一些變量進行初始化。
5. 執行updateMux()函數更新UDT Socket的多路複用器的相關信息,後面咱們會再來詳細瞭解這個更新操做。
6. 將CUDTSocket對象的狀態更新爲OPENED。
7. 將發送隊列的Channel的地址信息拷貝到本節點的s->m_pSelfAddr,m_pSelfAddrde對象的內存空間是在建立UDT Socket的CUDTUnited::newSocket()函數中分配的。
後面會再來解釋UDT中Channel和多路複用器Multipexer的含義。
8. 返回0給調用者表示成功結束。m_MultiplexerLock
整體來講,bind操做使的UDT Socket狀態機的狀態由INIT狀態,轉換到了OPENED狀態。
CUDTUnited的這兩個bind()函數有如此多的重複邏輯,總讓人以爲,是有方法作進一步的抽象,以消除重複的邏輯,並使這兩個函數的實現都更加精簡的。
bind()操做所作的最最重要的事大概就是將UDT Socket與多路複用器關聯,也就是CUDTUnited::updateMux()函數的執行了。爲了後面可以更清晰地說明更新多路複用器的操做過程,這裏先說明一下UDT的多路複用器CMultiplexer、通道CChannel、發送隊列CSndQueue和接收隊列CRcvQueue的含義。
UDT中的通道CChannel是系統UDP socket的一個封裝,它主要封裝了系統UDP socket handle,IP版本號,socket地址的長度,發送緩衝區的大小及接收緩衝區的大小等信息,並提供了用於操做 系統UDP socket進行數據收發或屬性設置等動做的函數。咱們能夠看一下這個class的定義(src/channel.h):
class CChannel { public: CChannel(); CChannel(int version); ~CChannel(); // Functionality: // Open a UDP channel. // Parameters: // 0) [in] addr: The local address that UDP will use. // Returned value: // None. void open(const sockaddr* addr = NULL); // Functionality: // Open a UDP channel based on an existing UDP socket. // Parameters: // 0) [in] udpsock: UDP socket descriptor. // Returned value: // None. void open(UDPSOCKET udpsock); // Functionality: // Disconnect and close the UDP entity. // Parameters: // None. // Returned value: // None. void close() const; // Functionality: // Get the UDP sending buffer size. // Parameters: // None. // Returned value: // Current UDP sending buffer size. int getSndBufSize(); // Functionality: // Get the UDP receiving buffer size. // Parameters: // None. // Returned value: // Current UDP receiving buffer size. int getRcvBufSize(); // Functionality: // Set the UDP sending buffer size. // Parameters: // 0) [in] size: expected UDP sending buffer size. // Returned value: // None. void setSndBufSize(int size); // Functionality: // Set the UDP receiving buffer size. // Parameters: // 0) [in] size: expected UDP receiving buffer size. // Returned value: // None. void setRcvBufSize(int size); // Functionality: // Query the socket address that the channel is using. // Parameters: // 0) [out] addr: pointer to store the returned socket address. // Returned value: // None. void getSockAddr(sockaddr* addr) const; // Functionality: // Send a packet to the given address. // Parameters: // 0) [in] addr: pointer to the destination address. // 1) [in] packet: reference to a CPacket entity. // Returned value: // Actual size of data sent. int sendto(const sockaddr* addr, CPacket& packet) const; // Functionality: // Receive a packet from the channel and record the source address. // Parameters: // 0) [in] addr: pointer to the source address. // 1) [in] packet: reference to a CPacket entity. // Returned value: // Actual size of data received. int recvfrom(sockaddr* addr, CPacket& packet) const; private: void setUDPSockOpt(); private: int m_iIPversion; // IP version int m_iSockAddrSize; // socket address structure size (pre-defined to avoid run-time test) UDPSOCKET m_iSocket; // socket descriptor int m_iSndBufSize; // UDP sending buffer size int m_iRcvBufSize; // UDP receiving buffer size };
接收隊列CRcvQueue在初始化時會起一個線程,該線程在被停掉前,會不斷地由CChannel接收其它節點發送過來的UDP消息,能夠將這個線程看作是listening在系統UDP 端口上的一個UDP Server。在接收到消息以後,該線程會根據消息的類型及目標 SocketID,把消息dispatch給不一樣的UDT Socket的CUDT對象。好比對於Handshake類型的消息就會dispatch給listening的UDT Socket的CUDT對象。後面咱們研究具體的消息收發的時候再來仔細看這個類的設計。
發送隊列CSndQueue,主要用於同步地向特定的目標發送一個UDT的Packet,或者在適當的時機異步地發送一些消息,它一樣會在初始化是起一個線程,用來執行異步地發送任務。這個class是UDT作可靠傳輸的一個比較關鍵的class,後面咱們研究具體的消息收發的時候再來仔細看這個類的設計。
UDT的多路複用器結構CMultiplexer將全部這些與特定的系統UDP socket相關聯的CChannel,CRcvQueue,CSndQueue包在一塊兒,並描述了這個系統UDP socket收發的數據的一些公有屬性,有UDP 端口號,IP版本號,最大的包大小,引用計數,是否可複用,及用作哈希索引的ID等。能夠看一下這個class的定義:
struct CMultiplexer { CSndQueue* m_pSndQueue; // The sending queue CRcvQueue* m_pRcvQueue; // The receiving queue CChannel* m_pChannel; // The UDP channel for sending and receiving CTimer* m_pTimer; // The timer int m_iPort; // The UDP port number of this multiplexer int m_iIPversion; // IP version int m_iMSS; // Maximum Segment Size int m_iRefCount; // number of UDT instances that are associated with this multiplexer bool m_bReusable; // if this one can be shared with others int m_iID; // multiplexer ID CMultiplexer() : m_pSndQueue(NULL), m_pRcvQueue(NULL), m_pChannel(NULL), m_pTimer(NULL), m_iPort(0), m_iIPversion(0), m_iMSS(0), m_iRefCount(0), m_bReusable(true), m_iID(0) { } };
接着來看CUDTUnited::updateMux()函數的定義(src/api.cpp):
void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock) { CGuard cg(m_ControlLock); CMultiplexer m; if ((s->m_pUDT->m_bReuseAddr) && (NULL != addr)) { int port = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*) addr)->sin_port) : ntohs(((sockaddr_in6*) addr)->sin6_port); // find a reusable address for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++i) { if ((i->second.m_iIPversion == s->m_pUDT->m_iIPversion) && (i->second.m_iMSS == s->m_pUDT->m_iMSS) && i->second.m_bReusable) { if (i->second.m_iPort == port) { // reuse the existing multiplexer m = i->second; break; } } } } // a new multiplexer is needed if (m.m_iID == 0) { m.m_iMSS = s->m_pUDT->m_iMSS; m.m_iIPversion = s->m_pUDT->m_iIPversion; m.m_bReusable = s->m_pUDT->m_bReuseAddr; m.m_iID = s->m_SocketID; m.m_pChannel = new CChannel(s->m_pUDT->m_iIPversion); m.m_pChannel->setSndBufSize(s->m_pUDT->m_iUDPSndBufSize); m.m_pChannel->setRcvBufSize(s->m_pUDT->m_iUDPRcvBufSize); try { if (NULL != udpsock) m.m_pChannel->open(*udpsock); else m.m_pChannel->open(addr); } catch (CUDTException& e) { m.m_pChannel->close(); delete m.m_pChannel; throw e; } sockaddr* sa = (AF_INET == s->m_pUDT->m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6; m.m_pChannel->getSockAddr(sa); m.m_iPort = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*) sa)->sin_port) : ntohs(((sockaddr_in6*) sa)->sin6_port); if (AF_INET == s->m_pUDT->m_iIPversion) delete (sockaddr_in*) sa; else delete (sockaddr_in6*) sa; m.m_pTimer = new CTimer; m.m_pSndQueue = new CSndQueue; m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer); m.m_pRcvQueue = new CRcvQueue; m.m_pRcvQueue->init(32, s->m_pUDT->m_iPayloadSize, m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer); m_mMultiplexer[m.m_iID] = m; } ++m.m_iRefCount; s->m_pUDT->m_pSndQueue = m.m_pSndQueue; s->m_pUDT->m_pRcvQueue = m.m_pRcvQueue; s->m_iMuxID = m.m_iID; }
1. 這個函數首先會在已經建立的多路複用器的map中查找,看看是否存在 要與多路複用器關聯的UDT Socket可用的多路複用器存在。對於一個UDT Socket來講,UDT Socket自己網絡地址可複用,且某個多路複用器同時知足它的CChannel的UDP端口號與UDT Socket要bind的目標UDP端口號匹配,它的CChannel的IP地址版本及MSS與UDT Socket的IP地址版本及MSS匹配,它自己可複用,則該多路複用器就是該UDT Socket可用的多路複用器。
2. 若在前面的步驟中,沒有找到可用的多路複用器,則建立一個。
根據UDT Socket的MSS值,IP版本號,及地址的可複用性來初始化CMultiplexer的對應值。設置CMultiplexer的ID爲UDT Socket的SocketID。也就是說,某個CMultiplexer的ID就是與它關聯的首個UDT Socket的SocketID。
建立CChannel,設置系統UDP socket發送緩衝區及接收緩衝區的大小。並執行CChannel的open()操做。在CChannel::open()中若是不是綁定的已經建立好的系統UDP socket的話,它會自行建立系統UDP socket,並綁定到目標端口上。
獲取CChannel實際綁定的UDP端口號,賦值給m.m_iPort。
建立CTimer。
建立並初始化CSndQueue。
建立並初始化CRcvQueue。
將新建的CMultiplexer放進std::map<int, CMultiplexer> m_mMultiplexer中。
在CUDTUnited類定義中能夠看到以下幾行:
private: std::map<int, CMultiplexer> m_mMultiplexer; // UDP multiplexer pthread_mutex_t m_MultiplexerLock;
本來設計彷佛是要用m_MultiplexerLock來保證對m_mMultiplexer多線程的互斥訪問的,但卻沒有一個地方有用到這個m_MultiplexerLock。不知是發現保護全無必要,仍是有所遺漏?
3. 將UDT Socket與多路複用器關聯起來,不論是找到的現成可用的,仍是徹底新建立的。這裏能夠看到所謂的將UDT Socket與多路複用器關聯的含義,便是讓CUDTSocket的CUDT對象m_pUDT的發送隊列和接收隊列指向CMultiplexer的發送隊列和接收隊列,設置CUDTSocket的多路複用器ID爲CMultiplexer的ID m_iID,這樣後面CUDTSocket和CUDT就可使用發送隊列CSndQueue和接收隊列CRcvQueue進行數據的收發,並可在須要的時候找到相關的CMultiplexer對象了。
自此以後,CUDTSocket就有了能夠用來收發數據的設施了。
總結一下UDT bind的主要過程。UDT bind過程當中,作的最主要的事情就是,根據一個已經建立好的UDT Socket的一些信息及要綁定的本地UDP端口,找到或建立一個多路複用器CMultiplexer,將UDT Socket與該CMultiplexer關聯,即設置CUDTSocket的多路複用器ID m_iMuxID爲該CMultiplexer的ID,UDT Socket的發送隊列指針和接收隊列指針指向該CMultiplexer的發送隊列和接收隊列。後續UDT Socket就能夠經過發送隊列/接收隊列及它們的CChannel進行數據的收發了。在這個過程當中,UDT Socket狀態機完成了狀態由INIT到OPENED的轉變。
在UDT Server端,對UDT Socket執行了bind操做以後,就能夠執行listen來等待其它節點的鏈接了。這裏來看下UDT listen的過程(src/api.cpp):
int CUDTUnited::listen(const UDTSOCKET u, int backlog) { CUDTSocket* s = locate(u); if (NULL == s) throw CUDTException(5, 4, 0); CGuard cg(s->m_ControlLock); // do nothing if the socket is already listening if (LISTENING == s->m_Status) return 0; // a socket can listen only if is in OPENED status if (OPENED != s->m_Status) throw CUDTException(5, 5, 0); // listen is not supported in rendezvous connection setup if (s->m_pUDT->m_bRendezvous) throw CUDTException(5, 7, 0); if (backlog <= 0) throw CUDTException(5, 3, 0); s->m_uiBackLog = backlog; try { s->m_pQueuedSockets = new set<UDTSOCKET>; s->m_pAcceptSockets = new set<UDTSOCKET>; } catch (...) { delete s->m_pQueuedSockets; delete s->m_pAcceptSockets; throw CUDTException(3, 2, 0); } s->m_pUDT->listen(); s->m_Status = LISTENING; return 0; } int CUDT::listen(UDTSOCKET u, int backlog) { try { return s_UDTUnited.listen(u, backlog); } catch (CUDTException& e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int listen(UDTSOCKET u, int backlog) { return CUDT::listen(u, backlog); }
這個API的實現一樣分爲3層,UDT命名空間提供的直接給應用程序調用User API層,CUDT API層用於作異常處理,CUDTUnited具體實現API的功能。這裏直接來分析CUDTUnited::listen()函數:
1. 調用CUDTUnited::locate(),查找UDT Socket對應的CUDTSocket結構。若找不到,則拋出異常直接返回;不然,繼續執行。
2. 檢查CUDTSocket對象的狀態,若是當前的狀態爲LISTENING,則說明UDT Socket已經處於監聽狀態了,直接返回;若當前狀態不爲OPENED,直接拋異常退出,不然,繼續執行。這就限制了只有通過了bind操做的UDT Socket才能監聽,也就是UDT Socket的狀態只能由OPENED轉爲LISTENING。
3. 檢查是不是rendezvous的UDT Socket,如果則拋出異常推出。這確保在監聽的UDT Socket不能爲rendezvous的。
4. 檢查傳入的backlog參數並進行設置。backlog參數用於指定Listening的UDT Socket同一時刻可以處理的最大的等待鏈接的請求數。Listening的UDT Socket在收到鏈接請求的Handshake消息後,通過幾回來回確認,會建立新的UDT Socket以便於經過UDT::accept()函數返回給應用程序,用於與請求鏈接的發起方進行通訊。backlog值用於限定,尚未經過accept()返回的新建立的UDT Socket的個數。
5. 建立兩個UDTSOCKET的集合m_pQueuedSockets和m_pAcceptSockets,前者爲Listening的UDT Socket的鏈接已經成功創建但還未經過UDT::accept()返回給應用程序的UDT Socket的集合;然後者則是已經經過UDT::accept()返回給應用程序的UDT Socket的集合。
6. 執行UDTSocket的CUDT的listen()操做,能夠看一下CUDT listen動做的具體含義(src/core.cpp):
void CUDT::listen() { CGuard cg(m_ConnectionLock); if (!m_bOpened) throw CUDTException(5, 0, 0); if (m_bConnecting || m_bConnected) throw CUDTException(5, 2, 0); // listen can be called more than once if (m_bListening) return; // if there is already another socket listening on the same port if (m_pRcvQueue->setListener(this) < 0) throw CUDTException(5, 11, 0); m_bListening = true; }
先是進行狀態的合法性檢查。
而後執行m_pRcvQueue->setListener(this),將本CUDT設置爲接收隊列的listener。
最後設置CUDT的狀態m_bListening爲true。
這裏能夠看出CUDTSocket與CUDT是表示UDT Socket的兩層狀態機,它們的狀態之間有關聯,但又有各自的描述方法。這樣彷佛大大增長了這個UDT Socket狀態管理的複雜度了。
再來看一下CRcvQueue::setListener()(src/queue.cpp):
int CRcvQueue::setListener(CUDT* u) { CGuard lslock(m_LSLock); if (NULL != m_pListener) return -1; m_pListener = u; return 0; }
設置接收隊列CRcvQueue的Listener。
7. 設置CUDTSocket的狀態爲LISTENING並返回。
能夠看到對於UDT::listen()的調用,促使UDT Socket的狀態由OPENED轉換爲了LISTENING。UDT::listen()主要的做用就是 爲與UDT Socket關聯的特定端口上的多路複用器CMultiplexer的接收隊列CRcvQueue設置listener,這個動做最主要的意義在於消息的dispatch。咱們知道CMultiplexer的接收隊列CRcvQueue在建立、初始化時會起一個線程,不斷地試圖從網絡接收UDP消息,在收到消息以後,將消息dispatch給不一樣的UDT Socket處理,其中的Handshake等消息,就會被dispatch給listener CUDT處理。後面在具體研究消息的收發時會再來詳細研究這個過程。
UDT Server端在對listening執行了UDT::listen()操做以後,就能夠執行UDT::accept()操做來等待其它節點鏈接本身了。來看一下UDT::accept()的執行過程(src/api.cpp):
UDTSOCKET CUDTUnited::accept(const UDTSOCKET listen, sockaddr* addr, int* addrlen) { if ((NULL != addr) && (NULL == addrlen)) throw CUDTException(5, 3, 0); CUDTSocket* ls = locate(listen); if (ls == NULL) throw CUDTException(5, 4, 0); // the "listen" socket must be in LISTENING status if (LISTENING != ls->m_Status) throw CUDTException(5, 6, 0); // no "accept" in rendezvous connection setup if (ls->m_pUDT->m_bRendezvous) throw CUDTException(5, 7, 0); UDTSOCKET u = CUDT::INVALID_SOCK; bool accepted = false; // !!only one conection can be set up each time!! #ifndef WIN32 while (!accepted) { pthread_mutex_lock(&(ls->m_AcceptLock)); if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken) { // This socket has been closed. accepted = true; } else if (ls->m_pQueuedSockets->size() > 0) { u = *(ls->m_pQueuedSockets->begin()); ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u); ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin()); accepted = true; } else if (!ls->m_pUDT->m_bSynRecving) { accepted = true; } if (!accepted && (LISTENING == ls->m_Status)) pthread_cond_wait(&(ls->m_AcceptCond), &(ls->m_AcceptLock)); if (ls->m_pQueuedSockets->empty()) m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false); pthread_mutex_unlock(&(ls->m_AcceptLock)); } #else while (!accepted) { WaitForSingleObject(ls->m_AcceptLock, INFINITE); if (ls->m_pQueuedSockets->size() > 0) { u = *(ls->m_pQueuedSockets->begin()); ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u); ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin()); accepted = true; } else if (!ls->m_pUDT->m_bSynRecving) accepted = true; ReleaseMutex(ls->m_AcceptLock); if (!accepted & (LISTENING == ls->m_Status)) WaitForSingleObject(ls->m_AcceptCond, INFINITE); if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken) { // Send signal to other threads that are waiting to accept. SetEvent(ls->m_AcceptCond); accepted = true; } if (ls->m_pQueuedSockets->empty()) m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false); } #endif if (u == CUDT::INVALID_SOCK) { // non-blocking receiving, no connection available if (!ls->m_pUDT->m_bSynRecving) throw CUDTException(6, 2, 0); // listening socket is closed throw CUDTException(5, 6, 0); } if ((addr != NULL) && (addrlen != NULL)) { if (AF_INET == locate(u)->m_iIPversion) *addrlen = sizeof(sockaddr_in); else *addrlen = sizeof(sockaddr_in6); // copy address information of peer node memcpy(addr, locate(u)->m_pPeerAddr, *addrlen); } return u; } UDTSOCKET CUDT::accept(UDTSOCKET u, sockaddr* addr, int* addrlen) { try { return s_UDTUnited.accept(u, addr, addrlen); } catch (CUDTException& e) { s_UDTUnited.setError(new CUDTException(e)); return INVALID_SOCK; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return INVALID_SOCK; } } UDTSOCKET accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen) { return CUDT::accept(u, addr, addrlen); }
這個API實現的3層結構與UDT::bind(),UDT::listen()同樣,再也不贅述。來看CUDTUnited::accept()的實現:
1. 調用CUDTUnited::locate(),查找UDT Socket對應的CUDTSocket結構。若找不到,則拋出異常直接返回;不然,繼續執行。
2. 檢查CUDTSocket對象的狀態。可見CUDTUnited::accept()操做要求相應的UDT Socket必須處於LISTENING狀態,且不能爲Rendezvous模式。這個地方對於ls->m_pUDT->m_bRendezvous的檢查彷佛有些多餘了,在CUDTUnited::listen()中能夠看到,若是UDT Socket處於Rendezvous模式的話,根本就不可能完成狀態由OPENED到LISTENING的轉換,於是對於UDT Socket LISTENING狀態的檢查已經足夠了。
3. 經過一個循環來等待其它節點的鏈接。這指的是,等待ls->m_pQueuedSockets中被放入爲新的鏈接建立的UDT Socket。有新的鏈接時,CUDTUnited::accept()線程被喚醒,它會將UDT Socket從ls->m_pQueuedSockets中移到ls->m_pAcceptSockets,並準備將UDT Socket返回給調用者。固然CUDTUnited::accept()的等待過程結束的條件不僅是有新鏈接進來,在Listening的UDT Socket被closed掉時,ls->m_pUDT->m_bBroken會被設置,UDT Socket的狀態也可能會發生變化,此時等待過程會結束;或者UDT Socket處於同步接收狀態,則不管是否有新鏈接,等待過程都會盡快結束。
4. 等待鏈接的過程意外退出,也就是在沒有等到新鏈接進來的狀況下等待過程就退出了的狀況下,拋出異常退出。若是UDT Socket處於同步接收狀態,拋出某個類型的異常,不然拋出另一種類型的異常來表示UDT Socket被關閉了。這個地方的邏輯,向調用者展現的異常信息可能具備誤導性,好比一個同步接收的UDT Socket被關閉了,向調用者展現的信息彷佛仍然代表,UDT Socket是因爲同步接收的問題而沒有等到新鏈接進來才退出的。
5. 等到了新鏈接進來的狀況下,將發起端的網絡地址拷貝給調用者。
6. 將新UDT Socket的SocketID返回給調用者。
能夠看到,UDT::accept()這個地方是一個典型的生產者-消費者模型。UDT::accept()是消費者,消費的對象是ls->m_pQueuedSockets中的UDT Socket。咱們分析UDT::accept()函數的實現,只能看到這個關於生產-消費的故事的一半,另外一半關於生產的故事則須要經過更仔細地分析CRcvQueue::worker()的執行來了解了。
總結一下這幾個操做與Listening Socket狀態變化之間的關係,以下圖所示:
Done.