在前文中,咱們有看到,數據發送的過程,大致是發送者CUDT將要發送的數據放進它的CSndBuffer m_pSndBuffer,並將它本身添加進它的CSndQueue m_pSndQueue的CSndUList m_pSndUList的堆裏,後面CSndQueue m_pSndQueue的worker線程會經過CSndUList::pop()從CSndUList m_pSndUList的堆頂CUDT中獲取一個要發送的包來發送,包的獲取主要是經過CUDT::packData()來完成,而這個函數正是UDT中包發送的執行中心。 node
這裏就來看一下CUDT::packData()的定義(src/core.cpp): 數組
int CUDT::packData(CPacket& packet, uint64_t& ts) { int payload = 0; bool probe = false; uint64_t entertime; CTimer::rdtsc(entertime); if ((0 != m_ullTargetTime) && (entertime > m_ullTargetTime)) m_ullTimeDiff += entertime - m_ullTargetTime; // Loss retransmission always has higher priority. if ((packet.m_iSeqNo = m_pSndLossList->getLostSeq()) >= 0) { // protect m_iSndLastDataAck from updating by ACK processing CGuard ackguard(m_AckLock); int offset = CSeqNo::seqoff(m_iSndLastDataAck, packet.m_iSeqNo); if (offset < 0) return 0; int msglen; payload = m_pSndBuffer->readData(&(packet.m_pcData), offset, packet.m_iMsgNo, msglen); if (-1 == payload) { int32_t seqpair[2]; seqpair[0] = packet.m_iSeqNo; seqpair[1] = CSeqNo::incseq(seqpair[0], msglen); sendCtrl(7, &packet.m_iMsgNo, seqpair, 8); // only one msg drop request is necessary m_pSndLossList->remove(seqpair[1]); // skip all dropped packets if (CSeqNo::seqcmp(m_iSndCurrSeqNo, CSeqNo::incseq(seqpair[1])) < 0) m_iSndCurrSeqNo = CSeqNo::incseq(seqpair[1]); return 0; } else if (0 == payload) return 0; ++m_iTraceRetrans; ++m_iRetransTotal; } else { // If no loss, pack a new packet. // check congestion/flow window limit int cwnd = (m_iFlowWindowSize < (int) m_dCongestionWindow) ? m_iFlowWindowSize : (int) m_dCongestionWindow; if (cwnd >= CSeqNo::seqlen(m_iSndLastAck, CSeqNo::incseq(m_iSndCurrSeqNo))) { if (0 != (payload = m_pSndBuffer->readData(&(packet.m_pcData), packet.m_iMsgNo))) { m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo); m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo); packet.m_iSeqNo = m_iSndCurrSeqNo; // every 16 (0xF) packets, a packet pair is sent if (0 == (packet.m_iSeqNo & 0xF)) probe = true; } else { m_ullTargetTime = 0; m_ullTimeDiff = 0; ts = 0; return 0; } } else { m_ullTargetTime = 0; m_ullTimeDiff = 0; ts = 0; return 0; } } packet.m_iTimeStamp = int(CTimer::getTime() - m_StartTime); packet.m_iID = m_PeerID; packet.setLength(payload); m_pCC->onPktSent(&packet); //m_pSndTimeWindow->onPktSent(packet.m_iTimeStamp); ++m_llTraceSent; ++m_llSentTotal; if (probe) { // sends out probing packet pair ts = entertime; probe = false; } else { #ifndef NO_BUSY_WAITING ts = entertime + m_ullInterval; #else if (m_ullTimeDiff >= m_ullInterval) { ts = entertime; m_ullTimeDiff -= m_ullInterval; } else { ts = entertime + m_ullInterval - m_ullTimeDiff; m_ullTimeDiff = 0; } #endif } m_ullTargetTime = ts; return payload; }
在這個函數中,處理了兩大類packet的讀取,一是丟失的packet,二是正常的順序傳輸的包。來看一下這個函數具體的執行過程: 網絡
1. 讀取當前的時間entertime。 數據結構
2. 更新m_ullTimeDiff。在UDT中,包發送會有一個隨着網絡情況調整的一個發送週期,也就是m_ullInterval值。在每一次發送包時,都會根據m_ullInterval值計算下一次包發送的理想時間,並記錄在m_ullTargetTime中。而m_ullTimeDiff則被用來記錄當前的此次包發送想對於理想的發送時間的延滯值,這個值會被用於計算下一次包發送的理想時間。UDT正是經過這樣的修正來儘量使的包發送週期可以保持在m_ullInterval值附近。 函數
3. 從丟失包列表m_pSndLossList中獲取一個丟失的包的SeqNo,並賦值給packet.m_iSeqNo。這個丟失包列表中的包多是來源於Timer,好比一個包超過了正常時間尚未獲得響應,也有可能來源於發送端發回的NACK消息,後面會在來研究這個問題。 ui
4. 前一步中獲取的SeqNo大於等於0,這代表存在丟失了須要重傳的包,則讀取丟失的包的內容: spa
(1). 計算丟失的包的SeqNo與SndLastDataAck的差值offset。 .net
(2). 檢查前一步計算出來的offset值,若小於0,代表發送窗口已經滑過了,則直接返回,不然繼續執行。 線程
(3). 根據前面計算的offset值,經過m_pSndBuffer->readData()把數據讀入packet中。packet的m_iMsgNo會被更新爲packet的MsgNo,msglen也會在packet過時時被更新。 設計
(4). m_pSndBuffer->readData()返回0,代表讀取的packet的數據長度爲0。這樣的packet沒有實際發送的必要,直接返回,無需進行後續的步驟。
(5). m_pSndBuffer->readData()返回-1,代表要讀取的packet已通過期,一樣沒有發送這個數據包自己的必要。
但此時會發送一個DropMsgRequest給數據接收端。
而後將過時的packet從SndLossList中移除出去。
m_iSndCurrSeqNo的值爲最近一次發送的packet的SeqNo,這裏還會在必要的時候更新m_iSndCurrSeqNo,以跳過全部被丟棄的packets。必要指的是,m_iSndCurrSeqNo的值小於等於要丟棄的這個Msg的最後一個packet的SeqNo。這也就意味着,要丟棄的這個Msg直到過時被丟棄都沒有發完。
這個地方有一點比較奇怪,簡化來看,seqpair[0] == packet.m_iSeqNo,seqpair[1] == seqpair[0] + msglen,也就是說seqpair[1]的值爲要被丟棄Msg的最後一個packet的SeqNo加1,但在判斷是否要更新m_iSndCurrSeqNo時,倒是拿m_iSndCurrSeqNo和(seqpair[1] + 1),也就是要被丟棄的Msg的最後一個packet的SeqNo加2在比較,而更新也是被設置爲這個值。但實際上,將m_iSndCurrSeqNo設置爲被丟棄Msg的最後一個packet的SeqNo已經能夠跳過整個Msg的發送了,由於下次要用m_iSndCurrSeqNo來得到SeqNo,會先將這個值加1的。這個地方的邏輯疑似存在bug。
返回0,向調用者代表暫時沒有數據要發送。
(6). m_pSndBuffer->readData()返回大於0的值,代表有一個丟失的包須要從新發送,則更新m_iTraceRetrans和m_iRetransTotal,這兩個值分別表示一次trace重發的總次數,和此UDT Socket總的重發次數,二者的區別在於前者在被讀取以後會被重置爲0(CUDT::sample()),然後者則不會。此時須要繼續執行後面的第6步。
5. 在第3步中讀取的SeqNo小於0,代表沒有丟失的packet。此時則:
(1). 根據m_iFlowWindowSize和m_dCongestionWindow的值計算cwnd發送窗口的大小。發送窗口大小取這兩個值中較小的那個,默認狀況下,前者爲8192(來自於Handshke消息的m_iFlightFlagSize字段,而m_iFlightFlagSize則根據m_iRcvBufSize和m_iFlightFlagSize得出),後者爲16(來自於CC的m_dCWndSize字段,在CUDTCC::init()中該值被初始化爲16。)。
(2). 檢查發送窗口是否已滿。若已滿,則將m_ullTargetTime和m_ullTimeDiff重置爲0,將ts置爲0,而後返回0,向調用者代表沒有數據要發送。不然繼續執行。
m_iSndLastAck的值爲下一次Ack應該確認的packet的SeqNo,CSeqNo::seqlen()計算的是包含兩個端點在內的區間的長度。此處對CSeqNo::seqlen()的調用被用來計算,下個packet發送以後,發送窗口中全部的packet的個數。
(3). 讀取下一個須要發送的packet,並檢查返回的payload值。若payload值爲0,代表數據緩衝區中全部的數據都已經發送了,無需再進行實際的發送,則將m_ullTargetTime和m_ullTimeDiff重置爲0,將ts置爲0,返回0,向調用者代表沒有數據要發送。不然繼續執行。
(4). 主要是更新m_iSndCurrSeqNo,並設置packet的SeqNo字段。若是SeqNo爲16的整數倍,還會設置probe爲true。
6. 設置packet的m_iTimeStamp,m_iID,及數據長度。
7. 更新m_llTraceSent和m_llSentTotal,其中前者表示CUDT此次Trace的過程當中發送的總的packet數量,這個值會在CUDT::sample()獲取trace數據以後被重置爲0,然後者則表示發送的總共的packet數量,不會在CUDT::sample()獲取trace數據以後被重置。
8. 根據probe的值,更新ts值等。配 合CUDT::packData()的調用者CSndUList::pop()一塊兒看,可知ts是理想中該CUDT下次發送數據的時間點。
probe設置爲true,就是代表,當前的這個packet被髮送結束以後當即發送下一個packet。即便probe的值不爲true,也有可能要當即發送下一個packet,好比延滯時間已經超過了理想的發生週期。存在延滯時間,但該延滯時間又沒有超出理想的發送週期的,則下個packet的發送時間具體本次packet的發送時間會小於理想的packet發送週期。
總之這裏是但願可以保持packet以接近理想的速率發送。
9. 更新m_ullTargetTime爲ts。ts這個下次發包的理想時間點還須要m_ullTargetTime進行記錄。
10. 返回payload值,也就是讀取的packet的大小。
這裏順便來看下CSeqNo的設計與實現。這個類被用來幫助進行與SeqNo有關的一些計算(src/common.h):
class CSeqNo { public: inline static int seqcmp(int32_t seq1, int32_t seq2) { return (abs(seq1 - seq2) < m_iSeqNoTH) ? (seq1 - seq2) : (seq2 - seq1); } inline static int seqlen(int32_t seq1, int32_t seq2) { return (seq1 <= seq2) ? (seq2 - seq1 + 1) : (seq2 - seq1 + m_iMaxSeqNo + 2); } inline static int seqoff(int32_t seq1, int32_t seq2) { if (abs(seq1 - seq2) < m_iSeqNoTH) return seq2 - seq1; if (seq1 < seq2) return seq2 - seq1 - m_iMaxSeqNo - 1; return seq2 - seq1 + m_iMaxSeqNo + 1; } inline static int32_t incseq(int32_t seq) { return (seq == m_iMaxSeqNo) ? 0 : seq + 1; } inline static int32_t decseq(int32_t seq) { return (seq == 0) ? m_iMaxSeqNo : seq - 1; } inline static int32_t incseq(int32_t seq, int32_t inc) { return (m_iMaxSeqNo - seq >= inc) ? seq + inc : seq - m_iMaxSeqNo + inc - 1; } public: static const int32_t m_iSeqNoTH; // threshold for comparing seq. no. static const int32_t m_iMaxSeqNo; // maximum sequence number used in UDT };
鏈接發起端在執行CUDT::connect(const sockaddr* serv_addr)時,會計算一個隨機的值做爲m_iISN,也便是發送的首個數據packet的SeqNo,而在鏈接創建過程當中,這個值會被同步給Peer端的Socket。
這裏能夠看到,UDT Packet的SeqNo是[0, 0x7FFFFFFF]區間中的一個值。每發送一個packet,m_iSndCurrSeqNo都會被遞增。經過class CSeqNo能夠看到這個遞增的規則,即SeqNo超出0x7FFFFFFF時會被歸0。也正是因爲0是一個合法的SeqNo,在incseq(int32_t seq, int32_t inc)中,SeqNo超出最大值時的計算裏能看到有額外的減1,在seqlen()裏,SeqNo超出最大值時的計算裏能看到加2。
由seqcmp()和seqoff()這兩個函數可見,同一時刻同時有效的兩個SeqNo seq1和seq2之間的距離不能超過m_iSeqNoTH 0x3FFFFFFF,若超過則代表必定有一個SeqNo越過了最大值0x7FFFFFFF,也即較小的那個值越過了最大值。
這裏還能夠再來看一下CSndBuffer::readData():
int CSndBuffer::readData(char** data, int32_t& msgno) { // No data to read if (m_pCurrBlock == m_pLastBlock) return 0; *data = m_pCurrBlock->m_pcData; int readlen = m_pCurrBlock->m_iLength; msgno = m_pCurrBlock->m_iMsgNo; m_pCurrBlock = m_pCurrBlock->m_pNext; return readlen; } int CSndBuffer::readData(char** data, const int offset, int32_t& msgno, int& msglen) { CGuard bufferguard(m_BufLock); Block* p = m_pFirstBlock; for (int i = 0; i < offset; ++i) p = p->m_pNext; if ((p->m_iTTL >= 0) && ((CTimer::getTime() - p->m_OriginTime) / 1000 > (uint64_t) p->m_iTTL)) { msgno = p->m_iMsgNo & 0x1FFFFFFF; msglen = 1; p = p->m_pNext; bool move = false; while (msgno == (p->m_iMsgNo & 0x1FFFFFFF)) { if (p == m_pCurrBlock) move = true; p = p->m_pNext; if (move) m_pCurrBlock = p; msglen++; } return -1; } *data = p->m_pcData; int readlen = p->m_iLength; msgno = p->m_iMsgNo; return readlen; }
CSndBuffer::readData(char** data, int32_t& msgno)讀取當前的Block。基本上就是讀取Block,而後將指向當前Block的指針m_pCurrBlock向後移一個Block。
而CSndBuffer::readData(char** data, const int offset, int32_t& msgno, int& msglen)則是讀取距未響應的Block中最舊一塊Block offset個單位的Block。在這個函數中,首先是移動到要讀取的目標Block,若是要讀取的Block已過時,則使m_pCurrBlock跳過該packet所屬的Msg的全部Packet,而後返回-1退出。目標Block沒有過時,則讀取Block後返回數據長度。
總結在CUDT::packData()中對發送過程的控制。
丟失的包具備最高的發送優先級,這也是發送可靠性的保障方法。全部丟失的packet都會被放進SndLossList,這個List中的包可能來源於超時未獲得響應,也可能來源於消息接收端發回的NACK。
對於正常的順序packet發送的控制主要在於兩個方面,一是發送窗口的大小,也就是某個時刻已經發送但未獲得相應的packet的最大個數,這一點主要由m_dCongestionWindow和m_iFlowWindowSize來表示;二是控制兩個包發送的時間間隔,也就是包的發送速率,這一點則主要用m_ullInterval來表示。全部的發送控制機制主要經過影響這幾個變量來控制發送過程。
先來看一下CSndLossList這個數據結構。這個Class的定義以下(src/list.h):
class CSndLossList { public: CSndLossList(int size = 1024); ~CSndLossList(); // Functionality: // Insert a seq. no. into the sender loss list. // Parameters: // 0) [in] seqno1: sequence number starts. // 1) [in] seqno2: sequence number ends. // Returned value: // number of packets that are not in the list previously. int insert(int32_t seqno1, int32_t seqno2); // Functionality: // Remove ALL the seq. no. that are not greater than the parameter. // Parameters: // 0) [in] seqno: sequence number. // Returned value: // None. void remove(int32_t seqno); // Functionality: // Read the loss length. // Parameters: // None. // Returned value: // The length of the list. int getLossLength(); // Functionality: // Read the first (smallest) loss seq. no. in the list and remove it. // Parameters: // None. // Returned value: // The seq. no. or -1 if the list is empty. int32_t getLostSeq(); private: int32_t* m_piData1; // sequence number starts int32_t* m_piData2; // seqnence number ends int* m_piNext; // next node in the list int m_iHead; // first node int m_iLength; // loss length int m_iSize; // size of the static array int m_iLastInsertPos; // position of last insert node pthread_mutex_t m_ListLock; // used to synchronize list operation private: CSndLossList(const CSndLossList&); CSndLossList& operator=(const CSndLossList&); };
這是一個不可複製容器。提供的接口不是不少,配合註釋,都沒有太多難以理解的地方。這是一個用數組實現的鏈表。接着來看這個class的構造和析構(src/list.cpp):
CSndLossList::CSndLossList(int size) : m_piData1(NULL), m_piData2(NULL), m_piNext(NULL), m_iHead(-1), m_iLength(0), m_iSize(size), m_iLastInsertPos(-1), m_ListLock() { m_piData1 = new int32_t[m_iSize]; m_piData2 = new int32_t[m_iSize]; m_piNext = new int[m_iSize]; // -1 means there is no data in the node for (int i = 0; i < size; ++i) { m_piData1[i] = -1; m_piData2[i] = -1; } // sender list needs mutex protection #ifndef WIN32 pthread_mutex_init(&m_ListLock, 0); #else m_ListLock = CreateMutex(NULL, false, NULL); #endif } CSndLossList::~CSndLossList() { delete[] m_piData1; delete[] m_piData2; delete[] m_piNext; #ifndef WIN32 pthread_mutex_destroy(&m_ListLock); #else CloseHandle(m_ListLock); #endif }
CUDT::connect()中建立CSndLossList時,size值爲m_iFlowWindowSize * 2,也即8192 × 2 == 16384。若是不用數組,而用常規一點的方法來實現的話,鏈表的節點定義多是這樣的:
struct Node { int32_t m_iStart; int32_t m_iEnd; Node *m_pNext; };
而後來看這個class最關鍵的函數之一CSndLossList::insert()的定義:
int CSndLossList::insert(int32_t seqno1, int32_t seqno2) { CGuard listguard(m_ListLock); if (0 == m_iLength) { // insert data into an empty list m_iHead = 0; m_piData1[m_iHead] = seqno1; if (seqno2 != seqno1) m_piData2[m_iHead] = seqno2; m_piNext[m_iHead] = -1; m_iLastInsertPos = m_iHead; m_iLength += CSeqNo::seqlen(seqno1, seqno2); return m_iLength; } // otherwise find the position where the data can be inserted int origlen = m_iLength; int offset = CSeqNo::seqoff(m_piData1[m_iHead], seqno1); int loc = (m_iHead + offset + m_iSize) % m_iSize; if (offset < 0) { // Insert data prior to the head pointer m_piData1[loc] = seqno1; if (seqno2 != seqno1) m_piData2[loc] = seqno2; // new node becomes head m_piNext[loc] = m_iHead; m_iHead = loc; m_iLastInsertPos = loc; m_iLength += CSeqNo::seqlen(seqno1, seqno2); } else if (offset > 0) { if (seqno1 == m_piData1[loc]) { m_iLastInsertPos = loc; // first seqno is equivlent, compare the second if (-1 == m_piData2[loc]) { if (seqno2 != seqno1) { m_iLength += CSeqNo::seqlen(seqno1, seqno2) - 1; m_piData2[loc] = seqno2; } } else if (CSeqNo::seqcmp(seqno2, m_piData2[loc]) > 0) { // new seq pair is longer than old pair, e.g., insert [3, 7] to [3, 5], becomes [3, 7] m_iLength += CSeqNo::seqlen(m_piData2[loc], seqno2) - 1; m_piData2[loc] = seqno2; } else // Do nothing if it is already there return 0; } else { // searching the prior node int i; if ((-1 != m_iLastInsertPos) && (CSeqNo::seqcmp(m_piData1[m_iLastInsertPos], seqno1) < 0)) i = m_iLastInsertPos; else i = m_iHead; while ((-1 != m_piNext[i]) && (CSeqNo::seqcmp(m_piData1[m_piNext[i]], seqno1) < 0)) i = m_piNext[i]; if ((-1 == m_piData2[i]) || (CSeqNo::seqcmp(m_piData2[i], seqno1) < 0)) { m_iLastInsertPos = loc; // no overlap, create new node m_piData1[loc] = seqno1; if (seqno2 != seqno1) m_piData2[loc] = seqno2; m_piNext[loc] = m_piNext[i]; m_piNext[i] = loc; m_iLength += CSeqNo::seqlen(seqno1, seqno2); } else { m_iLastInsertPos = i; // overlap, coalesce with prior node, insert(3, 7) to [2, 5], ... becomes [2, 7] if (CSeqNo::seqcmp(m_piData2[i], seqno2) < 0) { m_iLength += CSeqNo::seqlen(m_piData2[i], seqno2) - 1; m_piData2[i] = seqno2; loc = i; } else return 0; } } } else { m_iLastInsertPos = m_iHead; // insert to head node if (seqno2 != seqno1) { if (-1 == m_piData2[loc]) { m_iLength += CSeqNo::seqlen(seqno1, seqno2) - 1; m_piData2[loc] = seqno2; } else if (CSeqNo::seqcmp(seqno2, m_piData2[loc]) > 0) { m_iLength += CSeqNo::seqlen(m_piData2[loc], seqno2) - 1; m_piData2[loc] = seqno2; } else return 0; } else return 0; } // coalesce with next node. E.g., [3, 7], ..., [6, 9] becomes [3, 9] while ((-1 != m_piNext[loc]) && (-1 != m_piData2[loc])) { int i = m_piNext[loc]; if (CSeqNo::seqcmp(m_piData1[i], CSeqNo::incseq(m_piData2[loc])) <= 0) { // coalesce if there is overlap if (-1 != m_piData2[i]) { if (CSeqNo::seqcmp(m_piData2[i], m_piData2[loc]) > 0) { if (CSeqNo::seqcmp(m_piData2[loc], m_piData1[i]) >= 0) m_iLength -= CSeqNo::seqlen(m_piData1[i], m_piData2[loc]); m_piData2[loc] = m_piData2[i]; } else m_iLength -= CSeqNo::seqlen(m_piData1[i], m_piData2[i]); } else { if (m_piData1[i] == CSeqNo::incseq(m_piData2[loc])) m_piData2[loc] = m_piData1[i]; else m_iLength--; } m_piData1[i] = -1; m_piData2[i] = -1; m_piNext[loc] = m_piNext[i]; } else break; } return m_iLength - origlen; }
1. 這個函數首先處理了最簡單的向空鏈表中插入元素的case。
這種狀況下,m_iHead被賦予0值。m_piData1[m_iHead]會被賦值爲要插入的這段丟失packet範圍的起始SeqNo。
若是起始SeqNo和結束SeqNo的值不一樣,m_piData2[m_iHead]還會被賦值爲結束SeqNo;若是相同,則m_piData2[m_iHead]將仍然保持構造函數中初始化的-1,以表示這段丟失packet範圍只有一個元素。
m_piNext[m_iHead]被賦值爲-1以表示這是鏈表中的最後一個元素。m_iLastInsertPos用來記錄上一次插入的位置,這裏會被賦值爲m_iHead。m_iLength表示CSndLossList中記錄的丟失packet的總格數,這裏會被設置爲這段packet的長度。而後返回m_iLength。
可見m_iHead指向鏈表的頭部。m_piData1,m_piData2和m_piNext這三個數組中相同位置的元素共同表示一個鏈表節點,它們分別表示一個丟失packet範圍的起始SeqNo,結束SeqNo和該節點在鏈表中next節點的位置。
2. 鏈表中已經有元素了,則將m_iLength保存在origlen中。計算要插入的這段丟失packet的起始SeqNo與鏈表中原有的頭節點的起始SeqNo字段的差值offset。而後計算要插入的這段丟失packet範圍的的可能的位置loc,這個可能的位置主要由這段丟失packet範圍的起始SeqNo與鏈表中原有的頭節點的起始SeqNo字段的差值決定。
由loc的計算方法可見,數組中的空間是被循環利用的。好比要插入的節點是向CSndLossList中插入的第二個節點,則此時m_iHead仍然爲0,而要插入的這個丟失packet範圍的起始位置小於原有的頭節點的起始SeqNo字段,則新插入的節點將被繞回到數組的尾部。
3. 處理offset小於0的狀況。這表示插入的這個丟失packet範圍的起始位置小於原有的頭節點的起始SeqNo字段值,此時則會在loc位置插入一個新的節點以描述這段丟失packet範圍。更新m_iHead和m_iLastInsertPos指向新插入的這個節點。並更新m_iLength以體現新加入的這個丟失packet範圍。
向單向鏈表的頭部插入元素老是比較簡單。由此咱們也看到,這個鏈表是以節點的起始SeqNo字段值的升序排列的有序鏈表。
但這個地方貌似沒有處理新插入的這個丟失packet範圍與原有頭節點表示的丟失packet範圍存在交叉的狀況?沒錯,是沒有處理,這種狀況會在處理完全部的插入狀況以後再統一來作。
4. 處理offset大於0的狀況。這又分爲兩種狀況:
(1). seqno1 == m_piData1[loc],代表新節點的目標插入位置中原有節點保存的丟失Packet範圍的起始SeqNo與要插入的這個丟失Packets範圍的起始SeqNo相同。則此時會首先更新m_iLastInsertPos爲loc,還須要處理這樣的幾種case,
case 1:原有的範圍中只有一個元素,要插入的這個範圍有多個元素。
case 2:原有的範圍中有多個元素,要插入的這個範圍有一個元素。
case 3:原有的範圍和要插入的範圍都只有一個元素。
case 4:原有的範圍和要插入的範圍中都有多個元素,但新插入的範圍徹底包含原有的範圍。
case 5:原有的範圍和要插入的範圍中都有多個元素,但原有的範圍徹底包含新插入的範圍。
case 6:原有的範圍和要插入的範圍中都有多個元素,且徹底相同。
這些case包含的packet範圍的相對關係能夠用下圖來簡單表示:
代碼的具體寫法不一樣,這些case中的一些可能會以不一樣的方式被合併成一個處理,而有些case則不須要對原有的鏈表進行任何的調整。
if block中處理的是case1和case3,else-if block中處理的是case 4,else block中處理的是case 2,case 5和case 6。其中case 3,case 2,case 5和case 6都不須要對鏈表作出調整。
以此來看,在第一個if block的內部,應該再加一個else block來直接返回0會比較好一點。
那段代碼的一種等價實現形式:
if (seqno2 != seqno1) { if (-1 == m_piData2[loc]) { m_iLength += CSeqNo::seqlen(seqno1, seqno2) - 1; m_piData2[loc] = seqno2; } else if (CSeqNo::seqcmp(seqno2, m_piData2[loc]) > 0) { // new seq pair is longer than old pair, e.g., insert [3, 7] to [3, 5], becomes [3, 7] m_iLength += CSeqNo::seqlen(m_piData2[loc], seqno2) - 1; m_piData2[loc] = seqno2; } else { return 0; } } else { // Do nothing if it is already there return 0; }
(2). seqno1與m_piData1[loc]不相等。這其實主要有兩種可能,一是loc位置已經有了其它的節點,但該節點所表示的範圍的起始SeqNo與要插入的這個範圍的起始SeqNo不一樣;二是loc位置尚未被插入節點。但第一種可能應該是不會出現的,於是這裏實際要處理的也就是loc位置尚未節點插入的狀況。
對於這種狀況,節點的插入位置徹底不是問題,關鍵的問題是調整鏈表中一些節點的關係。能夠看到這裏的處理過程:
查找新插入節點前面的那個節點。該查找過程的開始位置由m_piData1[m_iLastInsertPos]與seqno1的相對大小決定,若是前者較小,則從m_iLastInsertPos開始,不然,從m_iHead開始。這大概主要是想要利用空間局部性原理來提升查找的效率。而後就是經過一個循環找到新插入節點前面的那個節點。
找到的這前面的節點所表示的丟失packet範圍與要插入的節點所要表示的範圍之間的關係又有這樣的幾種case:
case 1:前面的節點表示的範圍只有一個packet。
case 2:前面的節點表示的範圍含有多個packet,但它的結束SeqNo仍然小於要插入的範圍的起始SeqNo。
case 3:前面的節點表示的範圍含有多個packet,但它的結束SeqNo大於等於要插入的範圍的起始SeqNo。
前兩個case代表兩個範圍不相交,而case 3則代表兩個範圍是相交的。對於前兩個case,則在前面的那個節點以後插入一個節點,鏈表中節點的鏈接關係作適當的調整便可。對於case 3,則須要將插入的這個範圍合併入前面的那個節點。若是前面的那個節點包含的範圍徹底覆蓋了要插入的範圍,則什麼都不作,若是不是則須要對結束SeqNo字段作一些調整。
5. offset值等於0,代表要插入的這個範圍的起始SeqNo與Head節點表示的範圍的起始SeqNo相同,這個過程則與offset大於0時,seqno1 == m_piData1[loc]中的處理基本一致。
6. 從插入新節點的位置開始,合併鏈表中與新插入的這個丟失packet範圍相交,或被包含或牢牢相鄰的節點。
7. 返回新加入CSndLossList的丟失packet的總個數。
有這個函數的整個執行過程不難看出,頭節點中將包含最老的丟失packets。
看完了插入,天然不能再也不來看一下CSndLossList::remove():
void CSndLossList::remove(int32_t seqno) { CGuard listguard(m_ListLock); if (0 == m_iLength) return; // Remove all from the head pointer to a node with a larger seq. no. or the list is empty int offset = CSeqNo::seqoff(m_piData1[m_iHead], seqno); int loc = (m_iHead + offset + m_iSize) % m_iSize; if (0 == offset) { // It is the head. Remove the head and point to the next node loc = (loc + 1) % m_iSize; if (-1 == m_piData2[m_iHead]) loc = m_piNext[m_iHead]; else { m_piData1[loc] = CSeqNo::incseq(seqno); if (CSeqNo::seqcmp(m_piData2[m_iHead], CSeqNo::incseq(seqno)) > 0) m_piData2[loc] = m_piData2[m_iHead]; m_piData2[m_iHead] = -1; m_piNext[loc] = m_piNext[m_iHead]; } m_piData1[m_iHead] = -1; if (m_iLastInsertPos == m_iHead) m_iLastInsertPos = -1; m_iHead = loc; m_iLength--; } else if (offset > 0) { int h = m_iHead; if (seqno == m_piData1[loc]) { // target node is not empty, remove part/all of the seqno in the node. int temp = loc; loc = (loc + 1) % m_iSize; if (-1 == m_piData2[temp]) m_iHead = m_piNext[temp]; else { // remove part, e.g., [3, 7] becomes [], [4, 7] after remove(3) m_piData1[loc] = CSeqNo::incseq(seqno); if (CSeqNo::seqcmp(m_piData2[temp], m_piData1[loc]) > 0) m_piData2[loc] = m_piData2[temp]; m_iHead = loc; m_piNext[loc] = m_piNext[temp]; m_piNext[temp] = loc; m_piData2[temp] = -1; } } else { // target node is empty, check prior node int i = m_iHead; while ((-1 != m_piNext[i]) && (CSeqNo::seqcmp(m_piData1[m_piNext[i]], seqno) < 0)) i = m_piNext[i]; loc = (loc + 1) % m_iSize; if (-1 == m_piData2[i]) m_iHead = m_piNext[i]; else if (CSeqNo::seqcmp(m_piData2[i], seqno) > 0) { // remove part/all seqno in the prior node m_piData1[loc] = CSeqNo::incseq(seqno); if (CSeqNo::seqcmp(m_piData2[i], m_piData1[loc]) > 0) m_piData2[loc] = m_piData2[i]; m_piData2[i] = seqno; m_piNext[loc] = m_piNext[i]; m_piNext[i] = loc; m_iHead = loc; } else m_iHead = m_piNext[i]; } // Remove all nodes prior to the new head while (h != m_iHead) { if (m_piData2[h] != -1) { m_iLength -= CSeqNo::seqlen(m_piData1[h], m_piData2[h]); m_piData2[h] = -1; } else m_iLength--; m_piData1[h] = -1; if (m_iLastInsertPos == h) m_iLastInsertPos = -1; h = m_piNext[h]; } } }
先來回憶下CSndLossList的類定義中,對於這個函數的語義的說明:移除全部不大於參數值的SeqNo。這個函數的主要執行過程以下:
1. 檢查m_iLength是否爲0,若爲0,說明CSndLossList尚未加入任何SeqNo,則直接返回,不然繼續執行。
2. 計算頭節點所表示的丟失packet範圍的起始SeqNo與參數seqno的offset,及可能以seqno做爲起始SeqNo字段的節點的位置loc。
根據這個函數的語義,咱們知道,實際上是不須要處理offset小於的case的。如咱們前面所瞭解的,頭節點所表示的SeqNo範圍是CSndLossList中SeqNo值最小的一個範圍,而若是seqno小於這個範圍的起始SeqNo的話,則說明不大於seqno的全部SeqNo都已經不存在了。
3. 處理offset == 0的狀況。offset == 0,代表seqno是包含於頭節點所表示的範圍,並且仍是這個範圍的起始SeqNo。此時又主要分兩種狀況來處理:
(1). 頭節點表示的這個範圍只有一個SeqNo。
(2). 頭節點表示的範圍包含多個SeqNo。
對於狀況(1),則頭節點將向後滑動一個節點,原來頭節點的存放位置會被複位。對於狀況(2),爲了保證兩個節點的相對位置等於節點所表示的丟失packet範圍的起始SeqNo的差值這樣的一種節點間關係依然成立,須要將頭節點保存的位置後移一個位置。對於狀況(2),還會再分爲兩種狀況來處理,一是原來的頭節點中只包含2個SeqNo,則在移除seqno後只剩下了一個,此時要保持m_piData2[loc]爲-1,若包含3個及以上SeqNo的,則要複製原來的頭節點的結束SeqNo到新的位置。
其它就是適當地更新m_iLastInsertPos,m_iHead和m_iLength了。這裏彷佛補一個
m_piNext[m_iHead] = -1;
要更好一點。
4. 處理offset大於0的狀況。對於這種狀況,比較麻煩的是找到seqno具體包含在哪一個節點中。處理過程大體爲:
(1). 檢查一下,seqno與m_piData1[loc]是否相等,若相等,則要找的節點已是找到了,且seqno爲目標節點表示的丟失packet範圍的起始seqno。此時則會再分爲兩種狀況來處理,一是目標節點中只包含一個SeqNo,則使鏈表頭指向目標節點的下一個節點。
二是目標節點中包含多個SeqNo,則須要將seqno排除在目標節點範圍以外新建一個節點,將新節點保存在目標節點後面相鄰的位置,若目標節點中包含2個節點,則須要設置新節點的結束SeqNo字段爲-1,若大於等於3,則複製此字段的值,而後將原來的目標節點改造爲只包含seqno的節點,並使它指向新建的這個節點。總之就是將原來的一個節點拆分紅了兩個節點,一個節點只包含seqno,另外一個則包含原來的目標節點中其他的SeqNo。
(2). seqno與m_piData1[loc]不相等的狀況,則須要先找到起始SeqNo字段小於seqno的SeqNo值最大的那個節點。這又能夠分爲3種case來處理,
case 1:找到的節點只包含一個SeqNo,此時則使鏈表頭指向找到的節點的next節點。
case 2:找到的節點包含多個SeqNo,且seqno小於找到的節點的結束SeqNo字段,此時則須要將找到的節點裂爲鏈表中的兩個節點,一個包含的範圍爲[原節點的起始SeqNo,seqno],另外一個包含的範圍爲[seqno + 1, 原節點的結束SeqNo]。同時鏈表頭應該指向後者。
case 3:找到的節點包含多個SeqNo,且seqno大於等於找到的節點的結束SeqNo字段,此時則一樣使鏈表頭指向找到的節點的next節點。
5. 移除新找到的頭節點以前全部的節點。適當地更新m_iLength等。
這個地方移除的操做看起來好羅嗦。必需要計算出offset值,而不是簡單的用比較符號,是由SeqNo的遞增規則決定的。但對於offset大於等於0的狀況,則可使用統一的過程來處理:找到起始SeqNo字段不大於seqno的 SeqNo值最大的那個節點,而後根據seqno與這個節點描述的範圍的4種關係,即seqno等於找到的節點的起始SeqNo值,seqno大於起始SeqNo值但小於結束SeqNo值,seqno等於結束SeqNo值,seqno大於結束SeqNo值,及找到的節點所描述的範圍的大小,來分類處理便可。
還有咱們前面在CUDT::packData()中看到的CSndLossList::getLostSeq():
int32_t CSndLossList::getLostSeq() { if (0 == m_iLength) return -1; CGuard listguard(m_ListLock); if (0 == m_iLength) return -1; if (m_iLastInsertPos == m_iHead) m_iLastInsertPos = -1; // return the first loss seq. no. int32_t seqno = m_piData1[m_iHead]; // head moves to the next node if (-1 == m_piData2[m_iHead]) { //[3, -1] becomes [], and head moves to next node in the list m_piData1[m_iHead] = -1; m_iHead = m_piNext[m_iHead]; } else { // shift to next node, e.g., [3, 7] becomes [], [4, 7] int loc = (m_iHead + 1) % m_iSize; m_piData1[loc] = CSeqNo::incseq(seqno); if (CSeqNo::seqcmp(m_piData2[m_iHead], m_piData1[loc]) > 0) m_piData2[loc] = m_piData2[m_iHead]; m_piData1[m_iHead] = -1; m_piData2[m_iHead] = -1; m_piNext[loc] = m_piNext[m_iHead]; m_iHead = loc; } m_iLength--; return seqno; }
這個函數讀取首個丟失packet的SeqNo。
這個函數會取鏈表頭節點中最小的SeqNo,返回給調用者,而後將這個SeqNo從鏈表頭節點中移除。移除的時候又能夠分爲2種狀況:1. 頭節點描述的packet範圍只包含一個SeqNo,2. 包含多個SeqNo。
對於狀況1,則將鏈表頭節點向後移動一個節點,而後移除原來的頭節點。對於狀況2,則將原來的頭節點分裂爲兩個節點,一個只包含首個丟失packet的SeqNo,另外一個包含原來頭節點中其他的SeqNo,令鏈表頭節點指向後一個節點,並移除前一個節點。
可見,經過CSndLossList::getLostSeq()返回給調用者的Packet是會被從CSndLossList中移除出去的。
最後能夠再來看一下,通過了這些操做的蹂躪以後,這個鏈表可能的樣子。好比有4段丟失的packet,其SeqNo範圍及被插入的順序爲[8, 9],[3, 5],[12,12],[15, 19],假設緩衝區大小爲20,則看起來可能爲:
UDT中,丟失packet列表大致如此。
Done。