鏈接創建起來以後,咱們就能夠經過UDT Socket進行數據的收發了。先來看用來發送數據的幾個函數。UDT提供了以下的幾個函數用於不一樣目的下的數據發送:node
UDT_API int send(UDTSOCKET u, const char* buf, int len, int flags); UDT_API int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false); UDT_API int64_t sendfile(UDTSOCKET u, std::fstream& ifs, int64_t& offset, int64_t size, int block = 364000); UDT_API int64_t sendfile2(UDTSOCKET u, const char* path, int64_t* offset, int64_t size, int block = 364000);
send()用來進行流式的數據發送;sendmsg()用來進行數據報式的數據發送;sendfile()與sendfile2()用來執行文件的發送,流式發送,這二者基本同樣,僅有的差別在於,前者接收文件的流來發送,然後者則接收文件的路徑。api
這裏先來看UDT::sendmsg():數組
int CUDT::sendmsg(UDTSOCKET u, const char* buf, int len, int ttl, bool inorder) { try { CUDT* udt = s_UDTUnited.lookup(u); return udt->sendmsg(buf, len, ttl, inorder); } catch (CUDTException &e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl, bool inorder) { return CUDT::sendmsg(u, buf, len, ttl, inorder); }
這個API的實現結構與以前看到的listen()、bind()這些略微有點區別,在CUDT的API層,會調用CUDT 對應的實現函數,調用過程:UDT::sendmsg() -> CUDT::sendmsg(UDTSOCKET u, const char* buf, int len, int ttl, bool inorder) -> CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)。直接來看CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)(src/core.cpp):網絡
void CUDT::waitBlockingSending(int space) { if (!m_bSynSending) throw CUDTException(6, 1, 0); else { // wait here during a blocking sending #ifndef WIN32 pthread_mutex_lock(&m_SendBlockLock); if (m_iSndTimeOut < 0) { while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < space) && m_bPeerHealth) pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock); } else { uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL; timespec locktime; locktime.tv_sec = exptime / 1000000; locktime.tv_nsec = (exptime % 1000000) * 1000; while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < space) && m_bPeerHealth && (CTimer::getTime() < exptime)) pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &locktime); } pthread_mutex_unlock(&m_SendBlockLock); #else if (m_iSndTimeOut < 0) { while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < space) && m_bPeerHealth) WaitForSingleObject(m_SendBlockCond, INFINITE); } else { uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL; while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < space) && m_bPeerHealth && (CTimer::getTime() < exptime)) WaitForSingleObject(m_SendBlockCond, DWORD((exptime - CTimer::getTime()) / 1000)); } #endif // check the connection status if (m_bBroken || m_bClosing) throw CUDTException(2, 1, 0); else if (!m_bConnected) throw CUDTException(2, 2, 0); else if (!m_bPeerHealth) { m_bPeerHealth = true; throw CUDTException(7); } } } int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder) { if (UDT_STREAM == m_iSockType) throw CUDTException(5, 9, 0); // throw an exception if not connected if (m_bBroken || m_bClosing) throw CUDTException(2, 1, 0); else if (!m_bConnected) throw CUDTException(2, 2, 0); if (len <= 0) return 0; if (len > m_iSndBufSize * m_iPayloadSize) throw CUDTException(5, 12, 0); CGuard sendguard(m_SendLock); if (m_pSndBuffer->getCurrBufSize() == 0) { // delay the EXP timer to avoid mis-fired timeout uint64_t currtime; CTimer::rdtsc(currtime); m_ullLastRspTime = currtime; } if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) { waitBlockingSending(len); } if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) { if (m_iSndTimeOut >= 0) throw CUDTException(6, 3, 0); return 0; } // record total time used for sending if (0 == m_pSndBuffer->getCurrBufSize()) m_llSndDurationCounter = CTimer::getTime(); // insert the user buffer into the sening list m_pSndBuffer->addBuffer(data, len, msttl, inorder); // insert this socket to the snd list if it is not on the list yet m_pSndQueue->m_pSndUList->update(this, false); if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) { // write is not available any more s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false); } return len; }
CUDT::sendmsg()主要作了以下這樣的一些事情:數據結構
1. 檢查UDT Socket的類型SockType,若爲UDT_STREAM則直接拋異常退出,不然繼續執行。socket
2. 檢查CUDT的狀態,若UDT Socket不處於Connected狀態,就拋異常退出,不然繼續執行。函數
3. 檢查傳入的參數,主要是數據的長度,既不能太大也不能過小。數據長度過小是指,小於等於0;太大是指,超出了CUDT發送緩衝區的最大大小。若參數無效,就返回,不然繼續執行。在默認不經過UDT::setsockopt()修改m_iSndBufSize和m_iMSS這些選項的狀況下,m_iSndBufSize爲8192,m_iPayloadSize爲1456,也就是大概12MB。性能
4. 當CUDT發送緩衝區的已用大小爲0時,會將m_ullLastRspTime更新爲當前時間。學習
5. 檢查CUDT發送緩衝區中可用大小,若可用大小不足消息的長度時,執行waitBlockingSending()等待有足夠大小可用。在waitBlockingSending()中能夠看到,它主要處理了這樣3中狀況:ui
(1). UDT Socket不是處於同步發送模式。拋異常,結束髮送的整個流程。
(2). 發送的超時時間m_iSndTimeOut爲一個小於0的無效值,則永久等待,直到UDT Socket被關掉。
(3). 發送的超時時間m_iSndTimeOut爲一個大於等於0的有效值,則等待m_iSndTimeOut個ms或UDT Socket被關閉。在CUDT的構造函數中,iSndTimeOut默認是被設置爲-1的,但能夠經過UDT::setsockopt()進行設置。
對於後兩種狀況裏,若等待過程是因爲CUDT狀態變得無效而終止,則還將拋出異常以結束髮送過程。
6. 檢查CUDT發送緩衝區中可用大小,若可用大小不足消息的長度時,則說明waitBlockingSending()多是因以下的幾種狀況中的一種出現而結束:
(1). CUDT處於非同步發送模式而直接結束。
(2). CUDT處於同步模式,m_iSndTimeOut爲一個有效值,但超市時間到來時仍然沒有等到發送緩衝區中有足夠的空間。
對於第一種狀況的處理是直接返回0。對於第二種狀況的處理則是拋出異常。waitBlockingSending()等待過程終結因爲UDT Socket而形成的狀況,waitBlockingSending()本身會拋出異常的,而不會走到這一步。
7. 當發送緩衝區的已用大小爲0時,更新m_llSndDurationCounter爲當前時間。
8. 執行m_pSndBuffer->addBuffer(data, len, msttl, inorder)將要發送的數據放入發送緩衝區。
9. 將這個socket加入發送隊列SndQueue的發送列表m_pSndUList中。
10. 返回數據長度,也即發送的數據長度。
這也是一個生產與消費的故事。在這裏發起發送的線程爲生產者,真正將數據發送到網絡上的的發送隊列RcvQueue的worker線程則爲消費者。在UDT::sendmsg()的執行過程當中,咱們一樣只能看到這個故事的一半,生產的那一半。後面會再來分析故事的另外一半。
而後來看UDT::send()(src/api.cpp):
int CUDT::send(UDTSOCKET u, const char* buf, int len, int) { try { CUDT* udt = s_UDTUnited.lookup(u); return udt->send(buf, len); } catch (CUDTException &e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int send(UDTSOCKET u, const char* buf, int len, int flags) { return CUDT::send(u, buf, len, flags); }
調用用過程:UDT::send() -> CUDT::send(UDTSOCKET u, const char* buf, int len, int) -> CUDT::send(const char* data, int len)。直接來看CUDT::send(const char* data, int len)(src/core.cpp):
int CUDT::send(const char* data, int len) { if (UDT_DGRAM == m_iSockType) throw CUDTException(5, 10, 0); // throw an exception if not connected if (m_bBroken || m_bClosing) throw CUDTException(2, 1, 0); else if (!m_bConnected) throw CUDTException(2, 2, 0); if (len <= 0) return 0; CGuard sendguard(m_SendLock); if (m_pSndBuffer->getCurrBufSize() == 0) { // delay the EXP timer to avoid mis-fired timeout uint64_t currtime; CTimer::rdtsc(currtime); m_ullLastRspTime = currtime; } if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) { waitBlockingSending(1); } if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) { if (m_iSndTimeOut >= 0) throw CUDTException(6, 3, 0); return 0; } int size = (m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize; if (size > len) size = len; // record total time used for sending if (0 == m_pSndBuffer->getCurrBufSize()) m_llSndDurationCounter = CTimer::getTime(); // insert the user buffer into the sening list m_pSndBuffer->addBuffer(data, size); // insert this socket to snd list if it is not on the list yet m_pSndQueue->m_pSndUList->update(this, false); if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) { // write is not available any more s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false); } return size; }
這個函數與CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)的執行過程極爲類似,但仍是有以下這樣的一些區別:
1. 這個函數的執行要求UDT Socket的類型必須爲UDT_STREAM,而不是UDT_DGRAM,這一點與CUDT::sendmsg()正好相反。
2. 要發送的數據的長度,只要大於0就能夠了。而在CUDT::sendmsg()中會限制要發送的數據的大小不能超過發送緩衝區的最大大小。
3. 在CUDT::sendmsg()中對數據發送的規則執行的是,要麼所有發送,要麼一點也不發送。而在這裏則是,只要發送緩衝區尚未滿,則會將數據儘量多的加進發送緩衝區以待發送。
其它則徹底同樣。
來看一下CUDT中用來管理待發送數據的發送緩衝區CSndBuffer,先來看它的定義:
class CSndBuffer { public: CSndBuffer(int size = 32, int mss = 1500); ~CSndBuffer(); // Functionality: // Insert a user buffer into the sending list. // Parameters: // 0) [in] data: pointer to the user data block. // 1) [in] len: size of the block. // 2) [in] ttl: time to live in milliseconds // 3) [in] order: if the block should be delivered in order, for DGRAM only // Returned value: // None. void addBuffer(const char* data, int len, int ttl = -1, bool order = false); // Functionality: // Read a block of data from file and insert it into the sending list. // Parameters: // 0) [in] ifs: input file stream. // 1) [in] len: size of the block. // Returned value: // actual size of data added from the file. int addBufferFromFile(std::fstream& ifs, int len); // Functionality: // Find data position to pack a DATA packet from the furthest reading point. // Parameters: // 0) [out] data: the pointer to the data position. // 1) [out] msgno: message number of the packet. // Returned value: // Actual length of data read. int readData(char** data, int32_t& msgno); // Functionality: // Find data position to pack a DATA packet for a retransmission. // Parameters: // 0) [out] data: the pointer to the data position. // 1) [in] offset: offset from the last ACK point. // 2) [out] msgno: message number of the packet. // 3) [out] msglen: length of the message // Returned value: // Actual length of data read. int readData(char** data, const int offset, int32_t& msgno, int& msglen); // Functionality: // Update the ACK point and may release/unmap/return the user data according to the flag. // Parameters: // 0) [in] offset: number of packets acknowledged. // Returned value: // None. void ackData(int offset); // Functionality: // Read size of data still in the sending list. // Parameters: // None. // Returned value: // Current size of the data in the sending list. int getCurrBufSize() const; private: void increase(); private: pthread_mutex_t m_BufLock; // used to synchronize buffer operation struct Block { char* m_pcData; // pointer to the data block int m_iLength; // length of the block int32_t m_iMsgNo; // message number uint64_t m_OriginTime; // original request time int m_iTTL; // time to live (milliseconds) Block* m_pNext; // next block }*m_pBlock, *m_pFirstBlock, *m_pCurrBlock, *m_pLastBlock; // m_pBlock: The head pointer // m_pFirstBlock: The first block // m_pCurrBlock: The current block // m_pLastBlock: The last block (if first == last, buffer is empty) struct Buffer { char* m_pcData; // buffer int m_iSize; // size Buffer* m_pNext; // next buffer }*m_pBuffer; // physical buffer int32_t m_iNextMsgNo; // next message number int m_iSize; // buffer size (number of packets) int m_iMSS; // maximum seqment/packet size int m_iCount; // number of used blocks private: CSndBuffer(const CSndBuffer&); CSndBuffer& operator=(const CSndBuffer&); };
在這個結構中,彷佛在用幾個單鏈表來管理髮送緩衝區的數據存儲區,struct Buffer的鏈表,和struct Block的鏈表,但這些結構究竟如何組織,每一個鏈表的意義是什麼,仍是要看下幾個成員函數的定義。首先是構造函數:
CSndBuffer::CSndBuffer(int size, int mss) : m_BufLock(), m_pBlock(NULL), m_pFirstBlock(NULL), m_pCurrBlock(NULL), m_pLastBlock(NULL), m_pBuffer(NULL), m_iNextMsgNo(1), m_iSize(size), m_iMSS(mss), m_iCount(0) { // initial physical buffer of "size" m_pBuffer = new Buffer; m_pBuffer->m_pcData = new char[m_iSize * m_iMSS]; m_pBuffer->m_iSize = m_iSize; m_pBuffer->m_pNext = NULL; // circular linked list for out bound packets m_pBlock = new Block; Block* pb = m_pBlock; for (int i = 1; i < m_iSize; ++i) { pb->m_pNext = new Block; pb->m_iMsgNo = 0; pb = pb->m_pNext; } pb->m_pNext = m_pBlock; pb = m_pBlock; char* pc = m_pBuffer->m_pcData; for (int i = 0; i < m_iSize; ++i) { pb->m_pcData = pc; pb = pb->m_pNext; pc += m_iMSS; } m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock; #ifndef WIN32 pthread_mutex_init(&m_BufLock, NULL); #else m_BufLock = CreateMutex(NULL, false, NULL); #endif }
在這個構造函數中會作以下這樣一些事情:
1. 在成員初始化列表中初始化全部的成員變量。注意,m_iNextMsgNo被初始化爲了1,表示已用blocks數量的m_iCount被初始化爲了0。
初始化物理buffer,即分配一個Buffer結構m_pBuffer,爲這個Buffer結構分配一塊大小爲m_iSize * m_iMSS的內存,初始化Buffer結構的m_iSize爲m_iSize。m_iSize和m_iMSS的值來自於傳入的兩個參數size和mss。在connect成功(不管是主動發起鏈接的UDT Socket,仍是由Listening的UDT Socket建立的都同樣),建立CSndBuffer時,size值爲32,而mss值爲m_iPayloadSize。在前面 UDT協議實現分析——鏈接的創建 一文中咱們有仔細地分析過m_iPayloadSize這個值的計算過程。
2. 建立一個Block結構的循環鏈表,m_pBlock指向鏈表頭。鏈表的長度一樣爲m_iSize,也就是32。
3. 初始化前一步建立的鏈表。使每一個Block結構的m_pcData指向m_pBuffer->m_pcData的不一樣位置,相鄰的兩個Block結構,所指位置的距離爲m_iMSS。
由此不難猜想,Buffer用於實際保存要發送的數據。而Block結構則用於將Buffer的數據緩衝區分段管理。
4. 將m_pFirstBlock、m_pCurrBlock和m_pLastBlock的值都初始化爲m_pBlock的值。
5. 初始化用於同步緩衝區操做的m_BufLock。
這裏咱們弄清了struct Buffer和struct Block,但仍是有許多問題尚未弄清楚。m_pBlock,m_pFirstBlock,m_pCurrBlock和m_pLastBlock這幾個指針的含義是什麼?struct Buffer鏈表的擴展與收縮,MsgNo的意義等。
接着就來看一下CSndBuffer的其它一些成員函數。來看向CSndBuffer中添加數據的CSndBuffer::addBuffer():
void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order) { int size = len / m_iMSS; if ((len % m_iMSS) != 0) size++; // dynamically increase sender buffer while (size + m_iCount >= m_iSize) increase(); uint64_t time = CTimer::getTime(); int32_t inorder = order; inorder <<= 29; Block* s = m_pLastBlock; for (int i = 0; i < size; ++i) { int pktlen = len - i * m_iMSS; if (pktlen > m_iMSS) pktlen = m_iMSS; memcpy(s->m_pcData, data + i * m_iMSS, pktlen); s->m_iLength = pktlen; s->m_iMsgNo = m_iNextMsgNo | inorder; if (i == 0) s->m_iMsgNo |= 0x80000000; if (i == size - 1) s->m_iMsgNo |= 0x40000000; s->m_OriginTime = time; s->m_iTTL = ttl; s = s->m_pNext; } m_pLastBlock = s; CGuard::enterCS(m_BufLock); m_iCount += size; CGuard::leaveCS(m_BufLock); m_iNextMsgNo++; if (m_iNextMsgNo == CMsgNo::m_iMaxMsgNo) m_iNextMsgNo = 1; }
這個函數的執行過程大致以下:
1. 向CSndBuffer中添加數據老是以整塊Block爲單位的,對於最後不滿整塊Block的數據仍然佔用整塊Block。在這個函數中作的第一件事情就是,計算添加全部的數據須要的Block的個數。
2. 若是CSndBuffer中的可用空間不足,則擴展CSndBuffer空間的大小,直到能知足添加的數據的需求爲止。
3. 計算inorder。
4. 經過一個循環將數據複製到CSndBuffer中。
在這段code中,咱們不難想到m_pLastBlock指向的是最後一塊已用Block以後的那塊Block。
關於UDT的Msg,Msg指的是一次UDT::send()或UDT::sendmsg()所發的所有數據。一個Msg中全部的Block共用了相同的一個m_iNextMsgNo。
這裏能夠看到UDT中Msg的開始與結束的表示方法:Block的m_iMsgNo的最高兩位被用來指示Msg的開始和結束,最高位爲1表示Msg的開始,第二高位爲1則表示Msg的結束。
5. 更新表示一用Block數的m_iCount。
6. 更新m_iNextMsgNo,達到最大值時會從新回到1。
再來看一個與CSndBuffer::addBuffer()相似的函數CSndBuffer::addBufferFromFile():
int CSndBuffer::addBufferFromFile(fstream& ifs, int len) { int size = len / m_iMSS; if ((len % m_iMSS) != 0) size++; // dynamically increase sender buffer while (size + m_iCount >= m_iSize) increase(); Block* s = m_pLastBlock; int total = 0; for (int i = 0; i < size; ++i) { if (ifs.bad() || ifs.fail() || ifs.eof()) break; int pktlen = len - i * m_iMSS; if (pktlen > m_iMSS) pktlen = m_iMSS; ifs.read(s->m_pcData, pktlen); if ((pktlen = ifs.gcount()) <= 0) break; // currently file transfer is only available in streaming mode, message is always in order, ttl = infinite s->m_iMsgNo = m_iNextMsgNo | 0x20000000; if (i == 0) s->m_iMsgNo |= 0x80000000; if (i == size - 1) s->m_iMsgNo |= 0x40000000; s->m_iLength = pktlen; s->m_iTTL = -1; s = s->m_pNext; total += pktlen; } m_pLastBlock = s; CGuard::enterCS(m_BufLock); m_iCount += size; CGuard::leaveCS(m_BufLock); m_iNextMsgNo++; if (m_iNextMsgNo == CMsgNo::m_iMaxMsgNo) m_iNextMsgNo = 1; return total; }
這個函數的執行過程,與CSndBuffer::addBuffer()的執行過程極爲類似,僅有的差異在於,在這個函數,是將文件的數據逐步的read進CSndBuffer,而在addBuffer()中則是memcpy。
在CSndBuffer::addBuffer()與CSndBuffer::addBufferFromFile()中有這麼多的重複code,實在是頗有進一步進行抽象的空間。在循環中拷貝數據時,每次計算剩餘了多少字節,而後和m_iMSS進行比較以肯定到底要複製多少數據。這種計算和比較在大多數狀況下都比較多餘,只有在循環的最後一次執行時才真正須要這樣的操做。能夠經過特殊處理最後一塊數據的複製,並將循環的退出條件值改成(size -1)來提高性能。
咱們前面提到了CSndBuffer容量的擴展,那這裏就來看一下執行單次擴展的increase():
void CSndBuffer::increase() { int unitsize = m_pBuffer->m_iSize; // new physical buffer Buffer* nbuf = NULL; try { nbuf = new Buffer; nbuf->m_pcData = new char[unitsize * m_iMSS]; } catch (...) { delete nbuf; throw CUDTException(3, 2, 0); } nbuf->m_iSize = unitsize; nbuf->m_pNext = NULL; // insert the buffer at the end of the buffer list Buffer* p = m_pBuffer; while (NULL != p->m_pNext) p = p->m_pNext; p->m_pNext = nbuf; // new packet blocks Block* nblk = NULL; try { nblk = new Block; } catch (...) { delete nblk; throw CUDTException(3, 2, 0); } Block* pb = nblk; for (int i = 1; i < unitsize; ++i) { pb->m_pNext = new Block; pb = pb->m_pNext; } // insert the new blocks onto the existing one pb->m_pNext = m_pLastBlock->m_pNext; m_pLastBlock->m_pNext = nblk; pb = nblk; char* pc = nbuf->m_pcData; for (int i = 0; i < unitsize; ++i) { pb->m_pcData = pc; pb = pb->m_pNext; pc += m_iMSS; } m_iSize += unitsize; }
能夠看下執行單次容量擴充的含義及過程:
1. 獲取CSndBuffer中已有的Block的總數量unitsize。
2. 建立一個Buffer結構,併爲它分配數據緩衝區,數據緩衝區的大小爲unitsize個Block。因而可知,每一次CSndBuffer的擴展,都會使它的容量加倍。
3. 將前一步建立的Buffer插入到已有的Buffer單鏈表的尾部。
4. 爲前面建立的Buffer,建立對應的Block結構鏈表,nblk指向這個單鏈表的頭節點,而pb指向這個單鏈表的尾節點。
5. 如咱們前面提到的CSndBuffer中的Block結構是一個循環單向鏈表,這一步便是將前一步建立的Block單向鏈表插入CSndBuffer的Block循環鏈表中。
6. 設置前面建立的全部Block,使它們指向前面建立的Buffer的數據部分的適當位置。
7. 更新表示CSndBuffer中總的Block大小的m_iSize。
如咱們前面瞭解到的,發送數據也是一個生產與消費的故事,UDT::send()和UDT::sendmsg()講的是生產的故事,至此咱們基本上將生產的故事都理清了,接着就來看下消費的故事,也就是CSndQueue中的實際數據發送。
接着來說數據發送這個生產-消費故事的另外一半,也就是消費的那一半,發送隊列CSndQueue中實際的數據發送。先來瞅一下CSndQueue的定義(src/queue.h):
class CSndQueue { friend class CUDT; friend class CUDTUnited; public: CSndQueue(); ~CSndQueue(); public: // Functionality: // Initialize the sending queue. // Parameters: // 1) [in] c: UDP channel to be associated to the queue // 2) [in] t: Timer // Returned value: // None. void init(CChannel* c, CTimer* t); // Functionality: // Send out a packet to a given address. // Parameters: // 1) [in] addr: destination address // 2) [in] packet: packet to be sent out // Returned value: // Size of data sent out. int sendto(const sockaddr* addr, CPacket& packet); private: #ifndef WIN32 static void* worker(void* param); #else static DWORD WINAPI worker(LPVOID param); #endif pthread_t m_WorkerThread; private: CSndUList* m_pSndUList; // List of UDT instances for data sending CChannel* m_pChannel; // The UDP channel for data sending CTimer* m_pTimer; // Timing facility pthread_mutex_t m_WindowLock; pthread_cond_t m_WindowCond; volatile bool m_bClosing; // closing the worker pthread_cond_t m_ExitCond; private: CSndQueue(const CSndQueue&); CSndQueue& operator=(const CSndQueue&); };
這個class,不論是成員變量,仍是成員函數,看上去基本都還比較親切,其做用不會讓人徹底無感,但惟獨一個成員變量,也就是CSndUList的m_pSndUList。於是這裏就先來看一下這個類。先來看CSndUList的定義:
struct CSNode { CUDT* m_pUDT; // Pointer to the instance of CUDT socket uint64_t m_llTimeStamp; // Time Stamp int m_iHeapLoc; // location on the heap, -1 means not on the heap }; class CSndUList { friend class CSndQueue; public: CSndUList(); ~CSndUList(); public: // Functionality: // Insert a new UDT instance into the list. // Parameters: // 1) [in] ts: time stamp: next processing time // 2) [in] u: pointer to the UDT instance // Returned value: // None. void insert(int64_t ts, const CUDT* u); // Functionality: // Update the timestamp of the UDT instance on the list. // Parameters: // 1) [in] u: pointer to the UDT instance // 2) [in] resechedule: if the timestampe shoudl be rescheduled // Returned value: // None. void update(const CUDT* u, bool reschedule = true); // Functionality: // Retrieve the next packet and peer address from the first entry, and reschedule it in the queue. // Parameters: // 0) [out] addr: destination address of the next packet // 1) [out] pkt: the next packet to be sent // Returned value: // 1 if successfully retrieved, -1 if no packet found. int pop(sockaddr*& addr, CPacket& pkt); // Functionality: // Remove UDT instance from the list. // Parameters: // 1) [in] u: pointer to the UDT instance // Returned value: // None. void remove(const CUDT* u); // Functionality: // Retrieve the next scheduled processing time. // Parameters: // None. // Returned value: // Scheduled processing time of the first UDT socket in the list. uint64_t getNextProcTime(); private: void insert_(int64_t ts, const CUDT* u); void remove_(const CUDT* u); private: CSNode** m_pHeap; // The heap array int m_iArrayLength; // physical length of the array int m_iLastEntry; // position of last entry on the heap array pthread_mutex_t m_ListLock; pthread_mutex_t* m_pWindowLock; pthread_cond_t* m_pWindowCond; CTimer* m_pTimer; private: CSndUList(const CSndUList&); CSndUList& operator=(const CSndUList&); };
從這個類的定義中,咱們大概能感受到這是一個CSNode的容器,但容器的許多組織的細節則仍然是一頭霧水,那就從它的構造函數和成員函數中來釐清這些細節。來看它的構造函數(src/queue.cpp):
CSndUList::CSndUList() : m_pHeap(NULL), m_iArrayLength(4096), m_iLastEntry(-1), m_ListLock(), m_pWindowLock(NULL), m_pWindowCond(NULL), m_pTimer(NULL) { m_pHeap = new CSNode*[m_iArrayLength]; #ifndef WIN32 pthread_mutex_init(&m_ListLock, NULL); #else m_ListLock = CreateMutex(NULL, false, NULL); #endif }
在這個函數中作的事情就是分配了一個CSNode指針的數組,並初始化了一個mutex m_ListLock,但這彷佛也沒法透漏出太多的訊息。而後來看向其中插入元素的insert():
void CSndUList::insert(int64_t ts, const CUDT* u) { CGuard listguard(m_ListLock); // increase the heap array size if necessary if (m_iLastEntry == m_iArrayLength - 1) { CSNode** temp = NULL; try { temp = new CSNode*[m_iArrayLength * 2]; } catch (...) { return; } memcpy(temp, m_pHeap, sizeof(CSNode*) * m_iArrayLength); m_iArrayLength *= 2; delete[] m_pHeap; m_pHeap = temp; } insert_(ts, u); }
m_iLastEntry是數組中已經被佔用的最後一個位置。在這個函數中,作了兩件事:
1. 檢查m_iLastEntry是否等於m_iArrayLength - 1,如果,則代表數組中全部的位置都被佔用了此時則須要擴充容量,這裏的作法就是建立一個長度爲以前數組長度2倍的新數組,將以前數組中的內容拷貝到新數組裏,更新m_iArrayLength,刪除以前的數組,更新m_pHeap只想新數組。
2. 執行CSndUList::insert_()進行實際的插入動做。
再來看CSndUList::insert_():
void CSndUList::insert_(int64_t ts, const CUDT* u) { CSNode* n = u->m_pSNode; // do not insert repeated node if (n->m_iHeapLoc >= 0) return; m_iLastEntry++; m_pHeap[m_iLastEntry] = n; n->m_llTimeStamp = ts; int q = m_iLastEntry; int p = q; while (p != 0) { p = (q - 1) >> 1; if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp) { CSNode* t = m_pHeap[p]; m_pHeap[p] = m_pHeap[q]; m_pHeap[q] = t; t->m_iHeapLoc = q; q = p; } else break; } n->m_iHeapLoc = q; // an earlier event has been inserted, wake up sending worker if (n->m_iHeapLoc == 0) m_pTimer->interrupt(); // first entry, activate the sending queue if (0 == m_iLastEntry) { #ifndef WIN32 pthread_mutex_lock(m_pWindowLock); pthread_cond_signal(m_pWindowCond); pthread_mutex_unlock(m_pWindowLock); #else SetEvent(*m_pWindowCond); #endif } }
在這個函數中主要作了這樣一些事:
1. 檢查要插入的元素是否已經插入了,主要是根據CUDT的CSNode n的HeapLoc字段是否大於0來判斷的。若已經插入,則直接返回。
2. 將CSNode放在數組的尾部。
3. 調整CSNode n在數組中的位置,以使它處於適當的位置。要釐清這個地方的調整過程,可能要複習一下咱們曾經學習過的堆數據結構了。能夠將堆理解爲一個用數組表示的二叉樹,以下圖所示:
如上圖,每一個框框中的數字表示該節點在數組中的位置。能夠看到一個節點的位置與它的兩個子節點及父節點的位置之間的關係:
假設一個節點的位置,也就是該節點在數組中的index爲n,則它的父節點的位置爲((n -1)/2),而它的兩個子幾點的位置分別爲(2×N+1)和(2×n+2)。
回到CSndUList::insert_()的節點CSNode n的位置調整過程。能夠看到,這個過程主要是根據CSNode n的m_llTimeStamp值,若CSNode n的m_llTimeStamp值比它的父節點的m_llTimeStamp小的話就把CSNode n往二叉樹的上層浮,而把它的父節點向二叉樹的下層沉,依次類推,直到找到某個位置,其父節點的m_llTimeStamp值比它的m_llTimeStamp值小,或者浮到二叉樹的最頂層。
因而可知m_pHeap是一個根據CSNode的m_llTimeStamp值建的堆,越往上層,該值越小。而m_pHeap[0]則是整個堆中全部CSNode元素m_llTimeStamp值最小的那個。
4. 更新CSNode n的m_iHeapLoc指向它在數組m_pHeap中的索引。
5. CSNode n被浮到堆的頂部時,喚醒發送隊列的worker線程。
6. 插入的若是是堆中的第一個元素的話,喚醒等待在m_pWindowCond上的線程。
看完了插入元素,再來看移除元素也就是CSndUList::remove():
void CSndUList::remove(const CUDT* u) { CGuard listguard(m_ListLock); remove_(u); } void CSndUList::remove_(const CUDT* u) { CSNode* n = u->m_pSNode; if (n->m_iHeapLoc >= 0) { // remove the node from heap m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry]; m_iLastEntry--; m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc; int q = n->m_iHeapLoc; int p = q * 2 + 1; while (p <= m_iLastEntry) { if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p + 1]->m_llTimeStamp)) p++; if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp) { CSNode* t = m_pHeap[p]; m_pHeap[p] = m_pHeap[q]; m_pHeap[p]->m_iHeapLoc = p; m_pHeap[q] = t; m_pHeap[q]->m_iHeapLoc = q; q = p; p = q * 2 + 1; } else break; } n->m_iHeapLoc = -1; } // the only event has been deleted, wake up immediately if (0 == m_iLastEntry) m_pTimer->interrupt(); }
CSndUList::remove()是直接調用了CSndUList::remove_(),而在CSndUList::remove_()主要作了以下這樣一些事情:
1. 先檢查要移除的CSNode n是否存在與堆中,主要根據CSNode n的HeapLoc字段是否大於0來判斷的。若沒有插入,則跳到第5步,不然繼續執行。
2. 將m_pHeap中的最末尾的元素放進本來由要移除的CSNode n所佔用的位置,更新m_iLastEntry,及被改變了位置的原來的最末尾元素CSNode的m_iHeapLoc指向它當前被放置的位置。
3. 若是被調整了位置的CSNode的新位置不是最末尾的位置,則調整該節點的位置。這裏主要是在這個節點的m_llTimeStamp值比它的子節點的m_llTimeStamp值更大時,將這個節點向二叉樹層次結構的下層沉,而將它的字節點向上浮的過程。一個節點老是有兩個字節點,也就是兩棵子樹,那它又會向哪一棵那邊沉呢?能夠看到是字節點中m_llTimeStamp值更小的那一邊。
總以爲這裏應該再有一個節點上浮的過程。若是移除的節點是最末尾節點的直系父節點,這固然沒有問題,但若是不是,則關係仍是有許多不肯定的地方。
4. 更新CSNode n的m_iHeapLoc指向-1。
5. 若是m_iLastEntry爲0,即表示堆中再也不有元素了,則還會執行喚醒。
回頭看咱們前面分析的CUDT::send()和CUDT::sendmsg(),它們都經過m_pSndQueue->m_pSndUList->update(this, false)將CUDT插入CSndUList m_pSndUList,這裏再來看一下CSndUList::update():
void CSndUList::update(const CUDT* u, bool reschedule) { CGuard listguard(m_ListLock); CSNode* n = u->m_pSNode; if (n->m_iHeapLoc >= 0) { if (!reschedule) return; if (n->m_iHeapLoc == 0) { n->m_llTimeStamp = 1; m_pTimer->interrupt(); return; } remove_(u); } insert_(1, u); }
reschedule參數表示的是,若是以前的已經有發送任務,且尚未執行完,則將新的發送任務放在老的以後。
能夠看到在這個函數中,若是CSNode尚未被插入堆中,則儘量將CSNode放如堆的頂部以便於發送任務儘快執行。若已經插入,且reschedule爲false,則直接返回。若已經插入,且reschedule爲true,則檢查一下CSNode當前是否已經在堆的頂部了,如果就退出。若不是,則先將CSNode從堆中移除,而後再儘量將CSNode插入堆的頂部。
對發送隊列CSndQueue所用的數據結構作這麼多分析以後,咱們再來看它的worker線程,也就是CSndQueue::worker():
CSndQueue::CSndQueue() : m_WorkerThread(), m_pSndUList(NULL), m_pChannel(NULL), m_pTimer(NULL), m_WindowLock(), m_WindowCond(), m_bClosing(false), m_ExitCond() { #ifndef WIN32 pthread_cond_init(&m_WindowCond, NULL); pthread_mutex_init(&m_WindowLock, NULL); #else m_WindowLock = CreateMutex(NULL, false, NULL); m_WindowCond = CreateEvent(NULL, false, false, NULL); m_ExitCond = CreateEvent(NULL, false, false, NULL); #endif } CSndQueue::~CSndQueue() { m_bClosing = true; #ifndef WIN32 pthread_mutex_lock(&m_WindowLock); pthread_cond_signal(&m_WindowCond); pthread_mutex_unlock(&m_WindowLock); if (0 != m_WorkerThread) pthread_join(m_WorkerThread, NULL); pthread_cond_destroy(&m_WindowCond); pthread_mutex_destroy(&m_WindowLock); #else SetEvent(m_WindowCond); if (NULL != m_WorkerThread) WaitForSingleObject(m_ExitCond, INFINITE); CloseHandle(m_WorkerThread); CloseHandle(m_WindowLock); CloseHandle(m_WindowCond); CloseHandle(m_ExitCond); #endif delete m_pSndUList; } void CSndQueue::init(CChannel* c, CTimer* t) { m_pChannel = c; m_pTimer = t; m_pSndUList = new CSndUList; m_pSndUList->m_pWindowLock = &m_WindowLock; m_pSndUList->m_pWindowCond = &m_WindowCond; m_pSndUList->m_pTimer = m_pTimer; #ifndef WIN32 if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this)) { m_WorkerThread = 0; throw CUDTException(3, 1); } #else DWORD threadID; m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &threadID); if (NULL == m_WorkerThread) throw CUDTException(3, 1); #endif } #ifndef WIN32 void* CSndQueue::worker(void* param) #else DWORD WINAPI CSndQueue::worker(LPVOID param) #endif { CSndQueue* self = (CSndQueue*) param; while (!self->m_bClosing) { uint64_t ts = self->m_pSndUList->getNextProcTime(); if (ts > 0) { // wait until next processing time of the first socket on the list uint64_t currtime; CTimer::rdtsc(currtime); if (currtime < ts) self->m_pTimer->sleepto(ts); // it is time to send the next pkt sockaddr* addr; CPacket pkt; if (self->m_pSndUList->pop(addr, pkt) < 0) continue; self->m_pChannel->sendto(addr, pkt); } else { // wait here if there is no sockets with data to be sent #ifndef WIN32 pthread_mutex_lock(&self->m_WindowLock); if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0)) pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock); pthread_mutex_unlock(&self->m_WindowLock); #else WaitForSingleObject(self->m_WindowCond, INFINITE); #endif } } #ifndef WIN32 return NULL; #else SetEvent(self->m_ExitCond); return 0; #endif }
CSndQueue::worker()中的while循環幾乎就是CSndQueue的worker線程執行的所有任務了,這個循環的循環體中主要作了以下這樣的一些事情:
1. 調用self->m_pSndUList->getNextProcTime(),來查看最近的一次發送任務所須要執行的時間ts。這裏來看一下CSndUList::getNextProcTime()的定義:
uint64_t CSndUList::getNextProcTime() { CGuard listguard(m_ListLock); if (-1 == m_iLastEntry) return 0; return m_pHeap[0]->m_llTimeStamp; }
若返回值小於等於0,就表示當前沒有須要發送的數據,沒有須要執行的發送任務。則進入等待狀態。在CSndQueue::init()的定義中,CSndUList m_pSndUList的m_pWindowLock和m_pWindowCond會分別指向CSndQueue的m_WindowLock和m_WindowCond,於是可見,這個地方的等待,是被CSndUList::insert_()喚醒的。
若返回值大於0,則代表存在着須要執行的發送任務。此時則執行下一步。
2. 獲取當前的時間currtime。比較currtime與ts,若前者較小,則代表最近須要執行的發送任務它請求的執行時間還沒到,則休眠等待直到ts的到來。如咱們前面的分析,並根據CSndQueue::init()的定義可見,向m_pSndUList中插入、移除或更新元素,均可能會喚醒這裏的等待。若前者較大,或者等待的時刻到了則執行下一步。
3. 執行self->m_pSndUList->pop(addr, pkt),從CSndUList m_pSndUList中抓一個CPacket出來。若沒抓到,則進入下一次循環,不然發送抓到的CPacket。來看CSndUList::pop():
int CSndUList::pop(sockaddr*& addr, CPacket& pkt) { CGuard listguard(m_ListLock); if (-1 == m_iLastEntry) return -1; // no pop until the next schedulled time uint64_t ts; CTimer::rdtsc(ts); if (ts < m_pHeap[0]->m_llTimeStamp) return -1; CUDT* u = m_pHeap[0]->m_pUDT; remove_(u); if (!u->m_bConnected || u->m_bBroken) return -1; // pack a packet from the socket if (u->packData(pkt, ts) <= 0) return -1; addr = u->m_pPeerAddr; // insert a new entry, ts is the next processing time if (ts > 0) insert_(ts, u); return 1; }
能夠看到CSndUList::pop()作了這樣一些事情:
1. 檢查m_iLastEntry是否爲-1,若爲-1,代表沒有要執行發送的任務,於是直接返回-1。不然繼續執行。
2. 檢查當前時間是否小於堆頂元素的m_llTimeStamp,若小於則代表,最近一次發送任務的發送時間還沒到,則返回-1,不然繼續執行。
3. 獲取堆頂元素的CUDT對象u,也就是發送任務的請求者,並將堆頂元素先從對中移除。
4. 檢查u的狀態,若不處於有效的鏈接狀態,則返回-1,不然繼續執行。
5. 執行u->packData(pkt, ts)打出一個數據包來。
6. 使傳進來的addr只想CUDT u的PeerAddr,CSndQueue會將這個地址做爲包的發送目的地址。
7. 將CUDT u從新插入堆中,時間戳爲當前時間,這也就意味着,若是能夠的話,就在下一個循環中繼續發送CUDT u的數據。
8. 返回1。
在發送隊列CSndQueue的worker線程中,最最須要的是儘量快的得到最近須要執行的發送任務的一些信息。CSndUList::getNextProcTime()和CSndUList::pop()中獲取的最主要的信息也是堆頂元素的一些信息。一個嚴格依照請求的執行時間進行排列的列表意義並非很大,爲了保證這種有序性反倒可能須要消耗很多的時間,採用堆卻能夠想CSndQueue的worker線程儘量快的返回最近將要執行的發送任務請求執行的時間。
Done。