鏈接創建起來以後,咱們就能夠經過UDT Socket進行數據的收發了。先來看用來發送數據的幾個函數。UDT提供了以下的幾個函數用於不一樣目的下的數據發送:node
UDT_API int send(UDTSOCKET u, const char* buf, int len, int flags); UDT_API int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false); UDT_API int64_t sendfile(UDTSOCKET u, std::fstream& ifs, int64_t& offset, int64_t size, int block = 364000); UDT_API int64_t sendfile2(UDTSOCKET u, const char* path, int64_t* offset, int64_t size, int block = 364000);
int CUDT::sendmsg(UDTSOCKET u, const char* buf, int len, int ttl, bool inorder) { try { CUDT* udt = s_UDTUnited.lookup(u); return udt->sendmsg(buf, len, ttl, inorder); } catch (CUDTException &e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl, bool inorder) { return CUDT::sendmsg(u, buf, len, ttl, inorder); }
這個API的實現結構與以前看到的listen()、bind()這些略微有點區別,在CUDT的API層,會調用CUDT 對應的實現函數,調用過程:UDT::sendmsg() -> CUDT::sendmsg(UDTSOCKET u, const char* buf, int len, int ttl, bool inorder) -> CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)。直接來看CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)(src/core.cpp):網絡
void CUDT::waitBlockingSending(int space) { if (!m_bSynSending) throw CUDTException(6, 1, 0); else { // wait here during a blocking sending #ifndef WIN32 pthread_mutex_lock(&m_SendBlockLock); if (m_iSndTimeOut < 0) { while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < space) && m_bPeerHealth) pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock); } else { uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL; timespec locktime; locktime.tv_sec = exptime / 1000000; locktime.tv_nsec = (exptime % 1000000) * 1000; while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < space) && m_bPeerHealth && (CTimer::getTime() < exptime)) pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &locktime); } pthread_mutex_unlock(&m_SendBlockLock); #else if (m_iSndTimeOut < 0) { while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < space) && m_bPeerHealth) WaitForSingleObject(m_SendBlockCond, INFINITE); } else { uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL; while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < space) && m_bPeerHealth && (CTimer::getTime() < exptime)) WaitForSingleObject(m_SendBlockCond, DWORD((exptime - CTimer::getTime()) / 1000)); } #endif // check the connection status if (m_bBroken || m_bClosing) throw CUDTException(2, 1, 0); else if (!m_bConnected) throw CUDTException(2, 2, 0); else if (!m_bPeerHealth) { m_bPeerHealth = true; throw CUDTException(7); } } } int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder) { if (UDT_STREAM == m_iSockType) throw CUDTException(5, 9, 0); // throw an exception if not connected if (m_bBroken || m_bClosing) throw CUDTException(2, 1, 0); else if (!m_bConnected) throw CUDTException(2, 2, 0); if (len <= 0) return 0; if (len > m_iSndBufSize * m_iPayloadSize) throw CUDTException(5, 12, 0); CGuard sendguard(m_SendLock); if (m_pSndBuffer->getCurrBufSize() == 0) { // delay the EXP timer to avoid mis-fired timeout uint64_t currtime; CTimer::rdtsc(currtime); m_ullLastRspTime = currtime; } if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) { waitBlockingSending(len); } if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) { if (m_iSndTimeOut >= 0) throw CUDTException(6, 3, 0); return 0; } // record total time used for sending if (0 == m_pSndBuffer->getCurrBufSize()) m_llSndDurationCounter = CTimer::getTime(); // insert the user buffer into the sening list m_pSndBuffer->addBuffer(data, len, msttl, inorder); // insert this socket to the snd list if it is not on the list yet m_pSndQueue->m_pSndUList->update(this, false); if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) { // write is not available any more s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false); } return len; }
1. 檢查UDT Socket的類型SockType,若爲UDT_STREAM則直接拋異常退出,不然繼續執行。socket
2. 檢查CUDT的狀態,若UDT Socket不處於Connected狀態,就拋異常退出,不然繼續執行。函數
3. 檢查傳入的參數,主要是數據的長度,既不能太大也不能過小。數據長度過小是指,小於等於0;太大是指,超出了CUDT發送緩衝區的最大大小。若參數無效,就返回,不然繼續執行。在默認不經過UDT::setsockopt()修改m_iSndBufSize和m_iMSS這些選項的狀況下,m_iSndBufSize爲8192,m_iPayloadSize爲1456,也就是大概12MB。性能
4. 當CUDT發送緩衝區的已用大小爲0時,會將m_ullLastRspTime更新爲當前時間。學習
5. 檢查CUDT發送緩衝區中可用大小,若可用大小不足消息的長度時,執行waitBlockingSending()等待有足夠大小可用。在waitBlockingSending()中能夠看到,它主要處理了這樣3中狀況:ui
(1). UDT Socket不是處於同步發送模式。拋異常,結束髮送的整個流程。
(2). 發送的超時時間m_iSndTimeOut爲一個小於0的無效值,則永久等待,直到UDT Socket被關掉。
(3). 發送的超時時間m_iSndTimeOut爲一個大於等於0的有效值,則等待m_iSndTimeOut個ms或UDT Socket被關閉。在CUDT的構造函數中,iSndTimeOut默認是被設置爲-1的,但能夠經過UDT::setsockopt()進行設置。
6. 檢查CUDT發送緩衝區中可用大小,若可用大小不足消息的長度時,則說明waitBlockingSending()多是因以下的幾種狀況中的一種出現而結束:
(1). CUDT處於非同步發送模式而直接結束。
(2). CUDT處於同步模式,m_iSndTimeOut爲一個有效值,但超市時間到來時仍然沒有等到發送緩衝區中有足夠的空間。
對於第一種狀況的處理是直接返回0。對於第二種狀況的處理則是拋出異常。waitBlockingSending()等待過程終結因爲UDT Socket而形成的狀況,waitBlockingSending()本身會拋出異常的,而不會走到這一步。
7. 當發送緩衝區的已用大小爲0時,更新m_llSndDurationCounter爲當前時間。
8. 執行m_pSndBuffer->addBuffer(data, len, msttl, inorder)將要發送的數據放入發送緩衝區。
9. 將這個socket加入發送隊列SndQueue的發送列表m_pSndUList中。
10. 返回數據長度,也即發送的數據長度。
int CUDT::send(UDTSOCKET u, const char* buf, int len, int) { try { CUDT* udt = s_UDTUnited.lookup(u); return udt->send(buf, len); } catch (CUDTException &e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int send(UDTSOCKET u, const char* buf, int len, int flags) { return CUDT::send(u, buf, len, flags); }
調用用過程:UDT::send() -> CUDT::send(UDTSOCKET u, const char* buf, int len, int) -> CUDT::send(const char* data, int len)。直接來看CUDT::send(const char* data, int len)(src/core.cpp):
int CUDT::send(const char* data, int len) { if (UDT_DGRAM == m_iSockType) throw CUDTException(5, 10, 0); // throw an exception if not connected if (m_bBroken || m_bClosing) throw CUDTException(2, 1, 0); else if (!m_bConnected) throw CUDTException(2, 2, 0); if (len <= 0) return 0; CGuard sendguard(m_SendLock); if (m_pSndBuffer->getCurrBufSize() == 0) { // delay the EXP timer to avoid mis-fired timeout uint64_t currtime; CTimer::rdtsc(currtime); m_ullLastRspTime = currtime; } if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) { waitBlockingSending(1); } if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) { if (m_iSndTimeOut >= 0) throw CUDTException(6, 3, 0); return 0; } int size = (m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize; if (size > len) size = len; // record total time used for sending if (0 == m_pSndBuffer->getCurrBufSize()) m_llSndDurationCounter = CTimer::getTime(); // insert the user buffer into the sening list m_pSndBuffer->addBuffer(data, size); // insert this socket to snd list if it is not on the list yet m_pSndQueue->m_pSndUList->update(this, false); if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) { // write is not available any more s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false); } return size; }
這個函數與CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)的執行過程極爲類似,但仍是有以下這樣的一些區別:
1. 這個函數的執行要求UDT Socket的類型必須爲UDT_STREAM,而不是UDT_DGRAM,這一點與CUDT::sendmsg()正好相反。
2. 要發送的數據的長度,只要大於0就能夠了。而在CUDT::sendmsg()中會限制要發送的數據的大小不能超過發送緩衝區的最大大小。
3. 在CUDT::sendmsg()中對數據發送的規則執行的是,要麼所有發送,要麼一點也不發送。而在這裏則是,只要發送緩衝區尚未滿,則會將數據儘量多的加進發送緩衝區以待發送。
class CSndBuffer { public: CSndBuffer(int size = 32, int mss = 1500); ~CSndBuffer(); // Functionality: // Insert a user buffer into the sending list. // Parameters: // 0) [in] data: pointer to the user data block. // 1) [in] len: size of the block. // 2) [in] ttl: time to live in milliseconds // 3) [in] order: if the block should be delivered in order, for DGRAM only // Returned value: // None. void addBuffer(const char* data, int len, int ttl = -1, bool order = false); // Functionality: // Read a block of data from file and insert it into the sending list. // Parameters: // 0) [in] ifs: input file stream. // 1) [in] len: size of the block. // Returned value: // actual size of data added from the file. int addBufferFromFile(std::fstream& ifs, int len); // Functionality: // Find data position to pack a DATA packet from the furthest reading point. // Parameters: // 0) [out] data: the pointer to the data position. // 1) [out] msgno: message number of the packet. // Returned value: // Actual length of data read. int readData(char** data, int32_t& msgno); // Functionality: // Find data position to pack a DATA packet for a retransmission. // Parameters: // 0) [out] data: the pointer to the data position. // 1) [in] offset: offset from the last ACK point. // 2) [out] msgno: message number of the packet. // 3) [out] msglen: length of the message // Returned value: // Actual length of data read. int readData(char** data, const int offset, int32_t& msgno, int& msglen); // Functionality: // Update the ACK point and may release/unmap/return the user data according to the flag. // Parameters: // 0) [in] offset: number of packets acknowledged. // Returned value: // None. void ackData(int offset); // Functionality: // Read size of data still in the sending list. // Parameters: // None. // Returned value: // Current size of the data in the sending list. int getCurrBufSize() const; private: void increase(); private: pthread_mutex_t m_BufLock; // used to synchronize buffer operation struct Block { char* m_pcData; // pointer to the data block int m_iLength; // length of the block int32_t m_iMsgNo; // message number uint64_t m_OriginTime; // original request time int m_iTTL; // time to live (milliseconds) Block* m_pNext; // next block }*m_pBlock, *m_pFirstBlock, *m_pCurrBlock, *m_pLastBlock; // m_pBlock: The head pointer // m_pFirstBlock: The first block // m_pCurrBlock: The current block // m_pLastBlock: The last block (if first == last, buffer is empty) struct Buffer { char* m_pcData; // buffer int m_iSize; // size Buffer* m_pNext; // next buffer }*m_pBuffer; // physical buffer int32_t m_iNextMsgNo; // next message number int m_iSize; // buffer size (number of packets) int m_iMSS; // maximum seqment/packet size int m_iCount; // number of used blocks private: CSndBuffer(const CSndBuffer&); CSndBuffer& operator=(const CSndBuffer&); };
在這個結構中,彷佛在用幾個單鏈表來管理髮送緩衝區的數據存儲區,struct Buffer的鏈表,和struct Block的鏈表,但這些結構究竟如何組織,每一個鏈表的意義是什麼,仍是要看下幾個成員函數的定義。首先是構造函數:
CSndBuffer::CSndBuffer(int size, int mss) : m_BufLock(), m_pBlock(NULL), m_pFirstBlock(NULL), m_pCurrBlock(NULL), m_pLastBlock(NULL), m_pBuffer(NULL), m_iNextMsgNo(1), m_iSize(size), m_iMSS(mss), m_iCount(0) { // initial physical buffer of "size" m_pBuffer = new Buffer; m_pBuffer->m_pcData = new char[m_iSize * m_iMSS]; m_pBuffer->m_iSize = m_iSize; m_pBuffer->m_pNext = NULL; // circular linked list for out bound packets m_pBlock = new Block; Block* pb = m_pBlock; for (int i = 1; i < m_iSize; ++i) { pb->m_pNext = new Block; pb->m_iMsgNo = 0; pb = pb->m_pNext; } pb->m_pNext = m_pBlock; pb = m_pBlock; char* pc = m_pBuffer->m_pcData; for (int i = 0; i < m_iSize; ++i) { pb->m_pcData = pc; pb = pb->m_pNext; pc += m_iMSS; } m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock; #ifndef WIN32 pthread_mutex_init(&m_BufLock, NULL); #else m_BufLock = CreateMutex(NULL, false, NULL); #endif }
1. 在成員初始化列表中初始化全部的成員變量。注意,m_iNextMsgNo被初始化爲了1,表示已用blocks數量的m_iCount被初始化爲了0。
初始化物理buffer,即分配一個Buffer結構m_pBuffer,爲這個Buffer結構分配一塊大小爲m_iSize * m_iMSS的內存,初始化Buffer結構的m_iSize爲m_iSize。m_iSize和m_iMSS的值來自於傳入的兩個參數size和mss。在connect成功(不管是主動發起鏈接的UDT Socket,仍是由Listening的UDT Socket建立的都同樣),建立CSndBuffer時,size值爲32,而mss值爲m_iPayloadSize。在前面 UDT協議實現分析——鏈接的創建 一文中咱們有仔細地分析過m_iPayloadSize這個值的計算過程。
2. 建立一個Block結構的循環鏈表,m_pBlock指向鏈表頭。鏈表的長度一樣爲m_iSize,也就是32。
3. 初始化前一步建立的鏈表。使每一個Block結構的m_pcData指向m_pBuffer->m_pcData的不一樣位置,相鄰的兩個Block結構,所指位置的距離爲m_iMSS。
4. 將m_pFirstBlock、m_pCurrBlock和m_pLastBlock的值都初始化爲m_pBlock的值。
5. 初始化用於同步緩衝區操做的m_BufLock。
這裏咱們弄清了struct Buffer和struct Block,但仍是有許多問題尚未弄清楚。m_pBlock,m_pFirstBlock,m_pCurrBlock和m_pLastBlock這幾個指針的含義是什麼?struct Buffer鏈表的擴展與收縮,MsgNo的意義等。
void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order) { int size = len / m_iMSS; if ((len % m_iMSS) != 0) size++; // dynamically increase sender buffer while (size + m_iCount >= m_iSize) increase(); uint64_t time = CTimer::getTime(); int32_t inorder = order; inorder <<= 29; Block* s = m_pLastBlock; for (int i = 0; i < size; ++i) { int pktlen = len - i * m_iMSS; if (pktlen > m_iMSS) pktlen = m_iMSS; memcpy(s->m_pcData, data + i * m_iMSS, pktlen); s->m_iLength = pktlen; s->m_iMsgNo = m_iNextMsgNo | inorder; if (i == 0) s->m_iMsgNo |= 0x80000000; if (i == size - 1) s->m_iMsgNo |= 0x40000000; s->m_OriginTime = time; s->m_iTTL = ttl; s = s->m_pNext; } m_pLastBlock = s; CGuard::enterCS(m_BufLock); m_iCount += size; CGuard::leaveCS(m_BufLock); m_iNextMsgNo++; if (m_iNextMsgNo == CMsgNo::m_iMaxMsgNo) m_iNextMsgNo = 1; }
1. 向CSndBuffer中添加數據老是以整塊Block爲單位的,對於最後不滿整塊Block的數據仍然佔用整塊Block。在這個函數中作的第一件事情就是,計算添加全部的數據須要的Block的個數。
2. 若是CSndBuffer中的可用空間不足,則擴展CSndBuffer空間的大小,直到能知足添加的數據的需求爲止。
3. 計算inorder。
4. 經過一個循環將數據複製到CSndBuffer中。
5. 更新表示一用Block數的m_iCount。
6. 更新m_iNextMsgNo,達到最大值時會從新回到1。
int CSndBuffer::addBufferFromFile(fstream& ifs, int len) { int size = len / m_iMSS; if ((len % m_iMSS) != 0) size++; // dynamically increase sender buffer while (size + m_iCount >= m_iSize) increase(); Block* s = m_pLastBlock; int total = 0; for (int i = 0; i < size; ++i) { if (ifs.bad() || ifs.fail() || ifs.eof()) break; int pktlen = len - i * m_iMSS; if (pktlen > m_iMSS) pktlen = m_iMSS; ifs.read(s->m_pcData, pktlen); if ((pktlen = ifs.gcount()) <= 0) break; // currently file transfer is only available in streaming mode, message is always in order, ttl = infinite s->m_iMsgNo = m_iNextMsgNo | 0x20000000; if (i == 0) s->m_iMsgNo |= 0x80000000; if (i == size - 1) s->m_iMsgNo |= 0x40000000; s->m_iLength = pktlen; s->m_iTTL = -1; s = s->m_pNext; total += pktlen; } m_pLastBlock = s; CGuard::enterCS(m_BufLock); m_iCount += size; CGuard::leaveCS(m_BufLock); m_iNextMsgNo++; if (m_iNextMsgNo == CMsgNo::m_iMaxMsgNo) m_iNextMsgNo = 1; return total; }
在CSndBuffer::addBuffer()與CSndBuffer::addBufferFromFile()中有這麼多的重複code,實在是頗有進一步進行抽象的空間。在循環中拷貝數據時,每次計算剩餘了多少字節,而後和m_iMSS進行比較以肯定到底要複製多少數據。這種計算和比較在大多數狀況下都比較多餘,只有在循環的最後一次執行時才真正須要這樣的操做。能夠經過特殊處理最後一塊數據的複製,並將循環的退出條件值改成(size -1)來提高性能。
void CSndBuffer::increase() { int unitsize = m_pBuffer->m_iSize; // new physical buffer Buffer* nbuf = NULL; try { nbuf = new Buffer; nbuf->m_pcData = new char[unitsize * m_iMSS]; } catch (...) { delete nbuf; throw CUDTException(3, 2, 0); } nbuf->m_iSize = unitsize; nbuf->m_pNext = NULL; // insert the buffer at the end of the buffer list Buffer* p = m_pBuffer; while (NULL != p->m_pNext) p = p->m_pNext; p->m_pNext = nbuf; // new packet blocks Block* nblk = NULL; try { nblk = new Block; } catch (...) { delete nblk; throw CUDTException(3, 2, 0); } Block* pb = nblk; for (int i = 1; i < unitsize; ++i) { pb->m_pNext = new Block; pb = pb->m_pNext; } // insert the new blocks onto the existing one pb->m_pNext = m_pLastBlock->m_pNext; m_pLastBlock->m_pNext = nblk; pb = nblk; char* pc = nbuf->m_pcData; for (int i = 0; i < unitsize; ++i) { pb->m_pcData = pc; pb = pb->m_pNext; pc += m_iMSS; } m_iSize += unitsize; }
1. 獲取CSndBuffer中已有的Block的總數量unitsize。
2. 建立一個Buffer結構,併爲它分配數據緩衝區,數據緩衝區的大小爲unitsize個Block。因而可知,每一次CSndBuffer的擴展,都會使它的容量加倍。
3. 將前一步建立的Buffer插入到已有的Buffer單鏈表的尾部。
4. 爲前面建立的Buffer,建立對應的Block結構鏈表,nblk指向這個單鏈表的頭節點,而pb指向這個單鏈表的尾節點。
5. 如咱們前面提到的CSndBuffer中的Block結構是一個循環單向鏈表,這一步便是將前一步建立的Block單向鏈表插入CSndBuffer的Block循環鏈表中。
6. 設置前面建立的全部Block,使它們指向前面建立的Buffer的數據部分的適當位置。
7. 更新表示CSndBuffer中總的Block大小的m_iSize。
class CSndQueue { friend class CUDT; friend class CUDTUnited; public: CSndQueue(); ~CSndQueue(); public: // Functionality: // Initialize the sending queue. // Parameters: // 1) [in] c: UDP channel to be associated to the queue // 2) [in] t: Timer // Returned value: // None. void init(CChannel* c, CTimer* t); // Functionality: // Send out a packet to a given address. // Parameters: // 1) [in] addr: destination address // 2) [in] packet: packet to be sent out // Returned value: // Size of data sent out. int sendto(const sockaddr* addr, CPacket& packet); private: #ifndef WIN32 static void* worker(void* param); #else static DWORD WINAPI worker(LPVOID param); #endif pthread_t m_WorkerThread; private: CSndUList* m_pSndUList; // List of UDT instances for data sending CChannel* m_pChannel; // The UDP channel for data sending CTimer* m_pTimer; // Timing facility pthread_mutex_t m_WindowLock; pthread_cond_t m_WindowCond; volatile bool m_bClosing; // closing the worker pthread_cond_t m_ExitCond; private: CSndQueue(const CSndQueue&); CSndQueue& operator=(const CSndQueue&); };
struct CSNode { CUDT* m_pUDT; // Pointer to the instance of CUDT socket uint64_t m_llTimeStamp; // Time Stamp int m_iHeapLoc; // location on the heap, -1 means not on the heap }; class CSndUList { friend class CSndQueue; public: CSndUList(); ~CSndUList(); public: // Functionality: // Insert a new UDT instance into the list. // Parameters: // 1) [in] ts: time stamp: next processing time // 2) [in] u: pointer to the UDT instance // Returned value: // None. void insert(int64_t ts, const CUDT* u); // Functionality: // Update the timestamp of the UDT instance on the list. // Parameters: // 1) [in] u: pointer to the UDT instance // 2) [in] resechedule: if the timestampe shoudl be rescheduled // Returned value: // None. void update(const CUDT* u, bool reschedule = true); // Functionality: // Retrieve the next packet and peer address from the first entry, and reschedule it in the queue. // Parameters: // 0) [out] addr: destination address of the next packet // 1) [out] pkt: the next packet to be sent // Returned value: // 1 if successfully retrieved, -1 if no packet found. int pop(sockaddr*& addr, CPacket& pkt); // Functionality: // Remove UDT instance from the list. // Parameters: // 1) [in] u: pointer to the UDT instance // Returned value: // None. void remove(const CUDT* u); // Functionality: // Retrieve the next scheduled processing time. // Parameters: // None. // Returned value: // Scheduled processing time of the first UDT socket in the list. uint64_t getNextProcTime(); private: void insert_(int64_t ts, const CUDT* u); void remove_(const CUDT* u); private: CSNode** m_pHeap; // The heap array int m_iArrayLength; // physical length of the array int m_iLastEntry; // position of last entry on the heap array pthread_mutex_t m_ListLock; pthread_mutex_t* m_pWindowLock; pthread_cond_t* m_pWindowCond; CTimer* m_pTimer; private: CSndUList(const CSndUList&); CSndUList& operator=(const CSndUList&); };
CSndUList::CSndUList() : m_pHeap(NULL), m_iArrayLength(4096), m_iLastEntry(-1), m_ListLock(), m_pWindowLock(NULL), m_pWindowCond(NULL), m_pTimer(NULL) { m_pHeap = new CSNode*[m_iArrayLength]; #ifndef WIN32 pthread_mutex_init(&m_ListLock, NULL); #else m_ListLock = CreateMutex(NULL, false, NULL); #endif }
在這個函數中作的事情就是分配了一個CSNode指針的數組,並初始化了一個mutex m_ListLock,但這彷佛也沒法透漏出太多的訊息。而後來看向其中插入元素的insert():
void CSndUList::insert(int64_t ts, const CUDT* u) { CGuard listguard(m_ListLock); // increase the heap array size if necessary if (m_iLastEntry == m_iArrayLength - 1) { CSNode** temp = NULL; try { temp = new CSNode*[m_iArrayLength * 2]; } catch (...) { return; } memcpy(temp, m_pHeap, sizeof(CSNode*) * m_iArrayLength); m_iArrayLength *= 2; delete[] m_pHeap; m_pHeap = temp; } insert_(ts, u); }
1. 檢查m_iLastEntry是否等於m_iArrayLength - 1,如果,則代表數組中全部的位置都被佔用了此時則須要擴充容量,這裏的作法就是建立一個長度爲以前數組長度2倍的新數組,將以前數組中的內容拷貝到新數組裏,更新m_iArrayLength,刪除以前的數組,更新m_pHeap只想新數組。
2. 執行CSndUList::insert_()進行實際的插入動做。
void CSndUList::insert_(int64_t ts, const CUDT* u) { CSNode* n = u->m_pSNode; // do not insert repeated node if (n->m_iHeapLoc >= 0) return; m_iLastEntry++; m_pHeap[m_iLastEntry] = n; n->m_llTimeStamp = ts; int q = m_iLastEntry; int p = q; while (p != 0) { p = (q - 1) >> 1; if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp) { CSNode* t = m_pHeap[p]; m_pHeap[p] = m_pHeap[q]; m_pHeap[q] = t; t->m_iHeapLoc = q; q = p; } else break; } n->m_iHeapLoc = q; // an earlier event has been inserted, wake up sending worker if (n->m_iHeapLoc == 0) m_pTimer->interrupt(); // first entry, activate the sending queue if (0 == m_iLastEntry) { #ifndef WIN32 pthread_mutex_lock(m_pWindowLock); pthread_cond_signal(m_pWindowCond); pthread_mutex_unlock(m_pWindowLock); #else SetEvent(*m_pWindowCond); #endif } }
1. 檢查要插入的元素是否已經插入了,主要是根據CUDT的CSNode n的HeapLoc字段是否大於0來判斷的。若已經插入,則直接返回。
2. 將CSNode放在數組的尾部。
3. 調整CSNode n在數組中的位置,以使它處於適當的位置。要釐清這個地方的調整過程,可能要複習一下咱們曾經學習過的堆數據結構了。能夠將堆理解爲一個用數組表示的二叉樹,以下圖所示:
假設一個節點的位置,也就是該節點在數組中的index爲n,則它的父節點的位置爲((n -1)/2),而它的兩個子幾點的位置分別爲(2×N+1)和(2×n+2)。
回到CSndUList::insert_()的節點CSNode n的位置調整過程。能夠看到,這個過程主要是根據CSNode n的m_llTimeStamp值,若CSNode n的m_llTimeStamp值比它的父節點的m_llTimeStamp小的話就把CSNode n往二叉樹的上層浮,而把它的父節點向二叉樹的下層沉,依次類推,直到找到某個位置,其父節點的m_llTimeStamp值比它的m_llTimeStamp值小,或者浮到二叉樹的最頂層。
4. 更新CSNode n的m_iHeapLoc指向它在數組m_pHeap中的索引。
5. CSNode n被浮到堆的頂部時,喚醒發送隊列的worker線程。
6. 插入的若是是堆中的第一個元素的話,喚醒等待在m_pWindowCond上的線程。
void CSndUList::remove(const CUDT* u) { CGuard listguard(m_ListLock); remove_(u); } void CSndUList::remove_(const CUDT* u) { CSNode* n = u->m_pSNode; if (n->m_iHeapLoc >= 0) { // remove the node from heap m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry]; m_iLastEntry--; m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc; int q = n->m_iHeapLoc; int p = q * 2 + 1; while (p <= m_iLastEntry) { if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p + 1]->m_llTimeStamp)) p++; if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp) { CSNode* t = m_pHeap[p]; m_pHeap[p] = m_pHeap[q]; m_pHeap[p]->m_iHeapLoc = p; m_pHeap[q] = t; m_pHeap[q]->m_iHeapLoc = q; q = p; p = q * 2 + 1; } else break; } n->m_iHeapLoc = -1; } // the only event has been deleted, wake up immediately if (0 == m_iLastEntry) m_pTimer->interrupt(); }
1. 先檢查要移除的CSNode n是否存在與堆中,主要根據CSNode n的HeapLoc字段是否大於0來判斷的。若沒有插入,則跳到第5步,不然繼續執行。
2. 將m_pHeap中的最末尾的元素放進本來由要移除的CSNode n所佔用的位置,更新m_iLastEntry,及被改變了位置的原來的最末尾元素CSNode的m_iHeapLoc指向它當前被放置的位置。
3. 若是被調整了位置的CSNode的新位置不是最末尾的位置,則調整該節點的位置。這裏主要是在這個節點的m_llTimeStamp值比它的子節點的m_llTimeStamp值更大時,將這個節點向二叉樹層次結構的下層沉,而將它的字節點向上浮的過程。一個節點老是有兩個字節點,也就是兩棵子樹,那它又會向哪一棵那邊沉呢?能夠看到是字節點中m_llTimeStamp值更小的那一邊。
4. 更新CSNode n的m_iHeapLoc指向-1。
5. 若是m_iLastEntry爲0,即表示堆中再也不有元素了,則還會執行喚醒。
回頭看咱們前面分析的CUDT::send()和CUDT::sendmsg(),它們都經過m_pSndQueue->m_pSndUList->update(this, false)將CUDT插入CSndUList m_pSndUList,這裏再來看一下CSndUList::update():
void CSndUList::update(const CUDT* u, bool reschedule) { CGuard listguard(m_ListLock); CSNode* n = u->m_pSNode; if (n->m_iHeapLoc >= 0) { if (!reschedule) return; if (n->m_iHeapLoc == 0) { n->m_llTimeStamp = 1; m_pTimer->interrupt(); return; } remove_(u); } insert_(1, u); }
CSndQueue::CSndQueue() : m_WorkerThread(), m_pSndUList(NULL), m_pChannel(NULL), m_pTimer(NULL), m_WindowLock(), m_WindowCond(), m_bClosing(false), m_ExitCond() { #ifndef WIN32 pthread_cond_init(&m_WindowCond, NULL); pthread_mutex_init(&m_WindowLock, NULL); #else m_WindowLock = CreateMutex(NULL, false, NULL); m_WindowCond = CreateEvent(NULL, false, false, NULL); m_ExitCond = CreateEvent(NULL, false, false, NULL); #endif } CSndQueue::~CSndQueue() { m_bClosing = true; #ifndef WIN32 pthread_mutex_lock(&m_WindowLock); pthread_cond_signal(&m_WindowCond); pthread_mutex_unlock(&m_WindowLock); if (0 != m_WorkerThread) pthread_join(m_WorkerThread, NULL); pthread_cond_destroy(&m_WindowCond); pthread_mutex_destroy(&m_WindowLock); #else SetEvent(m_WindowCond); if (NULL != m_WorkerThread) WaitForSingleObject(m_ExitCond, INFINITE); CloseHandle(m_WorkerThread); CloseHandle(m_WindowLock); CloseHandle(m_WindowCond); CloseHandle(m_ExitCond); #endif delete m_pSndUList; } void CSndQueue::init(CChannel* c, CTimer* t) { m_pChannel = c; m_pTimer = t; m_pSndUList = new CSndUList; m_pSndUList->m_pWindowLock = &m_WindowLock; m_pSndUList->m_pWindowCond = &m_WindowCond; m_pSndUList->m_pTimer = m_pTimer; #ifndef WIN32 if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this)) { m_WorkerThread = 0; throw CUDTException(3, 1); } #else DWORD threadID; m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &threadID); if (NULL == m_WorkerThread) throw CUDTException(3, 1); #endif } #ifndef WIN32 void* CSndQueue::worker(void* param) #else DWORD WINAPI CSndQueue::worker(LPVOID param) #endif { CSndQueue* self = (CSndQueue*) param; while (!self->m_bClosing) { uint64_t ts = self->m_pSndUList->getNextProcTime(); if (ts > 0) { // wait until next processing time of the first socket on the list uint64_t currtime; CTimer::rdtsc(currtime); if (currtime < ts) self->m_pTimer->sleepto(ts); // it is time to send the next pkt sockaddr* addr; CPacket pkt; if (self->m_pSndUList->pop(addr, pkt) < 0) continue; self->m_pChannel->sendto(addr, pkt); } else { // wait here if there is no sockets with data to be sent #ifndef WIN32 pthread_mutex_lock(&self->m_WindowLock); if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0)) pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock); pthread_mutex_unlock(&self->m_WindowLock); #else WaitForSingleObject(self->m_WindowCond, INFINITE); #endif } } #ifndef WIN32 return NULL; #else SetEvent(self->m_ExitCond); return 0; #endif }
1. 調用self->m_pSndUList->getNextProcTime(),來查看最近的一次發送任務所須要執行的時間ts。這裏來看一下CSndUList::getNextProcTime()的定義:
uint64_t CSndUList::getNextProcTime() { CGuard listguard(m_ListLock); if (-1 == m_iLastEntry) return 0; return m_pHeap[0]->m_llTimeStamp; }
若返回值小於等於0,就表示當前沒有須要發送的數據,沒有須要執行的發送任務。則進入等待狀態。在CSndQueue::init()的定義中,CSndUList m_pSndUList的m_pWindowLock和m_pWindowCond會分別指向CSndQueue的m_WindowLock和m_WindowCond,於是可見,這個地方的等待,是被CSndUList::insert_()喚醒的。
2. 獲取當前的時間currtime。比較currtime與ts,若前者較小,則代表最近須要執行的發送任務它請求的執行時間還沒到,則休眠等待直到ts的到來。如咱們前面的分析,並根據CSndQueue::init()的定義可見,向m_pSndUList中插入、移除或更新元素,均可能會喚醒這裏的等待。若前者較大,或者等待的時刻到了則執行下一步。
3. 執行self->m_pSndUList->pop(addr, pkt),從CSndUList m_pSndUList中抓一個CPacket出來。若沒抓到,則進入下一次循環,不然發送抓到的CPacket。來看CSndUList::pop():
int CSndUList::pop(sockaddr*& addr, CPacket& pkt) { CGuard listguard(m_ListLock); if (-1 == m_iLastEntry) return -1; // no pop until the next schedulled time uint64_t ts; CTimer::rdtsc(ts); if (ts < m_pHeap[0]->m_llTimeStamp) return -1; CUDT* u = m_pHeap[0]->m_pUDT; remove_(u); if (!u->m_bConnected || u->m_bBroken) return -1; // pack a packet from the socket if (u->packData(pkt, ts) <= 0) return -1; addr = u->m_pPeerAddr; // insert a new entry, ts is the next processing time if (ts > 0) insert_(ts, u); return 1; }
1. 檢查m_iLastEntry是否爲-1,若爲-1,代表沒有要執行發送的任務,於是直接返回-1。不然繼續執行。
2. 檢查當前時間是否小於堆頂元素的m_llTimeStamp,若小於則代表,最近一次發送任務的發送時間還沒到,則返回-1,不然繼續執行。
3. 獲取堆頂元素的CUDT對象u,也就是發送任務的請求者,並將堆頂元素先從對中移除。
4. 檢查u的狀態,若不處於有效的鏈接狀態,則返回-1,不然繼續執行。
5. 執行u->packData(pkt, ts)打出一個數據包來。
6. 使傳進來的addr只想CUDT u的PeerAddr,CSndQueue會將這個地址做爲包的發送目的地址。
7. 將CUDT u從新插入堆中,時間戳爲當前時間,這也就意味着,若是能夠的話,就在下一個循環中繼續發送CUDT u的數據。
8. 返回1。