Once a UDT server has called UDT::listen(), it can accept connection requests from other nodes. In this article we walk through how a UDT connection is established.
Let's start with the initiating side. As we saw earlier, a UDT client creates a socket, optionally binds it to a port, and then calls UDT::connect() to connect that socket to the UDT server. Here is the definition of UDT::connect() (src/api.cpp):
int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen) { CUDTSocket* s = locate(u); if (NULL == s) throw CUDTException(5, 4, 0); CGuard cg(s->m_ControlLock); // check the size of SOCKADDR structure if (AF_INET == s->m_iIPversion) { if (namelen != sizeof(sockaddr_in)) throw CUDTException(5, 3, 0); } else { if (namelen != sizeof(sockaddr_in6)) throw CUDTException(5, 3, 0); } // a socket can "connect" only if it is in INIT or OPENED status if (INIT == s->m_Status) { if (!s->m_pUDT->m_bRendezvous) { s->m_pUDT->open(); updateMux(s); s->m_Status = OPENED; } else throw CUDTException(5, 8, 0); } else if (OPENED != s->m_Status) throw CUDTException(5, 2, 0); // connect_complete() may be called before connect() returns. // So we need to update the status before connect() is called, // otherwise the status may be overwritten with wrong value (CONNECTED vs. CONNECTING). s->m_Status = CONNECTING; try { s->m_pUDT->connect(name); } catch (CUDTException &e) { s->m_Status = OPENED; throw e; } // record peer address delete s->m_pPeerAddr; if (AF_INET == s->m_iIPversion) { s->m_pPeerAddr = (sockaddr*) (new sockaddr_in); memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in)); } else { s->m_pPeerAddr = (sockaddr*) (new sockaddr_in6); memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in6)); } return 0; } int CUDT::connect(UDTSOCKET u, const sockaddr* name, int namelen) { try { return s_UDTUnited.connect(u, name, namelen); } catch (CUDTException &e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int connect(UDTSOCKET u, const struct sockaddr* name, int namelen) { return CUDT::connect(u, name, namelen); }
The structure of the UDT::connect() API wrapper is no different from that of the other APIs, so we skip it and go straight to CUDTUnited::connect():
1. Call CUDTUnited::locate() to find the CUDTSocket structure for the UDT socket. If it cannot be found, an exception is thrown and the call returns; otherwise execution continues.
2. Check the validity of the target address against the UDT socket's IP version. If it is invalid, bail out; otherwise continue.
3. Check the UDT socket's status. Only a socket in the INIT or OPENED state may perform connect(). A newly created UDT socket is in INIT; after bind() it is in OPENED. If the socket is in INIT and not in rendezvous mode, s->m_pUDT->open() is called, the socket is associated with a multiplexer (CMultiplexer) via updateMux(), and the status is set to OPENED.
When we examined bind(), we already saw CUDTUnited::updateMux(), which associates a UDT socket with a CMultiplexer. The difference here is that updateMux() is called with neither a valid system UDP socket nor a valid local address. Recall from the updateMux() implementation that these two parameters decide which port the CMultiplexer's CChannel will be bound to. Here are the two CChannel::open() overloads (src/channel.cpp):
void CChannel::open(const sockaddr* addr) { // construct an socket m_iSocket = ::socket(m_iIPversion, SOCK_DGRAM, 0); #ifdef WIN32 if (INVALID_SOCKET == m_iSocket) #else if (m_iSocket < 0) #endif throw CUDTException(1, 0, NET_ERROR); if (NULL != addr) { socklen_t namelen = m_iSockAddrSize; if (0 != ::bind(m_iSocket, addr, namelen)) throw CUDTException(1, 3, NET_ERROR); } else { //sendto or WSASendTo will also automatically bind the socket addrinfo hints; addrinfo* res; memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_flags = AI_PASSIVE; hints.ai_family = m_iIPversion; hints.ai_socktype = SOCK_DGRAM; if (0 != ::getaddrinfo(NULL, "0", &hints, &res)) throw CUDTException(1, 3, NET_ERROR); if (0 != ::bind(m_iSocket, res->ai_addr, res->ai_addrlen)) throw CUDTException(1, 3, NET_ERROR); ::freeaddrinfo(res); } setUDPSockOpt(); } void CChannel::open(UDPSOCKET udpsock) { m_iSocket = udpsock; setUDPSockOpt(); } void CChannel::setUDPSockOpt() { #if defined(BSD) || defined(OSX) // BSD system will fail setsockopt if the requested buffer size exceeds system maximum value int maxsize = 64000; if (0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (char*)&m_iRcvBufSize, sizeof(int))) ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (char*)&maxsize, sizeof(int)); if (0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_SNDBUF, (char*)&m_iSndBufSize, sizeof(int))) ::setsockopt(m_iSocket, SOL_SOCKET, SO_SNDBUF, (char*)&maxsize, sizeof(int)); #else // for other systems, if requested is greated than maximum, the maximum value will be automactally used if ((0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (char*) &m_iRcvBufSize, sizeof(int))) || (0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_SNDBUF, (char*) &m_iSndBufSize, sizeof(int)))) throw CUDTException(1, 3, NET_ERROR); #endif timeval tv; tv.tv_sec = 0; #if defined (BSD) || defined (OSX) // Known BSD bug as the day I wrote this code. // A small time out value will cause the socket to block forever. tv.tv_usec = 10000; #else tv.tv_usec = 100; #endif #ifdef UNIX // Set non-blocking I/O // UNIX does not support SO_RCVTIMEO int opts = ::fcntl(m_iSocket, F_GETFL); if (-1 == ::fcntl(m_iSocket, F_SETFL, opts | O_NONBLOCK)) throw CUDTException(1, 3, NET_ERROR); #elif WIN32 DWORD ot = 1; //milliseconds if (0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&ot, sizeof(DWORD))) throw CUDTException(1, 3, NET_ERROR); #else // Set receiving time-out value if (0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, sizeof(timeval))) throw CUDTException(1, 3, NET_ERROR); #endif }
As can be seen, CChannel::open() associates UDT's CChannel with a system UDP socket, and three cases are handled. First, the caller has already created a system UDP socket bound to the target port: this is the simplest case, the UDPSOCKET passed in is assigned to the CChannel's m_iSocket and the UDP socket options are set. Second, a valid local address is passed in: CChannel creates a system UDP socket itself and binds it to that address. Cases one and two correspond to UDT's two bind APIs. Third, neither a valid system UDP socket nor a valid local address is passed in: after creating the system UDP socket, an available ephemeral port is chosen and the socket is bound to it. This is exactly the case of a UDT socket that calls connect() without ever being bound; a standalone sketch of this third case is shown below.
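For reference, the third case boils down to binding a UDP socket to an OS-chosen port. The following is a minimal standalone sketch of that pattern, independent of the UDT classes and with error handling shortened; it is illustrative only, not UDT code.

#include <cstdio>
#include <cstring>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>

int main()
{
   // Create an unbound UDP socket, as CChannel::open(NULL) does.
   int s = ::socket(AF_INET, SOCK_DGRAM, 0);
   if (s < 0) return 1;

   // Ask the resolver for a wildcard address with port "0",
   // so the OS picks a free ephemeral port at bind() time.
   addrinfo hints, *res;
   std::memset(&hints, 0, sizeof(hints));
   hints.ai_flags = AI_PASSIVE;
   hints.ai_family = AF_INET;
   hints.ai_socktype = SOCK_DGRAM;
   if (0 != ::getaddrinfo(NULL, "0", &hints, &res)) return 1;
   if (0 != ::bind(s, res->ai_addr, res->ai_addrlen)) return 1;
   ::freeaddrinfo(res);

   // Query the port that was actually assigned.
   sockaddr_in local; socklen_t len = sizeof(local);
   ::getsockname(s, (sockaddr*)&local, &len);
   std::printf("bound to ephemeral port %d\n", ntohs(local.sin_port));

   ::close(s);
   return 0;
}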
4. Set the UDT socket's status to CONNECTING.
5. Call s->m_pUDT->connect(name) to connect to the UDT server. If the connection fails, an exception is thrown, the socket's status falls back to OPENED, and the call returns. The whole network message exchange for establishing the connection happens inside this function.
6. Copy the target address into the UDT socket's peer address, then return 0 to indicate success.
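From the application's point of view, the call sequence that exercises this path is quite small. The following sketch is modeled on the appclient demo in app/; socket options are omitted and error handling is reduced to a message, so treat it as illustrative rather than a copy of the demo.

#include <arpa/inet.h>
#include <netinet/in.h>
#include <cstring>
#include <iostream>
#include <udt.h>

int main()
{
   UDT::startup();                                   // initialize the UDT library

   UDTSOCKET client = UDT::socket(AF_INET, SOCK_STREAM, 0);

   sockaddr_in serv;                                 // server address, e.g. 127.0.0.1:9000
   std::memset(&serv, 0, sizeof(serv));
   serv.sin_family = AF_INET;
   serv.sin_port = htons(9000);
   inet_pton(AF_INET, "127.0.0.1", &serv.sin_addr);

   // No UDT::bind() here: this is the "connect without bind" case, so the
   // channel gets bound to an OS-chosen ephemeral port inside connect().
   if (UDT::ERROR == UDT::connect(client, (sockaddr*)&serv, sizeof(serv))) {
      std::cout << "connect: " << UDT::getlasterror().getErrorMessage() << std::endl;
      return 1;
   }

   const char* data = "hello UDT";
   UDT::send(client, data, std::strlen(data) + 1, 0);

   UDT::close(client);
   UDT::cleanup();
   return 0;
}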
Before analyzing the packet exchange of connection setup in detail, it helps to get a rough picture of how many packets are sent and received and in what order. Since all packets in UDT go through CChannel, we can add logging to CChannel::sendto() and CChannel::recvfrom() to track the process, using the demo programs appserver and appclient (under app/) provided by UDT. First run appserver in one terminal:
xxxxxx@ThundeRobot:/media/data/downloads/hudt/app$ ./appserver
server is ready at port: 9000
Modify appclient so that it sends only one small data packet and then exits. After compiling, run it in another terminal; the following logs appear:
xxxxxx@ThundeRobot:/media/data/downloads/hudt/app$ ./appclient 127.0.0.1 9000
To connect
CRcvQueue::registerConnector
Send packet 0
Receive packet 364855723
unit->m_Packet.m_iID 364855723
Send packet 0
Receive packet 364855723
unit->m_Packet.m_iID 364855723
To send data.
send 10 bytes
Send packet 1020108693
Receive packet 364855723
unit->m_Packet.m_iID 364855723
Send packet 1020108693
Receive packet 364855723
unit->m_Packet.m_iID 364855723
Send packet 1020108693
Receive packet 364855723
unit->m_Packet.m_iID 364855723
Send packet 1020108693
While appclient is running, the terminal running appserver prints the following logs:
xxxxxx@ThundeRobot:/media/data/downloads/hudt/app$ ./appserver
server is ready at port: 9000
Receive packet 0
unit->m_Packet.m_iID 0
Send packet 364855723
Receive packet 0
unit->m_Packet.m_iID 0
new CUDTSocket SocketID is 1020108693 PeerID 364855723
Send packet 364855723
new connection: 127.0.0.1:59847
Receive packet 1020108693
unit->m_Packet.m_iID 1020108693
Send packet 364855723
Send packet 364855723
Send packet 364855723
Receive packet 1020108693
unit->m_Packet.m_iID 1020108693
Receive packet 1020108693
unit->m_Packet.m_iID 1020108693
Receive packet 1020108693
unit->m_Packet.m_iID 1020108693
recv:Connection was broken.
From the logs we can see that the UDT client first sends a message MSG1 to the UDT server; on receiving MSG1 the server replies with MSG2; on receiving MSG2 the client sends MSG3 back to the server; on receiving MSG3 the server replies with MSG4 and returns from UDT::accept(), at which point the server considers the connection established; the client, after receiving MSG4, returns from UDT::connect() and likewise considers the connection established and ready for data transfer. The exchange can be summarized as follows:
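UDT Client                                    UDT Server
    |                                             |
    |------ MSG1: handshake request ------------->|
    |<----- MSG2: handshake response -------------|
    |------ MSG3: handshake request (cookie) ---->|
    |                                             |-- creates the new socket,
    |<----- MSG4: handshake response -------------|   UDT::accept() returns
    |                                             |
UDT::connect() returns;                           |
both sides consider the connection established   |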
The exact format and content of MSG1, MSG2, MSG3 and MSG4 will be analyzed in detail below.
Next, let's look at the concrete implementation of this message exchange, namely the CUDT::connect() function:
void CUDT::connect(const sockaddr* serv_addr) { CGuard cg(m_ConnectionLock); if (!m_bOpened) throw CUDTException(5, 0, 0); if (m_bListening) throw CUDTException(5, 2, 0); if (m_bConnecting || m_bConnected) throw CUDTException(5, 2, 0); // record peer/server address delete m_pPeerAddr; m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6; memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6)); // register this socket in the rendezvous queue // RendezevousQueue is used to temporarily store incoming handshake, non-rendezvous connections also require this function uint64_t ttl = 3000000; if (m_bRendezvous) ttl *= 10; ttl += CTimer::getTime(); m_pRcvQueue->registerConnector(m_SocketID, this, m_iIPversion, serv_addr, ttl); // This is my current configurations m_ConnReq.m_iVersion = m_iVersion; m_ConnReq.m_iType = m_iSockType; m_ConnReq.m_iMSS = m_iMSS; m_ConnReq.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize) ? m_iRcvBufSize : m_iFlightFlagSize; m_ConnReq.m_iReqType = (!m_bRendezvous) ? 1 : 0; m_ConnReq.m_iID = m_SocketID; CIPAddress::ntop(serv_addr, m_ConnReq.m_piPeerIP, m_iIPversion); // Random Initial Sequence Number srand((unsigned int) CTimer::getTime()); m_iISN = m_ConnReq.m_iISN = (int32_t) (CSeqNo::m_iMaxSeqNo * (double(rand()) / RAND_MAX)); m_iLastDecSeq = m_iISN - 1; m_iSndLastAck = m_iISN; m_iSndLastDataAck = m_iISN; m_iSndCurrSeqNo = m_iISN - 1; m_iSndLastAck2 = m_iISN; m_ullSndLastAck2Time = CTimer::getTime(); // Inform the server my configurations. CPacket request; char* reqdata = new char[m_iPayloadSize]; request.pack(0, NULL, reqdata, m_iPayloadSize); // ID = 0, connection request request.m_iID = 0; int hs_size = m_iPayloadSize; m_ConnReq.serialize(reqdata, hs_size); request.setLength(hs_size); m_pSndQueue->sendto(serv_addr, request); m_llLastReqTime = CTimer::getTime(); m_bConnecting = true; // asynchronous connect, return immediately if (!m_bSynRecving) { delete[] reqdata; return; } // Wait for the negotiated configurations from the peer side. CPacket response; char* resdata = new char[m_iPayloadSize]; response.pack(0, NULL, resdata, m_iPayloadSize); CUDTException e(0, 0); while (!m_bClosing) { // avoid sending too many requests, at most 1 request per 250ms if (CTimer::getTime() - m_llLastReqTime > 250000) { m_ConnReq.serialize(reqdata, hs_size); request.setLength(hs_size); if (m_bRendezvous) request.m_iID = m_ConnRes.m_iID; m_pSndQueue->sendto(serv_addr, request); m_llLastReqTime = CTimer::getTime(); } response.setLength(m_iPayloadSize); if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0) { if (connect(response) <= 0) break; // new request/response should be sent out immediately on receving a response m_llLastReqTime = 0; } if (CTimer::getTime() > ttl) { // timeout e = CUDTException(1, 1, 0); break; } } delete[] reqdata; delete[] resdata; if (e.getErrorCode() == 0) { if (m_bClosing) // if the socket is closed before connection... e = CUDTException(1); else if (1002 == m_ConnRes.m_iReqType) // connection request rejected e = CUDTException(1, 2, 0); else if ((!m_bRendezvous) && (m_iISN != m_ConnRes.m_iISN)) // secuity check e = CUDTException(1, 4, 0); } if (e.getErrorCode() != 0) throw e; }
This function does the following things:
1. Check the CUDT state. Only a UDT socket that is already associated with a multiplexer, i.e. in the OPENED state, may execute CUDT::connect(). As we saw, bind() moves a UDT socket into OPENED; for a socket that was never bound, CUDTUnited::connect() guarantees this.
2. Copy the target network address into the UDT socket's peer address.
3. Call m_pRcvQueue->registerConnector() to register a connector with the receive queue. Here is what that function does (src/queue.cpp):
void CRendezvousQueue::insert(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl) {
  CGuard vg(m_RIDVectorLock);

  CRL r;
  r.m_iID = id;
  r.m_pUDT = u;
  r.m_iIPversion = ipv;
  r.m_pPeerAddr = (AF_INET == ipv) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
  memcpy(r.m_pPeerAddr, addr, (AF_INET == ipv) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));
  r.m_ullTTL = ttl;

  m_lRendezvousID.push_back(r);
}

void CRcvQueue::registerConnector(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl) {
  m_pRendezvousQueue->insert(id, u, ipv, addr, ttl);
}
As can be seen, this simply inserts a CRL entry into the receive queue's CRendezvousQueue m_pRendezvousQueue. So what is a CRendezvousQueue? Here is its definition (src/queue.h):
class CRendezvousQueue {
 public:
  CRendezvousQueue();
  ~CRendezvousQueue();

 public:
  void insert(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl);
  void remove(const UDTSOCKET& id);
  CUDT* retrieve(const sockaddr* addr, UDTSOCKET& id);

  void updateConnStatus();

 private:
  struct CRL {
    UDTSOCKET m_iID;        // UDT socket ID (self)
    CUDT* m_pUDT;           // UDT instance
    int m_iIPversion;       // IP version
    sockaddr* m_pPeerAddr;  // UDT connection peer address
    uint64_t m_ullTTL;      // the time that this request expires
  };
  std::list<CRL> m_lRendezvousID;  // The sockets currently in rendezvous mode

  pthread_mutex_t m_RIDVectorLock;
};
As can be seen, it is just a simple container, with the usual insert, remove and retrieve operations:
void CRendezvousQueue::remove(const UDTSOCKET& id) {
  CGuard vg(m_RIDVectorLock);

  for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i) {
    if (i->m_iID == id) {
      if (AF_INET == i->m_iIPversion)
        delete (sockaddr_in*) i->m_pPeerAddr;
      else
        delete (sockaddr_in6*) i->m_pPeerAddr;

      m_lRendezvousID.erase(i);

      return;
    }
  }
}

CUDT* CRendezvousQueue::retrieve(const sockaddr* addr, UDTSOCKET& id) {
  CGuard vg(m_RIDVectorLock);

  // TODO: optimize search
  for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i) {
    if (CIPAddress::ipcmp(addr, i->m_pPeerAddr, i->m_iIPversion) && ((0 == id) || (id == i->m_iID))) {
      id = i->m_iID;
      return i->m_pUDT;
    }
  }

  return NULL;
}
What does the receive queue CRcvQueue use this queue for? It is tied to CRcvQueue's message dispatch mechanism: when the CRcvQueue worker thread receives a packet, it dispatches it in different ways depending on the packet's destination SocketID and the sender's address, and the CUDT instances in m_pRendezvousQueue are one class of dispatch target. We will examine the worker thread and m_pRendezvousQueue more carefully when we study packet reception.
4. Build the connection request message CHandShake m_ConnReq. Here is the definition of CHandShake (src/packet.h):
class CHandShake {
 public:
  CHandShake();

  int serialize(char* buf, int& size);
  int deserialize(const char* buf, int size);

 public:
  static const int m_iContentSize;  // Size of hand shake data

 public:
  int32_t m_iVersion;         // UDT version
  int32_t m_iType;            // UDT socket type
  int32_t m_iISN;             // random initial sequence number
  int32_t m_iMSS;             // maximum segment size
  int32_t m_iFlightFlagSize;  // flow control window size
  int32_t m_iReqType;         // connection request type: 1: regular connection request, 0: rendezvous connection request, -1/-2: response
  int32_t m_iID;              // socket ID
  int32_t m_iCookie;          // cookie
  uint32_t m_piPeerIP[4];     // The IP address that the peer's UDP port is bound to
};
In m_ConnReq, m_iID is the SocketID of the initiating UDT socket, the request type m_iReqType is set to 1, and m_iMSS is set for MSS negotiation. The CHandShake constructor initializes all of the fields (src/packet.cpp):
CHandShake::CHandShake()
    : m_iVersion(0),
      m_iType(0),
      m_iISN(0),
      m_iMSS(0),
      m_iFlightFlagSize(0),
      m_iReqType(0),
      m_iID(0),
      m_iCookie(0) {
  for (int i = 0; i < 4; ++i)
    m_piPeerIP[i] = 0;
}
Note that m_iCookie is initialized to 0, and that when m_ConnReq is filled in inside CUDT::connect(), m_iCookie is not given a new value.
5. Randomly initialize the initial sequence number (ISN).
6. Create a CPacket request, allocate a buffer of m_iPayloadSize bytes for it, pack that buffer into the CPacket, and deliberately set request.m_iID, i.e. the destination UDT SocketID of this packet, to 0.
The value of m_iPayloadSize is set in different places depending on how the UDT socket was created. For sockets created by the application it is set in CUDT::open(): a listening UDT socket runs CUDT::open() during bind(), and a socket that connects to a UDT server without ever calling bind() runs CUDT::open() inside CUDTUnited::connect(). For a socket created on the UDT server side by the listening socket on receiving a connection request, it is first set in CUDT::connect(const sockaddr* peer, CHandShake* hs); the connecting socket also updates it again in CUDT::connect(const CPacket& response). In all cases the value is m_iPktSize - CPacket::m_iPktHdrSize, where CPacket::m_iPktHdrSize is the fixed UDT packet header size of 16 bytes.
m_iPktSize is always set in the same places as m_iPayloadSize, to m_iMSS - 28. m_iMSS is the MSS (Maximum Segment Size), an option defined by the UDT protocol that the two ends negotiate during connection setup: the maximum amount of data each segment may carry. It is initialized to 1500 when the CUDT object is created, but can be changed via UDT::setsockopt().
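With the default MSS of 1500 the sizes work out as follows; the sketch below just makes the arithmetic concrete (28 being the IPv4 plus UDP header overhead).

#include <cstdio>

int main()
{
   const int MSS        = 1500;               // default m_iMSS
   const int UDP_IP_HDR = 28;                 // 20-byte IPv4 header + 8-byte UDP header
   const int UDT_HDR    = 16;                 // CPacket::m_iPktHdrSize

   const int pktSize     = MSS - UDP_IP_HDR;   // m_iPktSize     = 1472
   const int payloadSize = pktSize - UDT_HDR;  // m_iPayloadSize = 1456

   std::printf("m_iPktSize = %d, m_iPayloadSize = %d\n", pktSize, payloadSize);
   return 0;
}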
Let's first take a look at the CPacket structure (src/packet.h):
class CPacket { friend class CChannel; friend class CSndQueue; friend class CRcvQueue; public: int32_t& m_iSeqNo; // alias: sequence number int32_t& m_iMsgNo; // alias: message number int32_t& m_iTimeStamp; // alias: timestamp int32_t& m_iID; // alias: socket ID char*& m_pcData; // alias: data/control information static const int m_iPktHdrSize; // packet header size public: CPacket(); ~CPacket(); // Functionality: // Get the payload or the control information field length. // Parameters: // None. // Returned value: // the payload or the control information field length. int getLength() const; // Functionality: // Set the payload or the control information field length. // Parameters: // 0) [in] len: the payload or the control information field length. // Returned value: // None. void setLength(int len); // Functionality: // Pack a Control packet. // Parameters: // 0) [in] pkttype: packet type filed. // 1) [in] lparam: pointer to the first data structure, explained by the packet type. // 2) [in] rparam: pointer to the second data structure, explained by the packet type. // 3) [in] size: size of rparam, in number of bytes; // Returned value: // None. void pack(int pkttype, void* lparam = NULL, void* rparam = NULL, int size = 0); // Functionality: // Read the packet vector. // Parameters: // None. // Returned value: // Pointer to the packet vector. iovec* getPacketVector(); // Functionality: // Read the packet flag. // Parameters: // None. // Returned value: // packet flag (0 or 1). int getFlag() const; // Functionality: // Read the packet type. // Parameters: // None. // Returned value: // packet type filed (000 ~ 111). int getType() const; // Functionality: // Read the extended packet type. // Parameters: // None. // Returned value: // extended packet type filed (0x000 ~ 0xFFF). int getExtendedType() const; // Functionality: // Read the ACK-2 seq. no. // Parameters: // None. // Returned value: // packet header field (bit 16~31). int32_t getAckSeqNo() const; // Functionality: // Read the message boundary flag bit. // Parameters: // None. // Returned value: // packet header field [1] (bit 0~1). int getMsgBoundary() const; // Functionality: // Read the message inorder delivery flag bit. // Parameters: // None. // Returned value: // packet header field [1] (bit 2). bool getMsgOrderFlag() const; // Functionality: // Read the message sequence number. // Parameters: // None. // Returned value: // packet header field [1] (bit 3~31). int32_t getMsgSeq() const; // Functionality: // Clone this packet. // Parameters: // None. // Returned value: // Pointer to the new packet. CPacket* clone() const; protected: uint32_t m_nHeader[4]; // The 128-bit header field iovec m_PacketVector[2]; // The 2-demension vector of UDT packet [header, data] int32_t __pad; protected: CPacket& operator=(const CPacket&); };
Its data members are m_nHeader, an array of 4 uint32_t describing the UDT packet header, and m_PacketVector, an iovec array with two elements. The reference members exist merely for convenient access to those data members, which becomes obvious from the CPacket constructor (src/packet.cpp):
// Set up the aliases in the constructor
CPacket::CPacket()
    : m_iSeqNo((int32_t&) (m_nHeader[0])),
      m_iMsgNo((int32_t&) (m_nHeader[1])),
      m_iTimeStamp((int32_t&) (m_nHeader[2])),
      m_iID((int32_t&) (m_nHeader[3])),
      m_pcData((char*&) (m_PacketVector[1].iov_base)),
      __pad() {
  for (int i = 0; i < 4; ++i)
    m_nHeader[i] = 0;
  m_PacketVector[0].iov_base = (char *) m_nHeader;
  m_PacketVector[0].iov_len = CPacket::m_iPktHdrSize;
  m_PacketVector[1].iov_base = NULL;
  m_PacketVector[1].iov_len = 0;
}
Note that the first element of m_PacketVector points to m_nHeader.
In CPacket::pack():
void CPacket::pack(int pkttype, void* lparam, void* rparam, int size) { // Set (bit-0 = 1) and (bit-1~15 = type) m_nHeader[0] = 0x80000000 | (pkttype << 16); // Set additional information and control information field switch (pkttype) { case 2: //0010 - Acknowledgement (ACK) // ACK packet seq. no. if (NULL != lparam) m_nHeader[1] = *(int32_t *) lparam; // data ACK seq. no. // optional: RTT (microsends), RTT variance (microseconds) advertised flow window size (packets), and estimated link capacity (packets per second) m_PacketVector[1].iov_base = (char *) rparam; m_PacketVector[1].iov_len = size; break; case 6: //0110 - Acknowledgement of Acknowledgement (ACK-2) // ACK packet seq. no. m_nHeader[1] = *(int32_t *) lparam; // control info field should be none // but "writev" does not allow this m_PacketVector[1].iov_base = (char *) &__pad; //NULL; m_PacketVector[1].iov_len = 4; //0; break; case 3: //0011 - Loss Report (NAK) // loss list m_PacketVector[1].iov_base = (char *) rparam; m_PacketVector[1].iov_len = size; break; case 4: //0100 - Congestion Warning // control info field should be none // but "writev" does not allow this m_PacketVector[1].iov_base = (char *) &__pad; //NULL; m_PacketVector[1].iov_len = 4; //0; break; case 1: //0001 - Keep-alive // control info field should be none // but "writev" does not allow this m_PacketVector[1].iov_base = (char *) &__pad; //NULL; m_PacketVector[1].iov_len = 4; //0; break; case 0: //0000 - Handshake // control info filed is handshake info m_PacketVector[1].iov_base = (char *) rparam; m_PacketVector[1].iov_len = size; //sizeof(CHandShake); break; case 5: //0101 - Shutdown // control info field should be none // but "writev" does not allow this m_PacketVector[1].iov_base = (char *) &__pad; //NULL; m_PacketVector[1].iov_len = 4; //0; break; case 7: //0111 - Message Drop Request // msg id m_nHeader[1] = *(int32_t *) lparam; //first seq no, last seq no m_PacketVector[1].iov_base = (char *) rparam; m_PacketVector[1].iov_len = size; break; case 8: //1000 - Error Signal from the Peer Side // Error type m_nHeader[1] = *(int32_t *) lparam; // control info field should be none // but "writev" does not allow this m_PacketVector[1].iov_base = (char *) &__pad; //NULL; m_PacketVector[1].iov_len = 4; //0; break; case 32767: //0x7FFF - Reserved for user defined control packets // for extended control packet // "lparam" contains the extended type information for bit 16 - 31 // "rparam" is the control information m_nHeader[0] |= *(int32_t *) lparam; if (NULL != rparam) { m_PacketVector[1].iov_base = (char *) rparam; m_PacketVector[1].iov_len = size; } else { m_PacketVector[1].iov_base = (char *) &__pad; m_PacketVector[1].iov_len = 4; } break; default: break; } }
CPacket::pack() first sets bit 0 of m_nHeader[0] (i.e. m_iSeqNo) to 1 to mark this as a control packet and puts the packet type into bits 1-15, then handles each packet type differently. For a handshake the pkttype is 0, which is the case we care about here: it simply points m_PacketVector[1] at the buffer created earlier.
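To make the header layout concrete, here is a small sketch that builds the first header word for a handshake exactly as pack() does and decodes it again the way the getFlag()/getType() accessors describe (flag in bit 0, type in bits 1-15); the decoding expressions are an assumption consistent with those comments, not code quoted from the source.

#include <cstdint>
#include <cstdio>

int main()
{
   const int pkttype = 0;                      // 0 = handshake

   // What CPacket::pack() does for a control packet:
   // bit 0 set to 1 (control flag), bits 1-15 carry the type.
   uint32_t header0 = 0x80000000u | (uint32_t(pkttype) << 16);

   // Decoding, mirroring what CPacket::getFlag()/getType() report.
   int flag = header0 >> 31;                   // 1 -> control packet
   int type = (header0 >> 16) & 0x7FFF;        // 0 -> handshake

   std::printf("header word = 0x%08X, flag = %d, type = %d\n", header0, flag, type);
   return 0;
}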
7. Serialize the handshake message m_ConnReq into the buffer created earlier and set the length of the CPacket request accordingly:
void CPacket::setLength(int len) {
  m_PacketVector[1].iov_len = len;
}

int CHandShake::serialize(char* buf, int& size) {
  if (size < m_iContentSize)
    return -1;

  int32_t* p = (int32_t*) buf;
  *p++ = m_iVersion;
  *p++ = m_iType;
  *p++ = m_iISN;
  *p++ = m_iMSS;
  *p++ = m_iFlightFlagSize;
  *p++ = m_iReqType;
  *p++ = m_iID;
  *p++ = m_iCookie;
  for (int i = 0; i < 4; ++i)
    *p++ = m_piPeerIP[i];

  size = m_iContentSize;

  return 0;
}
Serialization copies the entire content of the handshake message m_ConnReq into the buffer. It may look odd that byte order is not considered at all here; as we will see shortly, the conversion to network byte order is done later, in CChannel::sendto(), for both the header and the control payload.
8. Call the send queue's sendto() to send the packet to the target address:
int CSndQueue::sendto(const sockaddr* addr, CPacket& packet) {
  // send out the packet immediately (high priority), this is a control packet
  m_pChannel->sendto(addr, packet);

  return packet.getLength();
}
CSndQueue::sendto() simply calls CChannel::sendto():
int CChannel::sendto(const sockaddr* addr, CPacket& packet) const { cout << "CChannel send packet " << packet.m_iID << endl << endl; // convert control information into network order if (packet.getFlag()) for (int i = 0, n = packet.getLength() / 4; i < n; ++i) *((uint32_t *) packet.m_pcData + i) = htonl(*((uint32_t *) packet.m_pcData + i)); // convert packet header into network order //for (int j = 0; j < 4; ++ j) // packet.m_nHeader[j] = htonl(packet.m_nHeader[j]); uint32_t* p = packet.m_nHeader; for (int j = 0; j < 4; ++j) { *p = htonl(*p); ++p; } #ifndef WIN32 msghdr mh; mh.msg_name = (sockaddr*) addr; mh.msg_namelen = m_iSockAddrSize; mh.msg_iov = (iovec*) packet.m_PacketVector; mh.msg_iovlen = 2; mh.msg_control = NULL; mh.msg_controllen = 0; mh.msg_flags = 0; int res = ::sendmsg(m_iSocket, &mh, 0); #else DWORD size = CPacket::m_iPktHdrSize + packet.getLength(); int addrsize = m_iSockAddrSize; int res = ::WSASendTo(m_iSocket, (LPWSABUF)packet.m_PacketVector, 2, &size, 0, addr, addrsize, NULL, NULL); res = (0 == res) ? size : -1; #endif // convert back into local host order //for (int k = 0; k < 4; ++ k) // packet.m_nHeader[k] = ntohl(packet.m_nHeader[k]); p = packet.m_nHeader; for (int k = 0; k < 4; ++k) { *p = ntohl(*p); ++p; } if (packet.getFlag()) { for (int l = 0, n = packet.getLength() / 4; l < n; ++l) *((uint32_t *) packet.m_pcData + l) = ntohl(*((uint32_t *) packet.m_pcData + l)); } return res; }
CChannel::sendto() is where the byte-order conversion of the header (and of control-packet payloads) is handled.
To summarize the connection request that the UDT client sends to the UDT server: the message consists of two parts, a header and a content section. The header is 4 uint32_t values which are, in order, the sequence number, the message number, the timestamp and the destination SocketID. For a handshake, the highest bit of the sequence number (bit 0) is 1, marking a control packet, bits 1-15 hold pkttype 0 and the remaining bits are 0; the message number and timestamp are both 0, and the destination SocketID is 0.
The content section is 48 bytes in total and is used for connection negotiation (MSS and so on); see CHandShake for the details.
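Putting the two parts together, the first handshake as it appears in the UDP payload can be laid out as follows; the field order comes from CHandShake::serialize(), and the whole message is 16 + 48 = 64 bytes.

offset  size  field
 0      4     header[0]  flag = 1, type = 0 (handshake), remaining bits 0
 4      4     header[1]  message number = 0
 8      4     header[2]  timestamp = 0
12      4     header[3]  destination SocketID = 0
16      4     m_iVersion
20      4     m_iType
24      4     m_iISN
28      4     m_iMSS
32      4     m_iFlightFlagSize
36      4     m_iReqType        (= 1 for the first handshake)
40      4     m_iID             (SocketID of the initiator)
44      4     m_iCookie         (= 0 in the first handshake)
48     16     m_piPeerIP[4]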
9. Check whether the socket is in synchronous receiving mode. If not, delete the buffer previously allocated for the request CPacket's CHandShake and return; the rest of the message exchange with the UDT server is driven asynchronously by the receive queue. Otherwise continue. Note that the CUDT constructor sets m_bSynRecving to true, while the copy constructor inherits the value from its ancestor; like the MSS, it can also be changed via UDT::setsockopt(). In other words, UDT sockets created by the application, such as the listening socket and the connecting socket, are in synchronous receiving mode by default but can be configured otherwise, while a socket created by a listening socket on receiving a connection request inherits the listening socket's value.
For now we look at the behavior of a UDT socket in SynRecving mode, i.e. the default mode.
10. Create a CPacket response, likewise allocate a buffer of m_iPayloadSize bytes for its data and pack the buffer into response. This CPacket will hold the response information sent back by the UDT server.
11. Enter a loop that carries out the rest of the handshake, including timeout retransmission of the handshake messages. The loop can be viewed as three parts.
The loop begins with a block of sending code that really does two things, or rather may send two kinds of messages: retransmitting the first handshake on timeout, and sending (and retransmitting) the second handshake. In both cases what is sent is CHandShake m_ConnReq, but after the response to the first handshake arrives, some members of that structure are modified according to the response. Note that on the first pass through the loop, right after the first handshake has been sent, this block is skipped.
The second part receives the responses, both to the first handshake and to the second. Here is CRcvQueue::recvfrom() (src/queue.cpp):
int CRcvQueue::recvfrom(int32_t id, CPacket& packet) { CGuard bufferlock(m_PassLock); map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id); if (i == m_mBuffer.end()) { #ifndef WIN32 uint64_t now = CTimer::getTime(); timespec timeout; timeout.tv_sec = now / 1000000 + 1; timeout.tv_nsec = (now % 1000000) * 1000; pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout); #else ReleaseMutex(m_PassLock); WaitForSingleObject(m_PassCond, 1000); WaitForSingleObject(m_PassLock, INFINITE); #endif i = m_mBuffer.find(id); if (i == m_mBuffer.end()) { packet.setLength(-1); return -1; } } // retrieve the earliest packet CPacket* newpkt = i->second.front(); if (packet.getLength() < newpkt->getLength()) { packet.setLength(-1); return -1; } // copy packet content memcpy(packet.m_nHeader, newpkt->m_nHeader, CPacket::m_iPktHdrSize); memcpy(packet.m_pcData, newpkt->m_pcData, newpkt->getLength()); packet.setLength(newpkt->getLength()); delete[] newpkt->m_pcData; delete newpkt; // remove this message from queue, // if no more messages left for this socket, release its data structure i->second.pop(); if (i->second.empty()) m_mBuffer.erase(i); return packet.getLength(); }
This is again a producer-consumer pattern, and just as with listen(), we only see half of the story here, the consumer half; the producer is again the RcvQueue worker thread. The call waits for a packet to arrive, but not indefinitely: the wait is roughly 1 second. What it waits for is the appearance of a CPacket queue for the target UDT socket in m_mBuffer; it then takes the first packet from that queue and returns it to the caller, and if the queue becomes empty it is removed from m_mBuffer.
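The 1-second bound comes from the pthread_cond_timedwait() call above, which takes an absolute deadline. A minimal standalone sketch of building such a deadline from a microsecond clock, mirroring the timeout computation in the code with gettimeofday() standing in for CTimer::getTime():

#include <cstdint>
#include <ctime>
#include <sys/time.h>

// Return "now + 1 second" as an absolute timespec, the way
// CRcvQueue::recvfrom() prepares the argument for pthread_cond_timedwait().
timespec deadline_in_one_second()
{
   timeval tv;
   ::gettimeofday(&tv, NULL);
   uint64_t now_us = uint64_t(tv.tv_sec) * 1000000 + tv.tv_usec;  // microseconds

   timespec deadline;
   deadline.tv_sec = now_us / 1000000 + 1;         // one second from now
   deadline.tv_nsec = (now_us % 1000000) * 1000;   // remaining microseconds -> ns
   return deadline;
}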
The third part of the loop handles the overall timeout of the connection setup exchange: in non-rendezvous mode the timeout is 3 seconds (ttl = 3000000 µs), and in rendezvous mode it is ten times longer, 30 seconds.
By the time CUDT::connect() is waiting for the response to the first handshake, the initiation of the connection request is essentially complete.
Now let's look at how the UDT server handles this message. As we saw in the earlier article on bind, listen and accept (UDT協議實現分析——bind、listen與accept), the listening UDT socket waits in UDT::accept() for connection requests to come in; that is another producer-consumer story, in which UDT::accept() is the consumer and the receive queue RcvQueue's worker thread is the producer.
So let's take a close look at the RcvQueue worker thread, focusing of course on how it handles handshake messages, i.e. packets whose destination SocketID is 0 and whose pkttype is 0 (src/queue.cpp):
#ifndef WIN32 void* CRcvQueue::worker(void* param) #else DWORD WINAPI CRcvQueue::worker(LPVOID param) #endif { CRcvQueue* self = (CRcvQueue*) param; sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6; CUDT* u = NULL; int32_t id; while (!self->m_bClosing) { #ifdef NO_BUSY_WAITING self->m_pTimer->tick(); #endif // check waiting list, if new socket, insert it to the list while (self->ifNewEntry()) { CUDT* ne = self->getNewEntry(); if (NULL != ne) { self->m_pRcvUList->insert(ne); self->m_pHash->insert(ne->m_SocketID, ne); } } // find next available slot for incoming packet CUnit* unit = self->m_UnitQueue.getNextAvailUnit(); if (NULL == unit) { // no space, skip this packet CPacket temp; temp.m_pcData = new char[self->m_iPayloadSize]; temp.setLength(self->m_iPayloadSize); self->m_pChannel->recvfrom(addr, temp); delete[] temp.m_pcData; goto TIMER_CHECK; } unit->m_Packet.setLength(self->m_iPayloadSize); // reading next incoming packet, recvfrom returns -1 is nothing has been received if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0) goto TIMER_CHECK; id = unit->m_Packet.m_iID; // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets if (0 == id) { if (NULL != self->m_pListener) self->m_pListener->listen(addr, unit->m_Packet); else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id))) { // asynchronous connect: call connect here // otherwise wait for the UDT socket to retrieve this packet if (!u->m_bSynRecving) u->connect(unit->m_Packet); else self->storePkt(id, unit->m_Packet.clone()); } } else if (id > 0) { if (NULL != (u = self->m_pHash->lookup(id))) { if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion)) { if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) { if (0 == unit->m_Packet.getFlag()) u->processData(unit); else u->processCtrl(unit->m_Packet); u->checkTimers(); self->m_pRcvUList->update(u); } } } else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id))) { if (!u->m_bSynRecving) u->connect(unit->m_Packet); else self->storePkt(id, unit->m_Packet.clone()); } } TIMER_CHECK: // take care of the timing event for all UDT sockets uint64_t currtime; CTimer::rdtsc(currtime); CRNode* ul = self->m_pRcvUList->m_pUList; uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency(); while ((NULL != ul) && (ul->m_llTimeStamp < ctime)) { CUDT* u = ul->m_pUDT; if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) { u->checkTimers(); self->m_pRcvUList->update(u); } else { // the socket must be removed from Hash table first, then RcvUList self->m_pHash->remove(u->m_SocketID); self->m_pRcvUList->remove(u); u->m_pRNode->m_bOnList = false; } ul = self->m_pRcvUList->m_pUList; } // Check connection requests status for all sockets in the RendezvousQueue. self->m_pRendezvousQueue->updateConnStatus(); } if (AF_INET == self->m_UnitQueue.m_iIPversion) delete (sockaddr_in*) addr; else delete (sockaddr_in6*) addr; #ifndef WIN32 return NULL; #else SetEvent(self->m_ExitCond); return 0; #endif }
The function first creates a sockaddr to hold the sender's address.
It then enters a loop that keeps receiving UDP packets.
The first line inside the loop calls the timer's tick(), which is part of UDT's own CTimer mechanism.
The sub-loop that follows is also part of the worker thread's dispatch mechanism: it moves newly registered sockets into the dispatch structures.
Next a CUnit is taken to receive the incoming packet. If none is available, the UDP packet is read and dropped, and the dispatch step that follows is skipped. m_UnitQueue serves as a buffer cache here and also keeps an excessive number of incoming packets from consuming too many resources; we leave the full CUnitQueue mechanism aside for now.
When a CUnit is obtained, a packet is received through the CChannel and dispatched according to its content. Staying on topic, we care about the dispatch of packets with destination SocketID 0 and pkttype 0: if a listener is present, the packet is dispatched to it, i.e. to the listen() function of the listening UDT socket's CUDT; otherwise it is dispatched to a UDT socket in rendezvous mode on this channel. (How the listener gets set was covered in the listen part of the bind/listen/accept article.) Note that on the same CChannel, that is on the same port, a rendezvous-mode UDT socket and a listening UDT socket cannot coexist, or rather if they do, the rendezvous behavior may not be what is expected, whereas multiple rendezvous-mode UDT sockets can coexist.
We will not look into the other work the CRcvQueue worker() thread does for now. Let's instead walk through how the listening UDT socket handles a handshake message, i.e. CUDT::listen(sockaddr* addr, CPacket& packet) (src/core.cpp):
int CUDT::listen(sockaddr* addr, CPacket& packet) { if (m_bClosing) return 1002; if (packet.getLength() != CHandShake::m_iContentSize) return 1004; CHandShake hs; hs.deserialize(packet.m_pcData, packet.getLength()); // SYN cookie char clienthost[NI_MAXHOST]; char clientport[NI_MAXSERV]; getnameinfo(addr, (AF_INET == m_iVersion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6), clienthost, sizeof(clienthost), clientport, sizeof(clientport), NI_NUMERICHOST | NI_NUMERICSERV); int64_t timestamp = (CTimer::getTime() - m_StartTime) / 60000000; // secret changes every one minute stringstream cookiestr; cookiestr << clienthost << ":" << clientport << ":" << timestamp; unsigned char cookie[16]; CMD5::compute(cookiestr.str().c_str(), cookie); if (1 == hs.m_iReqType) { hs.m_iCookie = *(int*) cookie; packet.m_iID = hs.m_iID; int size = packet.getLength(); hs.serialize(packet.m_pcData, size); m_pSndQueue->sendto(addr, packet); return 0; } else { if (hs.m_iCookie != *(int*) cookie) { timestamp--; cookiestr << clienthost << ":" << clientport << ":" << timestamp; CMD5::compute(cookiestr.str().c_str(), cookie); if (hs.m_iCookie != *(int*) cookie) return -1; } } int32_t id = hs.m_iID; // When a peer side connects in... if ((1 == packet.getFlag()) && (0 == packet.getType())) { if ((hs.m_iVersion != m_iVersion) || (hs.m_iType != m_iSockType)) { // mismatch, reject the request hs.m_iReqType = 1002; int size = CHandShake::m_iContentSize; hs.serialize(packet.m_pcData, size); packet.m_iID = id; m_pSndQueue->sendto(addr, packet); } else { int result = s_UDTUnited.newConnection(m_SocketID, addr, &hs); if (result == -1) hs.m_iReqType = 1002; // send back a response if connection failed or connection already existed // new connection response should be sent in connect() if (result != 1) { int size = CHandShake::m_iContentSize; hs.serialize(packet.m_pcData, size); packet.m_iID = id; m_pSndQueue->sendto(addr, packet); } else { // a new connection has been created, enable epoll for write s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true); } } } return hs.m_iReqType; }
This function does the following:
1. Check the UDT socket's state; if it is closing, return, otherwise continue.
2. Check the length of the packet's data section. If it is not CHandShake::m_iContentSize (48 bytes), this is not a valid handshake, so return; otherwise continue.
3. Create a CHandShake hs and deserialize the incoming packet's data section into it. A quick glance at CHandShake::deserialize() (src/packet.cpp):
int CHandShake::deserialize(const char* buf, int size) {
  if (size < m_iContentSize)
    return -1;

  int32_t* p = (int32_t*) buf;
  m_iVersion = *p++;
  m_iType = *p++;
  m_iISN = *p++;
  m_iMSS = *p++;
  m_iFlightFlagSize = *p++;
  m_iReqType = *p++;
  m_iID = *p++;
  m_iCookie = *p++;
  for (int i = 0; i < 4; ++i)
    m_piPeerIP[i] = *p++;

  return 0;
}
Like its counterpart serialize(), this function does not deal with byte order; as noted earlier, the conversion happens at the CChannel level.
4. Compute the cookie. The cookie is a 16-byte MD5 digest computed over a string made of the connection initiator's network address (IP address and port) and a timestamp with one-minute granularity; the string looks like 127.0.0.1:49033:0. A condensed sketch of the derivation follows.
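The sketch below reuses the CMD5 helper that appears in the listing above; the header location and the helper function name are assumptions taken from the UDT source tree, and the host/port parameters are placeholders.

#include <sstream>
#include <cstring>
#include <cstdint>
#include "common.h"   // assumed location of CMD5 in the UDT source tree

int32_t make_cookie(const char* clienthost, const char* clientport,
                    uint64_t now_us, uint64_t start_us)
{
   // The "secret" changes every minute: the timestamp is the number of
   // minutes elapsed since the listening socket started.
   int64_t timestamp = (now_us - start_us) / 60000000;

   std::stringstream cookiestr;                   // e.g. "127.0.0.1:49033:0"
   cookiestr << clienthost << ":" << clientport << ":" << timestamp;

   unsigned char cookie[16];
   CMD5::compute(cookiestr.str().c_str(), cookie);

   // CUDT::listen() takes the first 4 bytes of the digest as the cookie value
   // via a direct cast; memcpy is used here only to avoid alignment issues.
   int32_t value;
   std::memcpy(&value, cookie, sizeof(value));
   return value;
}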
5. The code after the cookie computation falls into two parts. One handles the first handshake packet sent by the initiator, i.e. the hs.m_iReqType == 1 block (we saw in CUDT::connect(), where m_ConnReq is built, that this value is set to 1); the other handles the initiator's second handshake. Let's look at the hs.m_iReqType == 1 block first.
It takes the first 4 bytes of the cookie computed in the previous step, casts them directly to an int and assigns that to the deserialized CHandShake's m_iCookie. Curiously, neither byte order nor platform differences are considered here; since the width of int can differ between machines, int32_t would seem safer. Then packet.m_iID, the packet's destination SocketID, is set to the CHandShake's m_iID, which as we saw when m_ConnReq was built in CUDT::connect() is the SocketID of the initiating UDT socket. Finally hs is re-serialized into the packet, the modified packet is sent back through the send queue SndQueue, and the function returns.
To summarize how the listening UDT socket on the UDT server handles the first handshake packet:
it computes a cookie, writes it into the received handshake's cookie field, changes the packet's destination SocketID to the SocketID of the connecting UDT socket, leaves the rest of the packet untouched, and sends the packet back to the initiator.
So on receiving the first handshake the UDT server replies with a handshake of its own, and the ball is kicked back to the UDT client. Let's now see what the client does when the response to its first handshake arrives.
We know that in CUDT::connect(const sockaddr* serv_addr), after sending the first handshake, the code calls CRcvQueue::recvfrom() to wait for the server's response, a consumer waiting anxiously for food. When the packet arrives, CUDT::connect() is woken up by the producer, the CRcvQueue worker thread. Let's look at the other half of this producer-consumer story, the producing half, namely the worker thread's packet dispatch.
In the packet-dispatch part of CRcvQueue::worker() we see:
} else if (id > 0) {
  if (NULL != (u = self->m_pHash->lookup(id))) {
    if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion)) {
      cout << "Receive packet by m_pHash table" << endl;
      if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) {
        if (0 == unit->m_Packet.getFlag())
          u->processData(unit);
        else
          u->processCtrl(unit->m_Packet);

        u->checkTimers();
        self->m_pRcvUList->update(u);
      }
    }
  } else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id))) {
    cout << "Receive packet by m_pRendezvousQueue, u->m_bSynRecving " << u->m_bSynRecving << endl;
    if (!u->m_bSynRecving)
      u->connect(unit->m_Packet);
    else
      self->storePkt(id, unit->m_Packet.clone());
  }
}
Since the server's reply has its destination SocketID set, the id > 0 branch is taken.
We saw in CUDT::connect(const sockaddr* serv_addr) that m_pRcvQueue->registerConnector() added the CUDT to the RcvQueue's m_pRendezvousQueue, so the second block inside the id > 0 branch runs. As analyzed earlier, m_bSynRecving is true by default, so CRcvQueue::storePkt() is called to store the packet. Here is its implementation:
void CRcvQueue::storePkt(int32_t id, CPacket* pkt) {
  CGuard bufferlock(m_PassLock);

  map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);

  if (i == m_mBuffer.end()) {
    m_mBuffer[id].push(pkt);

#ifndef WIN32
    pthread_cond_signal(&m_PassCond);
#else
    SetEvent(m_PassCond);
#endif
  } else {
    //avoid storing too many packets, in case of malfunction or attack
    if (i->second.size() > 16)
      return;

    i->second.push(pkt);
  }
}
This function stores the received packet and, when necessary, wakes up the thread waiting to receive it (compare with the logic of CRcvQueue::recvfrom()).
Next, what does CUDT::connect(const sockaddr* serv_addr) do once the response to the first handshake has arrived? That is handled by CUDT::connect(const CPacket& response) (src/core.cpp):
int CUDT::connect(const CPacket& response) throw () { // this is the 2nd half of a connection request. If the connection is setup successfully this returns 0. // returning -1 means there is an error. // returning 1 or 2 means the connection is in process and needs more handshake if (!m_bConnecting) return -1; if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getType())) && (0 != m_ConnRes.m_iType)) { //a data packet or a keep-alive packet comes, which means the peer side is already connected // in this situation, the previously recorded response will be used goto POST_CONNECT; } if ((1 != response.getFlag()) || (0 != response.getType())) return -1; m_ConnRes.deserialize(response.m_pcData, response.getLength()); if (m_bRendezvous) { // regular connect should NOT communicate with rendezvous connect // rendezvous connect require 3-way handshake if (1 == m_ConnRes.m_iReqType) return -1; if ((0 == m_ConnReq.m_iReqType) || (0 == m_ConnRes.m_iReqType)) { m_ConnReq.m_iReqType = -1; // the request time must be updated so that the next handshake can be sent out immediately. m_llLastReqTime = 0; return 1; } } else { // set cookie if (1 == m_ConnRes.m_iReqType) { m_ConnReq.m_iReqType = -1; m_ConnReq.m_iCookie = m_ConnRes.m_iCookie; m_llLastReqTime = 0; return 1; } }
This function handles both the response to the first handshake and the response to the second; here we focus on the first, so only part of the code is listed.
It first checks the CUDT state and the validity of the packet, then deserializes the received packet's data section into CHandShake m_ConnRes. We ignore the rendezvous-mode handling here.
It then checks m_ConnRes.m_iReqType. If it is 1, it sets m_ConnReq.m_iReqType to -1 and copies m_ConnRes.m_iCookie into m_ConnReq.m_iCookie, marking m_ConnReq as a valid second handshake packet; it also sets m_llLastReqTime to 0 so that, as we saw in the analysis of CUDT::connect(const sockaddr* serv_addr), the second handshake now held in m_ConnReq is sent out as soon as possible.
The only differences between this second handshake and the first are that it carries a valid cookie and that its request type ReqType is -1; everything else is identical. The two are compared field by field below.
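Field by field, using the values assigned when m_ConnReq was built in CUDT::connect() and updated above:

field               first handshake (MSG1)             second handshake (MSG3)
m_iVersion          m_iVersion                         same
m_iType             m_iSockType                        same
m_iISN              random ISN                         same
m_iMSS              local MSS                          same
m_iFlightFlagSize   min(rcv buf size, flight flag)     same
m_iReqType          1                                  -1
m_iID               initiator's SocketID               same
m_iCookie           0                                  cookie from the server's response
m_piPeerIP          server address (CIPAddress::ntop)  same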
The client's changes to m_ConnReq do not alter how the worker thread in the receive queue dispatches this packet, so we go straight to how CUDT::listen(sockaddr* addr, CPacket& packet) handles this second handshake.
Continuing the earlier analysis of that function, we pick up from step 4.
5. For this second handshake the ReqType is of course no longer 1 but -1. So after computing the cookie, the code first verifies that the cookie in the handshake is valid and returns immediately if it is not. From the logic here (the listener also tries the cookie of the previous minute), a cookie is valid for at most 2 minutes.
6. Check the packet's flag and type; if it is not a handshake packet, return immediately, otherwise continue.
7. Check whether the initiator's IP version and socket type (SockType) match those of the local listening UDT socket. If not, the error code 1002 is put into the ReqType field of the received handshake, the packet's destination SocketID is set to the connecting socket's SocketID, and the packet is sent back to the UDT client.
8. When everything matches, CUDTUnited::newConnection() is called to create a new UDT socket. If creation fails, the error code 1002 is placed in the ReqType field of the received handshake. Unless newConnection() returned 1, meaning a brand-new connection was created and the response is sent from inside CUDT::connect(peer, hs), the handshake (whose fields newConnection() may have adjusted) is serialized back into the packet, the packet's destination SocketID is set appropriately, and the same packet is sent back to the UDT client; in the result == 1 case s_UDTUnited.m_EPoll.update_events() is executed instead.
9. Return hs.m_iReqType.
Now let's see how CUDTUnited::newConnection() creates the new socket:
int CUDTUnited::newConnection(const UDTSOCKET listen, const sockaddr* peer, CHandShake* hs) { CUDTSocket* ns = NULL; CUDTSocket* ls = locate(listen); if (NULL == ls) return -1; // if this connection has already been processed if (NULL != (ns = locate(peer, hs->m_iID, hs->m_iISN))) { if (ns->m_pUDT->m_bBroken) { // last connection from the "peer" address has been broken ns->m_Status = CLOSED; ns->m_TimeStamp = CTimer::getTime(); CGuard::enterCS(ls->m_AcceptLock); ls->m_pQueuedSockets->erase(ns->m_SocketID); ls->m_pAcceptSockets->erase(ns->m_SocketID); CGuard::leaveCS(ls->m_AcceptLock); } else { // connection already exist, this is a repeated connection request // respond with existing HS information hs->m_iISN = ns->m_pUDT->m_iISN; hs->m_iMSS = ns->m_pUDT->m_iMSS; hs->m_iFlightFlagSize = ns->m_pUDT->m_iFlightFlagSize; hs->m_iReqType = -1; hs->m_iID = ns->m_SocketID; return 0; //except for this situation a new connection should be started } } // exceeding backlog, refuse the connection request if (ls->m_pQueuedSockets->size() >= ls->m_uiBackLog) return -1; try { ns = new CUDTSocket; ns->m_pUDT = new CUDT(*(ls->m_pUDT)); if (AF_INET == ls->m_iIPversion) { ns->m_pSelfAddr = (sockaddr*) (new sockaddr_in); ((sockaddr_in*) (ns->m_pSelfAddr))->sin_port = 0; ns->m_pPeerAddr = (sockaddr*) (new sockaddr_in); memcpy(ns->m_pPeerAddr, peer, sizeof(sockaddr_in)); } else { ns->m_pSelfAddr = (sockaddr*) (new sockaddr_in6); ((sockaddr_in6*) (ns->m_pSelfAddr))->sin6_port = 0; ns->m_pPeerAddr = (sockaddr*) (new sockaddr_in6); memcpy(ns->m_pPeerAddr, peer, sizeof(sockaddr_in6)); } } catch (...) { delete ns; return -1; } CGuard::enterCS(m_IDLock); ns->m_SocketID = --m_SocketID; cout << "new CUDTSocket SocketID is " << ns->m_SocketID << " PeerID " << hs->m_iID << endl; CGuard::leaveCS(m_IDLock); ns->m_ListenSocket = listen; ns->m_iIPversion = ls->m_iIPversion; ns->m_pUDT->m_SocketID = ns->m_SocketID; ns->m_PeerID = hs->m_iID; ns->m_iISN = hs->m_iISN; int error = 0; try { // bind to the same addr of listening socket ns->m_pUDT->open(); updateMux(ns, ls); ns->m_pUDT->connect(peer, hs); } catch (...) { error = 1; goto ERR_ROLLBACK; } ns->m_Status = CONNECTED; // copy address information of local node ns->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(ns->m_pSelfAddr); CIPAddress::pton(ns->m_pSelfAddr, ns->m_pUDT->m_piSelfIP, ns->m_iIPversion); // protect the m_Sockets structure. CGuard::enterCS(m_ControlLock); try { m_Sockets[ns->m_SocketID] = ns; m_PeerRec[(ns->m_PeerID << 30) + ns->m_iISN].insert(ns->m_SocketID); } catch (...) { error = 2; } CGuard::leaveCS(m_ControlLock); CGuard::enterCS(ls->m_AcceptLock); try { ls->m_pQueuedSockets->insert(ns->m_SocketID); } catch (...) { error = 3; } CGuard::leaveCS(ls->m_AcceptLock); // acknowledge users waiting for new connections on the listening socket m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, true); CTimer::triggerEvent(); ERR_ROLLBACK: if (error > 0) { ns->m_pUDT->close(); ns->m_Status = CLOSED; ns->m_TimeStamp = CTimer::getTime(); return -1; } // wake up a waiting accept() call #ifndef WIN32 pthread_mutex_lock(&(ls->m_AcceptLock)); pthread_cond_signal(&(ls->m_AcceptCond)); pthread_mutex_unlock(&(ls->m_AcceptLock)); #else SetEvent(ls->m_AcceptCond); #endif return 1; }
This function does the following:
1. Locate the CUDTSocket structure of the listening UDT socket; if it cannot be found, return -1, otherwise continue.
2. Check whether the same connection request has already been processed. CUDTUnited keeps a dedicated record, m_PeerRec, of the UDT sockets created by listening sockets; the check is done by looking there for a socket matching this connection request:
CUDTSocket* CUDTUnited::locate(const sockaddr* peer, const UDTSOCKET id, int32_t isn) {
  CGuard cg(m_ControlLock);

  map<int64_t, set<UDTSOCKET> >::iterator i = m_PeerRec.find((id << 30) + isn);
  if (i == m_PeerRec.end())
    return NULL;

  for (set<UDTSOCKET>::iterator j = i->second.begin(); j != i->second.end(); ++j) {
    map<UDTSOCKET, CUDTSocket*>::iterator k = m_Sockets.find(*j);
    // this socket might have been closed and moved m_ClosedSockets
    if (k == m_Sockets.end())
      continue;

    if (CIPAddress::ipcmp(peer, k->second->m_pPeerAddr, k->second->m_iIPversion))
      return k->second;
  }

  return NULL;
}
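The lookup key combines the peer's SocketID and its initial sequence number into a single value, (id << 30) + isn, the same expression used when the socket is inserted into m_PeerRec in newConnection(). A tiny sketch of the key construction; the values are purely illustrative, and the operands are widened to 64 bits here only to keep the sketch well defined.

#include <cstdint>
#include <cstdio>

int main()
{
   int32_t peer_id = 364855723;   // SocketID of the connecting peer (as in the demo logs)
   int32_t isn     = 12345;       // hypothetical random initial sequence number

   // Key used for m_PeerRec: peer SocketID shifted left by 30 bits plus the ISN.
   int64_t key = (int64_t(peer_id) << 30) + isn;

   std::printf("m_PeerRec key = %lld\n", (long long) key);
   return 0;
}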
If a UDT socket has already been created for this connection request, there are two cases:
(1) The UDT socket previously created for this request is still healthy and usable. In that case the received handshake is filled in from fields of that existing socket: m_iReqType is set to -1 and m_iID to that socket's SocketID, and 0 is returned. As we saw in CUDT::listen(), after this return the rebuilt response is sent back to the UDT client.
(2) The previously created UDT socket is broken and unusable. In that case its status is set to CLOSED, its timestamp is updated, and it is removed from m_pQueuedSockets and m_pAcceptSockets; the normal flow of creating a new UDT socket then follows.
But what could cause a UDT socket created by a listening socket to be broken, and is this check really necessary? We will come back to that later.
3. Check whether the size of m_pQueuedSockets exceeds the backlog set for the listening UDT socket; if so, return -1, otherwise continue.
4. Create a CUDTSocket object and a CUDT object; the CUDT created here inherits many attributes from the listening UDT socket's CUDT (src/core.cpp):
CUDT::CUDT(const CUDT& ancestor) { m_pSndBuffer = NULL; m_pRcvBuffer = NULL; m_pSndLossList = NULL; m_pRcvLossList = NULL; m_pACKWindow = NULL; m_pSndTimeWindow = NULL; m_pRcvTimeWindow = NULL; m_pSndQueue = NULL; m_pRcvQueue = NULL; m_pPeerAddr = NULL; m_pSNode = NULL; m_pRNode = NULL; // Initilize mutex and condition variables initSynch(); // Default UDT configurations m_iMSS = ancestor.m_iMSS; m_bSynSending = ancestor.m_bSynSending; m_bSynRecving = ancestor.m_bSynRecving; m_iFlightFlagSize = ancestor.m_iFlightFlagSize; m_iSndBufSize = ancestor.m_iSndBufSize; m_iRcvBufSize = ancestor.m_iRcvBufSize; m_Linger = ancestor.m_Linger; m_iUDPSndBufSize = ancestor.m_iUDPSndBufSize; m_iUDPRcvBufSize = ancestor.m_iUDPRcvBufSize; m_iSockType = ancestor.m_iSockType; m_iIPversion = ancestor.m_iIPversion; m_bRendezvous = ancestor.m_bRendezvous; m_iSndTimeOut = ancestor.m_iSndTimeOut; m_iRcvTimeOut = ancestor.m_iRcvTimeOut; m_bReuseAddr = true; // this must be true, because all accepted sockets shared the same port with the listener m_llMaxBW = ancestor.m_llMaxBW; m_pCCFactory = ancestor.m_pCCFactory->clone(); m_pCC = NULL; m_pCache = ancestor.m_pCache; // Initial status m_bOpened = false; m_bListening = false; m_bConnecting = false; m_bConnected = false; m_bClosing = false; m_bShutdown = false; m_bBroken = false; m_bPeerHealth = true; m_ullLingerExpiration = 0; }
Memory is then allocated for the self address and the peer address, the sender's address is copied into the peer address, the SocketID is assigned, and so on.
5. Call ns->m_pUDT->open() to open the socket, then call updateMux(ns, ls) to attach the newly created UDT socket to the multiplexer the listening UDT socket is bound to:
void CUDTUnited::updateMux(CUDTSocket* s, const CUDTSocket* ls) {
  CGuard cg(m_ControlLock);

  int port = (AF_INET == ls->m_iIPversion) ?
      ntohs(((sockaddr_in*) ls->m_pSelfAddr)->sin_port) : ntohs(((sockaddr_in6*) ls->m_pSelfAddr)->sin6_port);

  // find the listener's address
  for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++i) {
    if (i->second.m_iPort == port) {
      // reuse the existing multiplexer
      ++i->second.m_iRefCount;
      s->m_pUDT->m_pSndQueue = i->second.m_pSndQueue;
      s->m_pUDT->m_pRcvQueue = i->second.m_pRcvQueue;
      s->m_iMuxID = i->second.m_iID;
      return;
    }
  }
}

6. Call ns->m_pUDT->connect(peer, hs):
void CUDT::connect(const sockaddr* peer, CHandShake* hs) { CGuard cg(m_ConnectionLock); // Uses the smaller MSS between the peers if (hs->m_iMSS > m_iMSS) hs->m_iMSS = m_iMSS; else m_iMSS = hs->m_iMSS; // exchange info for maximum flow window size m_iFlowWindowSize = hs->m_iFlightFlagSize; hs->m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize) ? m_iRcvBufSize : m_iFlightFlagSize; m_iPeerISN = hs->m_iISN; m_iRcvLastAck = hs->m_iISN; m_iRcvLastAckAck = hs->m_iISN; m_iRcvCurrSeqNo = hs->m_iISN - 1; m_PeerID = hs->m_iID; hs->m_iID = m_SocketID; // use peer's ISN and send it back for security check m_iISN = hs->m_iISN; m_iLastDecSeq = m_iISN - 1; m_iSndLastAck = m_iISN; m_iSndLastDataAck = m_iISN; m_iSndCurrSeqNo = m_iISN - 1; m_iSndLastAck2 = m_iISN; m_ullSndLastAck2Time = CTimer::getTime(); // this is a reponse handshake hs->m_iReqType = -1; // get local IP address and send the peer its IP address (because UDP cannot get local IP address) memcpy(m_piSelfIP, hs->m_piPeerIP, 16); CIPAddress::ntop(peer, hs->m_piPeerIP, m_iIPversion); m_iPktSize = m_iMSS - 28; m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize; // Prepare all structures try { m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize); m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize); m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2); m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize); m_pACKWindow = new CACKWindow(1024); m_pRcvTimeWindow = new CPktTimeWindow(16, 64); m_pSndTimeWindow = new CPktTimeWindow(); } catch (...) { throw CUDTException(3, 2, 0); } CInfoBlock ib; ib.m_iIPversion = m_iIPversion; CInfoBlock::convert(peer, m_iIPversion, ib.m_piIP); if (m_pCache->lookup(&ib) >= 0) { m_iRTT = ib.m_iRTT; m_iBandwidth = ib.m_iBandwidth; } m_pCC = m_pCCFactory->create(); m_pCC->m_UDT = m_SocketID; m_pCC->setMSS(m_iMSS); m_pCC->setMaxCWndSize(m_iFlowWindowSize); m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo); m_pCC->setRcvRate(m_iDeliveryRate); m_pCC->setRTT(m_iRTT); m_pCC->setBandwidth(m_iBandwidth); m_pCC->init(); m_ullInterval = (uint64_t) (m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); m_dCongestionWindow = m_pCC->m_dCWndSize; m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6; memcpy(m_pPeerAddr, peer, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6)); // And of course, it is connected. m_bConnected = true; // register this socket for receiving data packets m_pRNode->m_bOnList = true; m_pRcvQueue->setNewEntry(this); //send the response to the peer, see listen() for more discussions about this CPacket response; int size = CHandShake::m_iContentSize; char* buffer = new char[size]; hs->serialize(buffer, size); response.pack(0, NULL, buffer, size); response.m_iID = m_PeerID; m_pSndQueue->sendto(peer, response); delete[] buffer; }
This function sets a great many members based on the handshake packet. The call to note is m_pRcvQueue->setNewEntry(this), which again ties into the RcvQueue worker thread's dispatch mechanism; we will look at it more closely below.
At the end, this function sends the response back to the UDT client.
7. Set the UDT socket's status to CONNECTED and copy the channel's local address into the socket's self address (m_pSelfAddr).
8. Put the newly created CUDTSocket into m_Sockets, and record it in m_PeerRec as well.
9. Put the new UDT socket into m_pQueuedSockets. This is the other half of the producer-consumer story around the listening socket's accept(): here is the producer.
10. Wake up the thread waiting in accept(). At this point accept() on the UDT server returns a UDT socket, and the server considers the connection successfully established.
As we saw earlier, after sending the second handshake, CUDT::connect(const sockaddr* serv_addr) starts waiting for the server's second response. Once the UDT server has sent the response to the second handshake, the UDT client receives and processes it. The dispatch of this packet is the same as for the response to the first handshake, so we do not repeat it; let's look at how the response to the second handshake is handled, again in CUDT::connect(const CPacket& response):
} else { // set cookie if (1 == m_ConnRes.m_iReqType) { m_ConnReq.m_iReqType = -1; m_ConnReq.m_iCookie = m_ConnRes.m_iCookie; m_llLastReqTime = 0; return 1; } } POST_CONNECT: // Remove from rendezvous queue m_pRcvQueue->removeConnector(m_SocketID); // Re-configure according to the negotiated values. m_iMSS = m_ConnRes.m_iMSS; m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize; m_iPktSize = m_iMSS - 28; m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize; m_iPeerISN = m_ConnRes.m_iISN; m_iRcvLastAck = m_ConnRes.m_iISN; m_iRcvLastAckAck = m_ConnRes.m_iISN; m_iRcvCurrSeqNo = m_ConnRes.m_iISN - 1; m_PeerID = m_ConnRes.m_iID; memcpy(m_piSelfIP, m_ConnRes.m_piPeerIP, 16); // Prepare all data structures try { m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize); m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize); // after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space. m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2); m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize); m_pACKWindow = new CACKWindow(1024); m_pRcvTimeWindow = new CPktTimeWindow(16, 64); m_pSndTimeWindow = new CPktTimeWindow(); } catch (...) { throw CUDTException(3, 2, 0); } CInfoBlock ib; ib.m_iIPversion = m_iIPversion; CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP); if (m_pCache->lookup(&ib) >= 0) { m_iRTT = ib.m_iRTT; m_iBandwidth = ib.m_iBandwidth; } m_pCC = m_pCCFactory->create(); m_pCC->m_UDT = m_SocketID; m_pCC->setMSS(m_iMSS); m_pCC->setMaxCWndSize(m_iFlowWindowSize); m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo); m_pCC->setRcvRate(m_iDeliveryRate); m_pCC->setRTT(m_iRTT); m_pCC->setBandwidth(m_iBandwidth); m_pCC->init(); m_ullInterval = (uint64_t) (m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); m_dCongestionWindow = m_pCC->m_dCWndSize; // And, I am connected too. m_bConnecting = false; m_bConnected = true; // register this socket for receiving data packets m_pRNode->m_bOnList = true; m_pRcvQueue->setNewEntry(this); // acknowledge the management module. s_UDTUnited.connect_complete(m_SocketID); // acknowledde any waiting epolls to write s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true); return 0; }
1. The first thing done here is to call m_pRcvQueue->removeConnector(m_SocketID) to remove itself from the RcvQueue's rendezvous queue, indicating that it will no longer receive packets by that route (src/queue.cpp):
void CRcvQueue::removeConnector(const UDTSOCKET& id) {
  m_pRendezvousQueue->remove(id);

  CGuard bufferlock(m_PassLock);

  map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
  if (i != m_mBuffer.end()) {
    while (!i->second.empty()) {
      delete[] i->second.front()->m_pcData;
      delete i->second.front();
      i->second.pop();
    }
    m_mBuffer.erase(i);
  }
}
Once this function has run, the RcvQueue temporarily has no way to dispatch packets to this UDT socket.
2. Reconfigure according to the negotiated values. It is worth asking what "negotiation" means in UDT: looking at the whole connection setup, UDT itself applies no special algorithm to compute these values, so the negotiation is really just the client and server reconciling the possibly different settings their applications configured for these options. A sketch of the reconciliation rule follows.
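For instance, the MSS and flow-window reconciliation performed on the server side in CUDT::connect(const sockaddr* peer, CHandShake* hs) boils down to taking the smaller MSS of the two ends and bounding the offered flight window by the local receive buffer. A minimal sketch of that rule with made-up sample values:

#include <algorithm>
#include <cstdio>

int main()
{
   // Hypothetical client/server configurations.
   int client_mss = 1500, server_mss = 9000;
   int client_flight = 25600, server_rcvbuf = 8192;

   // Both sides end up with the smaller MSS (see CUDT::connect(peer, hs)).
   int negotiated_mss = std::min(client_mss, server_mss);

   // The flight-flag (flow window) offered back is capped by the receive buffer,
   // mirroring: hs->m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize) ? ... ;
   int offered_flight = std::min(server_rcvbuf, client_flight);

   std::printf("negotiated MSS = %d, offered flight window = %d\n",
               negotiated_mss, offered_flight);
   return 0;
}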
3. Prepare all the data structures and buffers.
4. Update the CUDT state: m_bConnecting becomes false and m_bConnected becomes true.
5. Call m_pRcvQueue->setNewEntry(this) to register the socket for receiving data packets. Here is CRcvQueue::setNewEntry(CUDT* u):
void CRcvQueue::setNewEntry(CUDT* u) {
  CGuard listguard(m_IDLock);
  m_vNewEntry.push_back(u);
}
The operation itself is trivial. But what happens after the CUDT is handed to the CRcvQueue? Recall the code at the start of the loop in CRcvQueue::worker(void* param) that we saw earlier:
// check waiting list, if new socket, insert it to the list
while (self->ifNewEntry()) {
  CUDT* ne = self->getNewEntry();
  if (NULL != ne) {
    self->m_pRcvUList->insert(ne);
    self->m_pHash->insert(ne->m_SocketID, ne);
  }
}
Together with the implementations of the functions it uses:
bool CRcvQueue::ifNewEntry() {
  return !(m_vNewEntry.empty());
}

CUDT* CRcvQueue::getNewEntry() {
  CGuard listguard(m_IDLock);

  if (m_vNewEntry.empty())
    return NULL;

  CUDT* u = (CUDT*) *(m_vNewEntry.begin());
  m_vNewEntry.erase(m_vNewEntry.begin());

  return u;
}

So after m_pRcvQueue->setNewEntry(this) has registered the socket, the CRcvQueue worker thread moves this CUDT from its m_vNewEntry list into the two other containers, m_pRcvUList and m_pHash. And then? Recall that CRcvQueue::worker(void* param) also contains the following:
if (NULL != (u = self->m_pHash->lookup(id))) {
  if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion)) {
    cout << "Receive packet by m_pHash table" << endl;
    if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) {
      if (0 == unit->m_Packet.getFlag())
        u->processData(unit);
      else
        u->processCtrl(unit->m_Packet);

      u->checkTimers();
      self->m_pRcvUList->update(u);
    }
  }
} else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id))) {
Exactly: one could say that CUDT::connect(const CPacket& response) completes a switch in how this UDT socket receives packets, from the rendezvous queue to the hash table.
6. Call s_UDTUnited.connect_complete(m_SocketID) to finish the whole connect() process:
void CUDTUnited::connect_complete(const UDTSOCKET u) {
  CUDTSocket* s = locate(u);
  if (NULL == s)
    throw CUDTException(5, 4, 0);

  // copy address information of local node
  // the local port must be correctly assigned BEFORE CUDT::connect(),
  // otherwise if connect() fails, the multiplexer cannot be located by garbage collection and will cause leak
  s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);
  CIPAddress::pton(s->m_pSelfAddr, s->m_pUDT->m_piSelfIP, s->m_iIPversion);

  s->m_Status = CONNECTED;
}
At this point the UDT socket enters the CONNECTED state.
Done.