UDT exercises fine-grained control over the sending and receiving process in order to use the available network bandwidth more effectively and to push the data transfer rate as high as possible.
As we saw earlier when analyzing how data sending is controlled, for normal in-order packet transmission the sender is governed in two ways. The first is the size of the send window, that is, the maximum number of packets that may be in flight (sent but not yet acknowledged) at any moment; this is described by the congestion window size m_dCongestionWindow and the flow window size m_iFlowWindowSize, and the effective send window is the smaller of the two. The second is the interval between sending two consecutive packets, i.e. the packet sending rate, which is described by the inter-packet sending interval m_ullInterval. All of the sending-control machinery works by influencing these variables.
The send window size affects the packet sending process quite directly. In CUDT::packData(CPacket& packet, uint64_t& ts), the offset from the sequence number of the last acknowledged data packet, m_iSndLastAck, to the sequence number of the most recently sent data packet, m_iSndCurrSeqNo, is checked, and if that offset exceeds the window size no further data is sent. A minimal sketch of this check follows.
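The following is a small self-contained sketch of that check, paraphrasing the logic described above rather than quoting core.cpp verbatim; the function and parameter names are invented for the example.

    // A minimal sketch (assumed names, not verbatim UDT code) of the send-window check:
    // no new packet may be sent once the number of packets in flight reaches
    // min(flow window, congestion window).
    static bool may_send_new_packet(int flowWindowSize, double congestionWindow,
                                    int sndLastAck, int sndCurrSeqNo)
    {
        int cwnd = (flowWindowSize < (int)congestionWindow) ? flowWindowSize
                                                            : (int)congestionWindow;
        // UDT itself uses CSeqNo::seqlen() here so that sequence-number wrap-around is handled.
        int inFlight = sndCurrSeqNo - sndLastAck + 1;
        return inFlight <= cwnd;
    }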
Controlling the sending rate is slightly more involved. CUDT::packData(CPacket& packet, uint64_t& ts) computes the ideal point in time at which the next data packet should be sent, records it in m_ullTargetTime so that the send times of subsequent packets can be tracked and adjusted, and returns this value to its caller, CSndUList::pop(). When re-inserting the CUDT into the send list, CSndUList::pop() updates the timestamp field of the CUDT's CSNode m_pSNode and uses the new timestamp to place the CUDT at the appropriate position in CSndUList's heap of CUDTs. A sketch of this pacing computation follows.
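The sketch below paraphrases, with assumed parameter names, how the next send time can be derived from the inter-packet interval in the spirit of CUDT::packData(): if the sender woke up late, the accumulated debt is subtracted from the next waiting interval.

    #include <cstdint>

    // Minimal pacing sketch (assumed names, not verbatim UDT code).
    // 'interval' plays the role of m_ullInterval, 'targetTime' of m_ullTargetTime,
    // and 'timeDiff' of the accumulated lateness m_ullTimeDiff.
    static uint64_t next_send_time(uint64_t now, uint64_t targetTime,
                                   uint64_t interval, uint64_t& timeDiff)
    {
        if (targetTime != 0 && now > targetTime)
            timeDiff += now - targetTime;      // we are behind schedule by this much

        uint64_t ts;
        if (timeDiff >= interval)
        {
            ts = now;                          // already overdue: send immediately
            timeDiff -= interval;
        }
        else
        {
            ts = now + interval - timeDiff;    // otherwise wait out the remaining interval
            timeDiff = 0;
        }
        return ts;                             // packData() stores this as the new target time and returns it via ts
    }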
Let us now look more closely at how UDT adjusts the send window size and the sending rate.
For m_iFlowWindowSize, searching through the entire UDT code base shows that it is updated in essentially three places:
1. The receiver and the sender negotiate and confirm the value through HandShake messages while the connection is being established.
In CUDT::connect(const CPacket& response):
m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize;
and in CUDT::connect(const sockaddr* peer, CHandShake* hs):
    // exchange info for maximum flow window size
    m_iFlowWindowSize = hs->m_iFlightFlagSize;
The m_iFlightFlagSize carried in the connection handshake request m_ConnReq is set in CUDT::connect(const sockaddr* serv_addr):
m_ConnReq.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize) ? m_iRcvBufSize : m_iFlightFlagSize;
m_iFlightFlagSize and m_iRcvBufSize are initialized in CUDT::CUDT():
    m_iFlightFlagSize = 25600;
    m_iSndBufSize = 8192;
    m_iRcvBufSize = 8192;   //Rcv buffer MUST NOT be bigger than Flight Flag size
Both options, m_iFlightFlagSize and m_iRcvBufSize, can also be set via UDT::setsockopt() (src/core.cpp); a usage sketch follows this list:
    void CUDT::setOpt(UDTOpt optName, const void* optval, int)
    {
        if (m_bBroken || m_bClosing)
            throw CUDTException(2, 1, 0);

        CGuard cg(m_ConnectionLock);
        CGuard sendguard(m_SendLock);
        CGuard recvguard(m_RecvLock);

        switch (optName)
        {
        case UDT_FC:
            if (m_bConnecting || m_bConnected)
                throw CUDTException(5, 2, 0);
            if (*(int*) optval < 1)
                throw CUDTException(5, 3);
            // Mimimum recv flight flag size is 32 packets
            if (*(int*) optval > 32)
                m_iFlightFlagSize = *(int*) optval;
            else
                m_iFlightFlagSize = 32;
            break;

        case UDT_RCVBUF:
            if (m_bOpened)
                throw CUDTException(5, 1, 0);
            if (*(int*) optval <= 0)
                throw CUDTException(5, 3, 0);
            // Mimimum recv buffer size is 32 packets
            if (*(int*) optval > (m_iMSS - 28) * 32)
                m_iRcvBufSize = *(int*) optval / (m_iMSS - 28);
            else
                m_iRcvBufSize = 32;
            // recv buffer MUST not be greater than FC size
            if (m_iRcvBufSize > m_iFlightFlagSize)
                m_iRcvBufSize = m_iFlightFlagSize;
            break;
2. "Light" ACK messages sent by the receiver shrink m_iFlowWindowSize.
See this piece of code in CUDT::processCtrl(CPacket& ctrlpkt):
    // process a lite ACK
    if (4 == ctrlpkt.getLength())
    {
        ack = *(int32_t *) ctrlpkt.m_pcData;
        if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0)
        {
            m_iFlowWindowSize -= CSeqNo::seqoff(m_iSndLastAck, ack);
            m_iSndLastAck = ack;
        }
        break;
    }
As we saw earlier in the ACK section of the post on the reliability guarantees of UDT data transfer, a "light" ACK is normally sent only when a large burst of data packets arrives within a short period of time, which usually indicates that the sender is sending too much too fast; the "light" ACK therefore shrinks the sender's flow window m_iFlowWindowSize.
3. Regular ACK messages sent by the receiver directly set the sender's flow window size m_iFlowWindowSize.
Again in CUDT::processCtrl(CPacket& ctrlpkt):
    if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0)
    {
        // Update Flow Window Size, must update before and together with m_iSndLastAck
        m_iFlowWindowSize = *((int32_t *) ctrlpkt.m_pcData + 3);
        m_iSndLastAck = ack;
    }
The receiver, for its part, fills in this field mainly according to the available space in its own receive buffer, as in this piece of CUDT::sendCtrl():
    data[3] = m_pRcvBuffer->getAvailBufSize();
    // a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock
    if (data[3] < 2)
        data[3] = 2;
    ......
These are essentially the three places where m_iFlowWindowSize is updated.
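As referenced in item 1 above, configuring the UDT_FC and UDT_RCVBUF options goes through the public UDT::setsockopt() API. A minimal usage sketch might look like the following; the concrete values are only illustrative and error handling is omitted.

    #include <udt.h>

    // Illustrative tuning of the flight flag size and the receive buffer.
    // Both options must be set before the socket connects (see the setOpt() checks above).
    void configure_windows(UDTSOCKET u)
    {
        int fc = 25600;                 // UDT_FC: maximum number of in-flight packets
        UDT::setsockopt(u, 0, UDT_FC, &fc, sizeof(fc));

        int rcvbuf = 10000000;          // UDT_RCVBUF: receive buffer size in bytes; internally
                                        // converted to packets and capped by the flight flag size
        UDT::setsockopt(u, 0, UDT_RCVBUF, &rcvbuf, sizeof(rcvbuf));
    }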
Besides being used in CUDT::packData() to determine the send window size, the initial value of m_iFlowWindowSize also determines the size of the sender's loss list, as in CUDT::connect():
    // after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space.
    m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
    m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
as well as the congestion controller's maximum congestion window size, again in CUDT::connect():
m_pCC->setMaxCWndSize(m_iFlowWindowSize);
That covers essentially everything in UDT related to the flow window size.
As for m_ullInterval, UDT updates it in the following places:
1. During connection establishment, both CUDT::connect(const CPacket& response) and CUDT::connect(const sockaddr* peer, CHandShake* hs) compute it from the initial value of the congestion controller's m_dPktSndPeriod:
m_ullInterval = (uint64_t) (m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
UDT's default congestion controller initializes m_dPktSndPeriod to 1.0 (src/ccc.cpp):
    CCC::CCC()
        : m_iSYNInterval(CUDT::m_iSYNInterval),
          m_dPktSndPeriod(1.0),
          ......

    void CUDTCC::init()
    {
        ......
        m_dCWndSize = 16;
        m_dPktSndPeriod = 1;
    }
2. m_ullInterval is updated whenever the results computed by the congestion controller are synchronized into the CUDT. See CUDT::CCUpdate():
    void CUDT::CCUpdate()
    {
        m_ullInterval = (uint64_t) (m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
        m_dCongestionWindow = m_pCC->m_dCWndSize;

        if (m_llMaxBW <= 0)
            return;
        const double minSP = 1000000.0 / (double(m_llMaxBW) / m_iMSS) * m_ullCPUFrequency;
        if (m_ullInterval < minSP)
            m_ullInterval = minSP;
    }
Here, too, the value is computed from the congestion controller's m_dPktSndPeriod, in the same way as before. However, the actual value of m_ullInterval is bounded by the maximum bandwidth m_llMaxBW: the interval is never allowed to drop below the packet interval that corresponds to m_llMaxBW. The maximum bandwidth defaults to the invalid value -1 (CUDT::CUDT()):
m_llMaxBW = -1;
but it can be set via UDT::setsockopt() (src/core.cpp); a usage sketch follows after this list:
    case UDT_MAXBW:
        m_llMaxBW = *(int64_t*) optval;
        break;
3. A DelayWarning control message fed back by the receiver increases m_ullInterval. The handling of the DelayWarning message can be seen in CUDT::processCtrl():
    case 4: //100 - Delay Warning
        // One way packet delay is increasing, so decrease the sending rate
        m_ullInterval = (uint64_t) ceil(m_ullInterval * 1.125);
        m_iLastDecSeq = m_iSndCurrSeqNo;
        break;
And the sending of the DelayWarning in CUDT::sendCtrl():
    case 4: //100 - Congestion Warning
        ctrlpkt.pack(pkttype);
        ctrlpkt.m_iID = m_PeerID;
        m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);

        CTimer::rdtsc(m_ullLastWarningTime);
        break;
This looks straightforward enough, but in UDT version 4 the DelayWarning/CongestionWarning message is in fact no longer used. In the case that handles ACK2 in CUDT::processCtrl(), the call that would send it is commented out:
    //if increasing delay detected...
    //   sendCtrl(4);
All that is left is to reminisce about the days when the DelayWarning/CongestionWarning message actually played a role.
Seen in this light, the packet sending interval m_ullInterval appears to be determined entirely by the congestion controller's m_dPktSndPeriod.
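As mentioned in item 2 above, the UDT_MAXBW cap can be set through the public API. Below is a small sketch with purely illustrative figures, showing how the cap translates into a minimum inter-packet interval via the minSP computation in CUDT::CCUpdate().

    #include <udt.h>

    // Illustrative bandwidth cap. UDT_MAXBW is interpreted as bytes per second
    // (CCUpdate() divides it by the MSS to obtain packets per second).
    void cap_bandwidth(UDTSOCKET u)
    {
        int64_t maxbw = 12500000;   // 12.5 MB/s, i.e. roughly 100 Mbit/s
        UDT::setsockopt(u, 0, UDT_MAXBW, &maxbw, sizeof(maxbw));

        // With an MSS of 1500 bytes this is about 8333 packets per second, so
        // CCUpdate() will not let the inter-packet interval fall below roughly
        // 1000000 / 8333 = 120 microseconds (scaled into ticks by m_ullCPUFrequency).
    }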
UDT uses the congestion controller to track events that occur during data transfer, such as the arrival of an ACK or a timeout. Based on when these events occur, together with a few basic settings, it performs its computations and uses the results to control how often ACK messages are sent, the size of the send window, and the packet sending rate.
In UDT's congestion control mechanism the congestion controller is not created directly but through a congestion controller factory. Take a look at the definitions of the factory interface CCCVirtualFactory and the default factory implementation CCCFactory:
    class CCCVirtualFactory
    {
    public:
        virtual ~CCCVirtualFactory() { }

        virtual CCC* create() = 0;
        virtual CCCVirtualFactory* clone() = 0;
    };

    template<class T>
    class CCCFactory : public CCCVirtualFactory
    {
    public:
        virtual ~CCCFactory() { }

        virtual CCC* create() { return new T; }
        virtual CCCVirtualFactory* clone() { return new CCCFactory<T>; }
    };
CCCFactory is mainly used to create the default congestion controller CUDTCC (src/core.cpp):
    CUDT::CUDT()
    {
        ......
        m_pCCFactory = new CCCFactory<CUDTCC>;
To implement a custom congestion control strategy, one likewise implements the congestion controller virtual factory interface CCCVirtualFactory, has that custom factory create the custom congestion controller, and hands the factory to UDT. This can be seen in CUDT::setOpt():
    case UDT_CC:
        if (m_bConnecting || m_bConnected)
            throw CUDTException(5, 1, 0);
        if (NULL != m_pCCFactory)
            delete m_pCCFactory;
        m_pCCFactory = ((CCCVirtualFactory *) optval)->clone();
        break;
As a side note, reading the UDT_CC option back in CUDT::getOpt() returns the congestion controller itself rather than the virtual factory:
    void CUDT::getOpt(UDTOpt optName, void* optval, int& optlen)
    {
        CGuard cg(m_ConnectionLock);
        ......
        case UDT_CC:
            if (!m_bOpened)
                throw CUDTException(5, 5, 0);
            *(CCC**) optval = m_pCC;
            optlen = sizeof(CCC*);
            break;
That is essentially all there is to the congestion controller virtual factory in UDT; a small end-to-end usage sketch follows.
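To illustrate the plumbing described above, here is a minimal sketch of plugging a hypothetical custom controller, CMyCC, into a socket. The class name, the tuning it performs, and the include paths are assumptions made for the example, not part of the UDT code base.

    #include <udt.h>
    #include "ccc.h"   // CCC and CCCFactory are declared in src/ccc.h of the UDT source tree

    // Hypothetical custom controller: only init() is overridden; every other callback
    // keeps the no-op default from CCC.
    class CMyCC : public CCC
    {
    public:
        virtual void init()
        {
            m_dCWndSize = 32;       // start with a 32-packet congestion window
            m_dPktSndPeriod = 1;    // one microsecond between packets, as CUDTCC does initially
        }
    };

    void use_custom_cc(UDTSOCKET u)
    {
        // Must be done before the socket connects (setOpt() rejects UDT_CC afterwards).
        // setOpt() clones the factory, so the object passed in remains owned by the caller.
        CCCFactory<CMyCC> factory;
        UDT::setsockopt(u, 0, UDT_CC, &factory, sizeof(factory));
    }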
Now let us look at the definition of the congestion control base class CCC (src/ccc.h):
    class UDT_API CCC
    {
    friend class CUDT;

    public:
        CCC();
        virtual ~CCC();

    private:
        CCC(const CCC&);
        CCC& operator=(const CCC&) { return *this; }

    public:
        // Functionality: Callback function to be called (only) at the start of a UDT connection.
        //    Note that this is different from CCC(), which is always called.
        virtual void init() { }

        // Functionality: Callback function to be called when a UDT connection is closed.
        virtual void close() { }

        // Functionality: Callback function to be called when an ACK packet is received.
        // Parameters: 0) [in] ackno: the data sequence number acknowledged by this ACK.
        virtual void onACK(int32_t) { }

        // Functionality: Callback function to be called when a loss report is received.
        // Parameters: 0) [in] losslist: list of sequence numbers of lost packets, in the format described in packet.cpp.
        //             1) [in] size: length of the loss list.
        virtual void onLoss(const int32_t*, int) { }

        // Functionality: Callback function to be called when a timeout event occurs.
        virtual void onTimeout() { }

        // Functionality: Callback function to be called when a data packet is sent.
        // Parameters: 0) [in] seqno: the data sequence number. 1) [in] size: the payload size.
        virtual void onPktSent(const CPacket*) { }

        // Functionality: Callback function to be called when a data packet is received.
        // Parameters: 0) [in] seqno: the data sequence number. 1) [in] size: the payload size.
        virtual void onPktReceived(const CPacket*) { }

        // Functionality: Callback function to process a user defined packet.
        // Parameters: 0) [in] pkt: the user defined packet.
        virtual void processCustomMsg(const CPacket*) { }

    protected:
        // Functionality: Set periodical acknowledging and the ACK period.
        // Parameters: 0) [in] msINT: the period to send an ACK.
        void setACKTimer(int msINT);

        // Functionality: Set packet-based acknowledging and the number of packets to send an ACK.
        // Parameters: 0) [in] pktINT: the number of packets to send an ACK.
        void setACKInterval(int pktINT);

        // Functionality: Set RTO value.
        // Parameters: 0) [in] usRTO: RTO in microseconds.
        void setRTO(int usRTO);

        // Functionality: Send a user defined control packet.
        // Parameters: 0) [in] pkt: user defined packet.
        void sendCustomMsg(CPacket& pkt) const;

        // Functionality: Retrieve performance information.
        // Returned value: Pointer to a performance info structure.
        const CPerfMon* getPerfInfo();

        // Functionality: Set user defined parameters.
        // Parameters: 0) [in] param: the parameters in one buffer. 1) [in] size: the size of the buffer.
        void setUserParam(const char* param, int size);

    private:
        void setMSS(int mss);
        void setMaxCWndSize(int cwnd);
        void setBandwidth(int bw);
        void setSndCurrSeqNo(int32_t seqno);
        void setRcvRate(int rcvrate);
        void setRTT(int rtt);

    protected:
        const int32_t& m_iSYNInterval;  // UDT constant parameter, SYN
        double m_dPktSndPeriod;         // Packet sending period, in microseconds
        double m_dCWndSize;             // Congestion window size, in packets
        int m_iBandwidth;               // estimated bandwidth, packets per second
        double m_dMaxCWndSize;          // maximum cwnd size, in packets
        int m_iMSS;                     // Maximum Packet Size, including all packet headers
        int32_t m_iSndCurrSeqNo;        // current maximum seq no sent out
        int m_iRcvRate;                 // packet arrive rate at receiver side, packets per second
        int m_iRTT;                     // current estimated RTT, microsecond
        char* m_pcParam;                // user defined parameter
        int m_iPSize;                   // size of m_pcParam

    private:
        UDTSOCKET m_UDT;                // The UDT entity that this congestion control algorithm is bound to
        int m_iACKPeriod;               // Periodical timer to send an ACK, in milliseconds
        int m_iACKInterval;             // How many packets to send one ACK, in packets
        bool m_bUserDefinedRTO;         // if the RTO value is defined by users
        int m_iRTO;                     // RTO value, microseconds
        CPerfMon m_PerfInfo;            // protocol statistics information
    };
Then the implementation of CCC (src/ccc.cpp):
    CCC::CCC()
        : m_iSYNInterval(CUDT::m_iSYNInterval),
          m_dPktSndPeriod(1.0),
          m_dCWndSize(16.0),
          m_iBandwidth(),
          m_dMaxCWndSize(),
          m_iMSS(),
          m_iSndCurrSeqNo(),
          m_iRcvRate(),
          m_iRTT(),
          m_pcParam(NULL),
          m_iPSize(0),
          m_UDT(),
          m_iACKPeriod(0),
          m_iACKInterval(0),
          m_bUserDefinedRTO(false),
          m_iRTO(-1),
          m_PerfInfo()
    {
    }

    CCC::~CCC()
    {
        delete[] m_pcParam;
    }

    void CCC::setACKTimer(int msINT) { m_iACKPeriod = msINT > m_iSYNInterval ? m_iSYNInterval : msINT; }

    void CCC::setACKInterval(int pktINT) { m_iACKInterval = pktINT; }

    void CCC::setRTO(int usRTO)
    {
        m_bUserDefinedRTO = true;
        m_iRTO = usRTO;
    }

    void CCC::sendCustomMsg(CPacket& pkt) const
    {
        CUDT* u = CUDT::getUDTHandle(m_UDT);
        if (NULL != u)
        {
            pkt.m_iID = u->m_PeerID;
            u->m_pSndQueue->sendto(u->m_pPeerAddr, pkt);
        }
    }

    const CPerfMon* CCC::getPerfInfo()
    {
        try
        {
            CUDT* u = CUDT::getUDTHandle(m_UDT);
            if (NULL != u)
                u->sample(&m_PerfInfo, false);
        }
        catch (...)
        {
            return NULL;
        }
        return &m_PerfInfo;
    }

    void CCC::setMSS(int mss) { m_iMSS = mss; }

    void CCC::setBandwidth(int bw) { m_iBandwidth = bw; }

    void CCC::setSndCurrSeqNo(int32_t seqno) { m_iSndCurrSeqNo = seqno; }

    void CCC::setRcvRate(int rcvrate) { m_iRcvRate = rcvrate; }

    void CCC::setMaxCWndSize(int cwnd) { m_dMaxCWndSize = cwnd; }

    void CCC::setRTT(int rtt) { m_iRTT = rtt; }

    void CCC::setUserParam(const char* param, int size)
    {
        delete[] m_pcParam;
        m_pcParam = new char[size];
        memcpy(m_pcParam, param, size);
        m_iPSize = size;
    }
Next, the definition of UDT's default CCC, CUDTCC:
    class CUDTCC : public CCC
    {
    public:
        CUDTCC();

    public:
        virtual void init();
        virtual void onACK(int32_t);
        virtual void onLoss(const int32_t*, int);
        virtual void onTimeout();

    private:
        int m_iRCInterval;          // UDT Rate control interval
        uint64_t m_LastRCTime;      // last rate increase time
        bool m_bSlowStart;          // if in slow start phase
        int32_t m_iLastAck;         // last ACKed seq no
        bool m_bLoss;               // if loss happened since last rate increase
        int32_t m_iLastDecSeq;      // max pkt seq no sent out when last decrease happened
        double m_dLastDecPeriod;    // value of pktsndperiod when last decrease happened
        int m_iNAKCount;            // NAK counter
        int m_iDecRandom;           // random threshold on decrease by number of loss events
        int m_iAvgNAKNum;           // average number of NAKs per congestion
        int m_iDecCount;            // number of decreases in a congestion epoch
    };
And the implementation of CUDTCC:
    CUDTCC::CUDTCC()
        : m_iRCInterval(),
          m_LastRCTime(),
          m_bSlowStart(),
          m_iLastAck(),
          m_bLoss(),
          m_iLastDecSeq(),
          m_dLastDecPeriod(),
          m_iNAKCount(),
          m_iDecRandom(),
          m_iAvgNAKNum(),
          m_iDecCount()
    {
    }

    void CUDTCC::init()
    {
        m_iRCInterval = m_iSYNInterval;
        m_LastRCTime = CTimer::getTime();
        setACKTimer(m_iRCInterval);

        m_bSlowStart = true;
        m_iLastAck = m_iSndCurrSeqNo;
        m_bLoss = false;
        m_iLastDecSeq = CSeqNo::decseq(m_iLastAck);
        m_dLastDecPeriod = 1;
        m_iAvgNAKNum = 0;
        m_iNAKCount = 0;
        m_iDecRandom = 1;

        m_dCWndSize = 16;
        m_dPktSndPeriod = 1;
    }

    void CUDTCC::onACK(int32_t ack)
    {
        int64_t B = 0;
        double inc = 0;
        // Note: 1/24/2012
        // The minimum increase parameter is increased from "1.0 / m_iMSS" to 0.01
        // because the original was too small and caused sending rate to stay at low level
        // for long time.
        const double min_inc = 0.01;

        uint64_t currtime = CTimer::getTime();
        if (currtime - m_LastRCTime < (uint64_t) m_iRCInterval)
            return;

        m_LastRCTime = currtime;

        if (m_bSlowStart)
        {
            m_dCWndSize += CSeqNo::seqlen(m_iLastAck, ack);
            m_iLastAck = ack;

            if (m_dCWndSize > m_dMaxCWndSize)
            {
                m_bSlowStart = false;
                if (m_iRcvRate > 0)
                    m_dPktSndPeriod = 1000000.0 / m_iRcvRate;
                else
                    m_dPktSndPeriod = (m_iRTT + m_iRCInterval) / m_dCWndSize;
            }
        }
        else
            m_dCWndSize = m_iRcvRate / 1000000.0 * (m_iRTT + m_iRCInterval) + 16;

        // During Slow Start, no rate increase
        if (m_bSlowStart)
            return;

        if (m_bLoss)
        {
            m_bLoss = false;
            return;
        }

        B = (int64_t) (m_iBandwidth - 1000000.0 / m_dPktSndPeriod);
        if ((m_dPktSndPeriod > m_dLastDecPeriod) && ((m_iBandwidth / 9) < B))
            B = m_iBandwidth / 9;
        if (B <= 0)
            inc = min_inc;
        else
        {
            // inc = max(10 ^ ceil(log10( B * MSS * 8 ) * Beta / MSS, 1/MSS)
            // Beta = 1.5 * 10^(-6)
            inc = pow(10.0, ceil(log10(B * m_iMSS * 8.0))) * 0.0000015 / m_iMSS;
            if (inc < min_inc)
                inc = min_inc;
        }

        m_dPktSndPeriod = (m_dPktSndPeriod * m_iRCInterval) / (m_dPktSndPeriod * inc + m_iRCInterval);
    }

    void CUDTCC::onLoss(const int32_t* losslist, int)
    {
        // Slow Start stopped, if it hasn't yet
        if (m_bSlowStart)
        {
            m_bSlowStart = false;
            if (m_iRcvRate > 0)
            {
                // Set the sending rate to the receiving rate.
                m_dPktSndPeriod = 1000000.0 / m_iRcvRate;
                return;
            }
            // If no receiving rate is observed, we have to compute the sending
            // rate according to the current window size, and decrease it
            // using the method below.
            m_dPktSndPeriod = m_dCWndSize / (m_iRTT + m_iRCInterval);
        }

        m_bLoss = true;

        if (CSeqNo::seqcmp(losslist[0] & 0x7FFFFFFF, m_iLastDecSeq) > 0)
        {
            m_dLastDecPeriod = m_dPktSndPeriod;
            m_dPktSndPeriod = ceil(m_dPktSndPeriod * 1.125);

            m_iAvgNAKNum = (int) ceil(m_iAvgNAKNum * 0.875 + m_iNAKCount * 0.125);
            m_iNAKCount = 1;
            m_iDecCount = 1;

            m_iLastDecSeq = m_iSndCurrSeqNo;

            // remove global synchronization using randomization
            srand(m_iLastDecSeq);
            m_iDecRandom = (int) ceil(m_iAvgNAKNum * (double(rand()) / RAND_MAX));
            if (m_iDecRandom < 1)
                m_iDecRandom = 1;
        }
        else if ((m_iDecCount++ < 5) && (0 == (++m_iNAKCount % m_iDecRandom)))
        {
            // 0.875^5 = 0.51, rate should not be decreased by more than half within a congestion period
            m_dPktSndPeriod = ceil(m_dPktSndPeriod * 1.125);
            m_iLastDecSeq = m_iSndCurrSeqNo;
        }
    }

    void CUDTCC::onTimeout()
    {
        if (m_bSlowStart)
        {
            m_bSlowStart = false;
            if (m_iRcvRate > 0)
                m_dPktSndPeriod = 1000000.0 / m_iRcvRate;
            else
                m_dPktSndPeriod = m_dCWndSize / (m_iRTT + m_iRCInterval);
        }
        else
        {
            /*
            m_dLastDecPeriod = m_dPktSndPeriod;
            m_dPktSndPeriod = ceil(m_dPktSndPeriod * 2);
            m_iLastDecSeq = m_iLastAck;
            */
        }
    }
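For orientation, the rate adjustment implemented by onACK() and onLoss() above can be written compactly as follows (this simply restates the code; $B$ is the estimated spare bandwidth in packets per second and $SYN$ is the rate-control interval m_iRCInterval). Outside slow start, each rate-control period the sending period shrinks according to

\[
\mathrm{inc} = \max\!\left(\frac{10^{\lceil \log_{10}(B \cdot MSS \cdot 8) \rceil} \cdot 1.5\times 10^{-6}}{MSS},\ 0.01\right),
\qquad
\mathrm{SndPeriod} \leftarrow \frac{\mathrm{SndPeriod}\cdot SYN}{\mathrm{SndPeriod}\cdot \mathrm{inc} + SYN}
\]

while each qualifying loss event multiplies the period (i.e. divides the rate) by 1.125:

\[
\mathrm{SndPeriod} \leftarrow \lceil 1.125 \cdot \mathrm{SndPeriod} \rceil
\]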
The finer details of the algorithm will not be dissected further here.
The congestion controller behaves like a processing plant: it takes in events, does its own processing, and outputs a few values that steer the sending and receiving of data. Its main outputs are:
1. The packet sending period m_dPktSndPeriod, which controls how frequently data packets are sent.
2. The congestion window size m_dCWndSize, which bounds the send window.
3. The ACK period m_iACKPeriod and ACK interval m_iACKInterval, which control how often ACK packets are sent.
4. m_bUserDefinedRTO and m_iRTO, which control the retransmission timeout.
As we saw earlier, m_dPktSndPeriod and m_dCWndSize are synchronized into the CUDT in CUDT::CCUpdate().
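To tie the four kinds of output together, here is a hypothetical controller, invented purely for illustration, that sets each of them in init(); the numbers are arbitrary and it assumes the CCC declaration from src/ccc.h shown above.

    // Hypothetical controller illustrating the four outputs listed above.
    class CFixedRateCC : public CCC
    {
    public:
        virtual void init()
        {
            m_dPktSndPeriod = 100;          // 1. one packet every 100 microseconds
            m_dCWndSize = 1000;             // 2. allow up to 1000 packets in flight
            setACKTimer(m_iSYNInterval);    // 3. request a periodic ACK once per SYN interval...
            setACKInterval(64);             //    ...or after every 64 received packets
            setRTO(500000);                 // 4. user-defined RTO of 500 ms (in microseconds)
        }
    };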
Done.