UDT協議實現分析——數據發送控制

前文中,咱們有看到,數據發送的過程,大致是發送者CUDT將要發送的數據放進它的CSndBuffer m_pSndBuffer,並將它本身添加進它的CSndQueue m_pSndQueue的CSndUList m_pSndUList的堆裏,後面CSndQueue m_pSndQueue的worker線程會經過CSndUList::pop()從CSndUList m_pSndUList的堆頂CUDT中獲取一個要發送的包來發送,包的獲取主要是經過CUDT::packData()來完成,而這個函數正是UDT中包發送的執行中心。 node

CUDT::packData()

這裏就來看一下CUDT::packData()的定義(src/core.cpp): 數組

int CUDT::packData(CPacket& packet, uint64_t& ts) {
    int payload = 0;
    bool probe = false;

    uint64_t entertime;
    CTimer::rdtsc(entertime);

    if ((0 != m_ullTargetTime) && (entertime > m_ullTargetTime))
        m_ullTimeDiff += entertime - m_ullTargetTime;

    // Loss retransmission always has higher priority.
    if ((packet.m_iSeqNo = m_pSndLossList->getLostSeq()) >= 0) {
        // protect m_iSndLastDataAck from updating by ACK processing
        CGuard ackguard(m_AckLock);

        int offset = CSeqNo::seqoff(m_iSndLastDataAck, packet.m_iSeqNo);
        if (offset < 0)
            return 0;

        int msglen;

        payload = m_pSndBuffer->readData(&(packet.m_pcData), offset, packet.m_iMsgNo, msglen);

        if (-1 == payload) {
            int32_t seqpair[2];
            seqpair[0] = packet.m_iSeqNo;
            seqpair[1] = CSeqNo::incseq(seqpair[0], msglen);
            sendCtrl(7, &packet.m_iMsgNo, seqpair, 8);

            // only one msg drop request is necessary
            m_pSndLossList->remove(seqpair[1]);

            // skip all dropped packets
            if (CSeqNo::seqcmp(m_iSndCurrSeqNo, CSeqNo::incseq(seqpair[1])) < 0)
                m_iSndCurrSeqNo = CSeqNo::incseq(seqpair[1]);

            return 0;
        } else if (0 == payload)
            return 0;

        ++m_iTraceRetrans;
        ++m_iRetransTotal;
    } else {
        // If no loss, pack a new packet.

        // check congestion/flow window limit
        int cwnd = (m_iFlowWindowSize < (int) m_dCongestionWindow) ? m_iFlowWindowSize : (int) m_dCongestionWindow;
        if (cwnd >= CSeqNo::seqlen(m_iSndLastAck, CSeqNo::incseq(m_iSndCurrSeqNo))) {
            if (0 != (payload = m_pSndBuffer->readData(&(packet.m_pcData), packet.m_iMsgNo))) {
                m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo);
                m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);

                packet.m_iSeqNo = m_iSndCurrSeqNo;

                // every 16 (0xF) packets, a packet pair is sent
                if (0 == (packet.m_iSeqNo & 0xF))
                    probe = true;
            } else {
                m_ullTargetTime = 0;
                m_ullTimeDiff = 0;
                ts = 0;
                return 0;
            }
        } else {
            m_ullTargetTime = 0;
            m_ullTimeDiff = 0;
            ts = 0;
            return 0;
        }
    }

    packet.m_iTimeStamp = int(CTimer::getTime() - m_StartTime);
    packet.m_iID = m_PeerID;
    packet.setLength(payload);

    m_pCC->onPktSent(&packet);
    //m_pSndTimeWindow->onPktSent(packet.m_iTimeStamp);

    ++m_llTraceSent;
    ++m_llSentTotal;

    if (probe) {
        // sends out probing packet pair
        ts = entertime;
        probe = false;
    } else {
#ifndef NO_BUSY_WAITING
        ts = entertime + m_ullInterval;
#else
        if (m_ullTimeDiff >= m_ullInterval) {
            ts = entertime;
            m_ullTimeDiff -= m_ullInterval;
        } else {
            ts = entertime + m_ullInterval - m_ullTimeDiff;
            m_ullTimeDiff = 0;
        }
#endif
    }

    m_ullTargetTime = ts;

    return payload;
}

在這個函數中,處理了兩大類packet的讀取,一是丟失的packet,二是正常的順序傳輸包。來看一下這個函數具體的執行過程: 網絡

1. 讀取當前的時間entertime 數據結構

2. 更新m_ullTimeDiff。在UDT中,包發送會有一個隨着網絡情況調整的一個發送週期,也就是m_ullInterval值。在每一次發送包時,都會根據m_ullInterval值計算下一次包發送的理想時間,並記錄在m_ullTargetTime中。而m_ullTimeDiff則被用來記錄當前的此次包發送想對於理想的發送時間的延滯值,這個值會被用於計算下一次包發送的理想時間。UDT正是經過這樣的修正來儘量使的包發送週期可以保持在m_ullInterval值附近。 函數

3. 從丟失包列表m_pSndLossList中獲取一個丟失的包的SeqNo,並賦值給packet.m_iSeqNo。這個丟失包列表中的包多是來源於Timer,好比一個包超過了正常時間尚未獲得響應,也有可能來源於發送端發回的NACK消息,後面會在來研究這個問題。 ui

4. 前一步中獲取的SeqNo大於等於0,這代表存在丟失了須要重傳的包,則讀取丟失的包的內容: spa

(1). 計算丟失的包的SeqNo與SndLastDataAck的差值offset。 .net

(2). 檢查前一步計算出來的offset值,若小於0,代表發送窗口已經滑過了,則直接返回,不然繼續執行。 線程

(3). 根據前面計算的offset值,經過m_pSndBuffer->readData()把數據讀入packet中。packet的m_iMsgNo會被更新爲packet的MsgNo,msglen也會在packet過時時被更新。 設計

(4). m_pSndBuffer->readData()返回0,代表讀取的packet的數據長度爲0。這樣的packet沒有實際發送的必要,直接返回,無需進行後續的步驟。

(5). m_pSndBuffer->readData()返回-1,代表要讀取的packet已通過期,一樣沒有發送這個數據包自己的必要。

但此時會發送一個DropMsgRequest給數據接收端。

而後將過時的packet從SndLossList中移除出去。

m_iSndCurrSeqNo的值爲最近一次發送的packet的SeqNo,這裏還會在必要的時候更新m_iSndCurrSeqNo,以跳過全部被丟棄的packets。必要指的是,m_iSndCurrSeqNo的值小於等於要丟棄的這個Msg的最後一個packet的SeqNo。這也就意味着,要丟棄的這個Msg直到過時被丟棄都沒有發完。

這個地方有一點比較奇怪,簡化來看,seqpair[0] == packet.m_iSeqNoseqpair[1] == seqpair[0] + msglen,也就是說seqpair[1]的值爲要被丟棄Msg的最後一個packet的SeqNo加1,但在判斷是否要更新m_iSndCurrSeqNo時,倒是拿m_iSndCurrSeqNo(seqpair[1] + 1),也就是要被丟棄的Msg的最後一個packet的SeqNo加2在比較,而更新也是被設置爲這個值。但實際上,將m_iSndCurrSeqNo設置爲被丟棄Msg的最後一個packet的SeqNo已經能夠跳過整個Msg的發送了,由於下次要用m_iSndCurrSeqNo來得到SeqNo,會先將這個值加1的。這個地方的邏輯疑似存在bug。

返回0,向調用者代表暫時沒有數據要發送。

(6). m_pSndBuffer->readData()返回大於0的值,代表有一個丟失的包須要從新發送,則更新m_iTraceRetransm_iRetransTotal,這兩個值分別表示一次trace重發的總次數,和此UDT Socket總的重發次數,二者的區別在於前者在被讀取以後會被重置爲0(CUDT::sample()),然後者則不會。此時須要繼續執行後面的第6步。

5. 在第3步中讀取的SeqNo小於0,代表沒有丟失的packet。此時則:

(1). 根據m_iFlowWindowSize和m_dCongestionWindow的值計算cwnd發送窗口的大小。發送窗口大小取這兩個值中較小的那個,默認狀況下,前者爲8192(來自於Handshke消息的m_iFlightFlagSize字段,而m_iFlightFlagSize則根據m_iRcvBufSize和m_iFlightFlagSize得出),後者爲16(來自於CC的m_dCWndSize字段,在CUDTCC::init()中該值被初始化爲16。)。

(2). 檢查發送窗口是否已滿。若已滿,則將m_ullTargetTime和m_ullTimeDiff重置爲0,將ts置爲0,而後返回0,向調用者代表沒有數據要發送。不然繼續執行。

m_iSndLastAck的值爲下一次Ack應該確認的packet的SeqNo,CSeqNo::seqlen()計算的是包含兩個端點在內的區間的長度。此處對CSeqNo::seqlen()的調用被用來計算,下個packet發送以後,發送窗口中全部的packet的個數。

(3). 讀取下一個須要發送的packet,並檢查返回的payload值。若payload值爲0,代表數據緩衝區中全部的數據都已經發送了,無需再進行實際的發送,則將m_ullTargetTime和m_ullTimeDiff重置爲0將ts置爲0,返回0,向調用者代表沒有數據要發送。不然繼續執行。

(4). 主要是更新m_iSndCurrSeqNo,並設置packet的SeqNo字段。若是SeqNo爲16的整數倍,還會設置probe爲true。

6. 設置packet的m_iTimeStamp,m_iID,及數據長度。

7. 更新m_llTraceSent和m_llSentTotal,其中前者表示CUDT此次Trace的過程當中發送的總的packet數量,這個值會在CUDT::sample()獲取trace數據以後被重置爲0,然後者則表示發送的總共的packet數量,不會在CUDT::sample()獲取trace數據以後被重置。

8. 根據probe的值,更新ts值等。配 合CUDT::packData()的調用者CSndUList::pop()一塊兒看,可知ts是理想中該CUDT下次發送數據的時間點

probe設置爲true,就是代表,當前的這個packet被髮送結束以後當即發送下一個packet。即便probe的值不爲true,也有可能要當即發送下一個packet,好比延滯時間已經超過了理想的發生週期。存在延滯時間,但該延滯時間又沒有超出理想的發送週期的,則下個packet的發送時間具體本次packet的發送時間會小於理想的packet發送週期。

總之這裏是但願可以保持packet以接近理想的速率發送。

9. 更新m_ullTargetTime爲ts。ts這個下次發包的理想時間點還須要m_ullTargetTime進行記錄。

10. 返回payload值,也就是讀取的packet的大小。

這裏順便來看下CSeqNo的設計與實現。這個類被用來幫助進行與SeqNo有關的一些計算(src/common.h)

class CSeqNo {
 public:
    inline static int seqcmp(int32_t seq1, int32_t seq2) {
        return (abs(seq1 - seq2) < m_iSeqNoTH) ? (seq1 - seq2) : (seq2 - seq1);
    }

    inline static int seqlen(int32_t seq1, int32_t seq2) {
        return (seq1 <= seq2) ? (seq2 - seq1 + 1) : (seq2 - seq1 + m_iMaxSeqNo + 2);
    }

    inline static int seqoff(int32_t seq1, int32_t seq2) {
        if (abs(seq1 - seq2) < m_iSeqNoTH)
            return seq2 - seq1;

        if (seq1 < seq2)
            return seq2 - seq1 - m_iMaxSeqNo - 1;

        return seq2 - seq1 + m_iMaxSeqNo + 1;
    }

    inline static int32_t incseq(int32_t seq) {
        return (seq == m_iMaxSeqNo) ? 0 : seq + 1;
    }

    inline static int32_t decseq(int32_t seq) {
        return (seq == 0) ? m_iMaxSeqNo : seq - 1;
    }

    inline static int32_t incseq(int32_t seq, int32_t inc) {
        return (m_iMaxSeqNo - seq >= inc) ? seq + inc : seq - m_iMaxSeqNo + inc - 1;
    }

 public:
    static const int32_t m_iSeqNoTH;             // threshold for comparing seq. no.
    static const int32_t m_iMaxSeqNo;            // maximum sequence number used in UDT
};

鏈接發起端在執行CUDT::connect(const sockaddr* serv_addr)時,會計算一個隨機的值做爲m_iISN,也便是發送的首個數據packet的SeqNo,而在鏈接創建過程當中,這個值會被同步給Peer端的Socket。

這裏能夠看到,UDT Packet的SeqNo是[0, 0x7FFFFFFF]區間中的一個值。每發送一個packet,m_iSndCurrSeqNo都會被遞增。經過class CSeqNo能夠看到這個遞增的規則,即SeqNo超出0x7FFFFFFF時會被歸0。也正是因爲0是一個合法的SeqNo,在incseq(int32_t seq, int32_t inc)中,SeqNo超出最大值時的計算裏能看到有額外的減1,在seqlen()裏,SeqNo超出最大值時的計算裏能看到加2

由seqcmp()和seqoff()這兩個函數可見,同一時刻同時有效的兩個SeqNo seq1和seq2之間的距離不能超過m_iSeqNoTH 0x3FFFFFFF,若超過則代表必定有一個SeqNo越過了最大值0x7FFFFFFF,也即較小的那個值越過了最大值。

這裏還能夠再來看一下CSndBuffer::readData():

int CSndBuffer::readData(char** data, int32_t& msgno) {
    // No data to read
    if (m_pCurrBlock == m_pLastBlock)
        return 0;

    *data = m_pCurrBlock->m_pcData;
    int readlen = m_pCurrBlock->m_iLength;
    msgno = m_pCurrBlock->m_iMsgNo;

    m_pCurrBlock = m_pCurrBlock->m_pNext;

    return readlen;
}

int CSndBuffer::readData(char** data, const int offset, int32_t& msgno, int& msglen) {
    CGuard bufferguard(m_BufLock);

    Block* p = m_pFirstBlock;

    for (int i = 0; i < offset; ++i)
        p = p->m_pNext;

    if ((p->m_iTTL >= 0) && ((CTimer::getTime() - p->m_OriginTime) / 1000 > (uint64_t) p->m_iTTL)) {
        msgno = p->m_iMsgNo & 0x1FFFFFFF;

        msglen = 1;
        p = p->m_pNext;
        bool move = false;
        while (msgno == (p->m_iMsgNo & 0x1FFFFFFF)) {
            if (p == m_pCurrBlock)
                move = true;
            p = p->m_pNext;
            if (move)
                m_pCurrBlock = p;
            msglen++;
        }

        return -1;
    }

    *data = p->m_pcData;
    int readlen = p->m_iLength;
    msgno = p->m_iMsgNo;

    return readlen;
}

CSndBuffer::readData(char** data, int32_t& msgno)讀取當前的Block。基本上就是讀取Block,而後將指向當前Block的指針m_pCurrBlock向後移一個Block。

而CSndBuffer::readData(char** data, const int offset, int32_t& msgno, int& msglen)則是讀取距未響應的Block中最舊一塊Block offset個單位的Block。在這個函數中,首先是移動到要讀取的目標Block,若是要讀取的Block已過時,則使m_pCurrBlock跳過該packet所屬的Msg的全部Packet,而後返回-1退出。目標Block沒有過時,則讀取Block後返回數據長度。

總結在CUDT::packData()中對發送過程的控制。

丟失的包具備最高的發送優先級,這也是發送可靠性的保障方法。全部丟失的packet都會被放進SndLossList,這個List中的包可能來源於超時未獲得響應,也可能來源於消息接收端發回的NACK。

對於正常的順序packet發送的控制主要在於兩個方面,一是發送窗口的大小,也就是某個時刻已經發送但未獲得相應的packet的最大個數,這一點主要由m_dCongestionWindow和m_iFlowWindowSize來表示;二是控制兩個包發送的時間間隔,也就是包的發送速率,這一點則主要用m_ullInterval來表示。全部的發送控制機制主要經過影響這幾個變量來控制發送過程。

SndLossList

先來看一下CSndLossList這個數據結構。這個Class的定義以下(src/list.h):

class CSndLossList {
 public:
    CSndLossList(int size = 1024);
    ~CSndLossList();

    // Functionality:
    //    Insert a seq. no. into the sender loss list.
    // Parameters:
    //    0) [in] seqno1: sequence number starts.
    //    1) [in] seqno2: sequence number ends.
    // Returned value:
    //    number of packets that are not in the list previously.
    int insert(int32_t seqno1, int32_t seqno2);

    // Functionality:
    //    Remove ALL the seq. no. that are not greater than the parameter.
    // Parameters:
    //    0) [in] seqno: sequence number.
    // Returned value:
    //    None.
    void remove(int32_t seqno);

    // Functionality:
    //    Read the loss length.
    // Parameters:
    //    None.
    // Returned value:
    //    The length of the list.
    int getLossLength();

    // Functionality:
    //    Read the first (smallest) loss seq. no. in the list and remove it.
    // Parameters:
    //    None.
    // Returned value:
    //    The seq. no. or -1 if the list is empty.
    int32_t getLostSeq();

 private:
    int32_t* m_piData1;                  // sequence number starts
    int32_t* m_piData2;                  // seqnence number ends
    int* m_piNext;                       // next node in the list

    int m_iHead;                         // first node
    int m_iLength;                       // loss length
    int m_iSize;                         // size of the static array
    int m_iLastInsertPos;                // position of last insert node

    pthread_mutex_t m_ListLock;          // used to synchronize list operation

 private:
    CSndLossList(const CSndLossList&);
    CSndLossList& operator=(const CSndLossList&);
};

這是一個不可複製容器。提供的接口不是不少,配合註釋,都沒有太多難以理解的地方。這是一個用數組實現的鏈表。接着來看這個class的構造和析構(src/list.cpp):

CSndLossList::CSndLossList(int size)
        : m_piData1(NULL),
          m_piData2(NULL),
          m_piNext(NULL),
          m_iHead(-1),
          m_iLength(0),
          m_iSize(size),
          m_iLastInsertPos(-1),
          m_ListLock() {
    m_piData1 = new int32_t[m_iSize];
    m_piData2 = new int32_t[m_iSize];
    m_piNext = new int[m_iSize];

    // -1 means there is no data in the node
    for (int i = 0; i < size; ++i) {
        m_piData1[i] = -1;
        m_piData2[i] = -1;
    }

    // sender list needs mutex protection
#ifndef WIN32
    pthread_mutex_init(&m_ListLock, 0);
#else
    m_ListLock = CreateMutex(NULL, false, NULL);
#endif
}

CSndLossList::~CSndLossList() {
    delete[] m_piData1;
    delete[] m_piData2;
    delete[] m_piNext;

#ifndef WIN32
    pthread_mutex_destroy(&m_ListLock);
#else
    CloseHandle(m_ListLock);
#endif
}

CUDT::connect()中建立CSndLossList時,size值爲m_iFlowWindowSize * 2,也即8192 × 2 == 16384。若是不用數組,而用常規一點的方法來實現的話鏈表的節點定義多是這樣的

struct Node {
    int32_t m_iStart;
    int32_t m_iEnd;
    Node *m_pNext;
};

而後來看這個class最關鍵的函數之一CSndLossList::insert()的定義:

int CSndLossList::insert(int32_t seqno1, int32_t seqno2) {
    CGuard listguard(m_ListLock);

    if (0 == m_iLength) {
        // insert data into an empty list

        m_iHead = 0;
        m_piData1[m_iHead] = seqno1;
        if (seqno2 != seqno1)
            m_piData2[m_iHead] = seqno2;

        m_piNext[m_iHead] = -1;
        m_iLastInsertPos = m_iHead;

        m_iLength += CSeqNo::seqlen(seqno1, seqno2);

        return m_iLength;
    }

    // otherwise find the position where the data can be inserted
    int origlen = m_iLength;
    int offset = CSeqNo::seqoff(m_piData1[m_iHead], seqno1);
    int loc = (m_iHead + offset + m_iSize) % m_iSize;

    if (offset < 0) {
        // Insert data prior to the head pointer

        m_piData1[loc] = seqno1;
        if (seqno2 != seqno1)
            m_piData2[loc] = seqno2;

        // new node becomes head
        m_piNext[loc] = m_iHead;
        m_iHead = loc;
        m_iLastInsertPos = loc;

        m_iLength += CSeqNo::seqlen(seqno1, seqno2);
    } else if (offset > 0) {
        if (seqno1 == m_piData1[loc]) {
            m_iLastInsertPos = loc;

            // first seqno is equivlent, compare the second
            if (-1 == m_piData2[loc]) {
                if (seqno2 != seqno1) {
                    m_iLength += CSeqNo::seqlen(seqno1, seqno2) - 1;
                    m_piData2[loc] = seqno2;
                }
            } else if (CSeqNo::seqcmp(seqno2, m_piData2[loc]) > 0) {
                // new seq pair is longer than old pair, e.g., insert [3, 7] to [3, 5], becomes [3, 7]
                m_iLength += CSeqNo::seqlen(m_piData2[loc], seqno2) - 1;
                m_piData2[loc] = seqno2;
            } else
                // Do nothing if it is already there
                return 0;
        } else {
            // searching the prior node
            int i;
            if ((-1 != m_iLastInsertPos) && (CSeqNo::seqcmp(m_piData1[m_iLastInsertPos], seqno1) < 0))
                i = m_iLastInsertPos;
            else
                i = m_iHead;

            while ((-1 != m_piNext[i]) && (CSeqNo::seqcmp(m_piData1[m_piNext[i]], seqno1) < 0))
                i = m_piNext[i];

            if ((-1 == m_piData2[i]) || (CSeqNo::seqcmp(m_piData2[i], seqno1) < 0)) {
                m_iLastInsertPos = loc;

                // no overlap, create new node
                m_piData1[loc] = seqno1;
                if (seqno2 != seqno1)
                    m_piData2[loc] = seqno2;

                m_piNext[loc] = m_piNext[i];
                m_piNext[i] = loc;

                m_iLength += CSeqNo::seqlen(seqno1, seqno2);
            } else {
                m_iLastInsertPos = i;

                // overlap, coalesce with prior node, insert(3, 7) to [2, 5], ... becomes [2, 7]
                if (CSeqNo::seqcmp(m_piData2[i], seqno2) < 0) {
                    m_iLength += CSeqNo::seqlen(m_piData2[i], seqno2) - 1;
                    m_piData2[i] = seqno2;

                    loc = i;
                } else
                    return 0;
            }
        }
    } else {
        m_iLastInsertPos = m_iHead;

        // insert to head node
        if (seqno2 != seqno1) {
            if (-1 == m_piData2[loc]) {
                m_iLength += CSeqNo::seqlen(seqno1, seqno2) - 1;
                m_piData2[loc] = seqno2;
            } else if (CSeqNo::seqcmp(seqno2, m_piData2[loc]) > 0) {
                m_iLength += CSeqNo::seqlen(m_piData2[loc], seqno2) - 1;
                m_piData2[loc] = seqno2;
            } else
                return 0;
        } else
            return 0;
    }

    // coalesce with next node. E.g., [3, 7], ..., [6, 9] becomes [3, 9]
    while ((-1 != m_piNext[loc]) && (-1 != m_piData2[loc])) {
        int i = m_piNext[loc];

        if (CSeqNo::seqcmp(m_piData1[i], CSeqNo::incseq(m_piData2[loc])) <= 0) {
            // coalesce if there is overlap
            if (-1 != m_piData2[i]) {
                if (CSeqNo::seqcmp(m_piData2[i], m_piData2[loc]) > 0) {
                    if (CSeqNo::seqcmp(m_piData2[loc], m_piData1[i]) >= 0)
                        m_iLength -= CSeqNo::seqlen(m_piData1[i], m_piData2[loc]);

                    m_piData2[loc] = m_piData2[i];
                } else
                    m_iLength -= CSeqNo::seqlen(m_piData1[i], m_piData2[i]);
            } else {
                if (m_piData1[i] == CSeqNo::incseq(m_piData2[loc]))
                    m_piData2[loc] = m_piData1[i];
                else
                    m_iLength--;
            }

            m_piData1[i] = -1;
            m_piData2[i] = -1;
            m_piNext[loc] = m_piNext[i];
        } else
            break;
    }

    return m_iLength - origlen;
}

1. 這個函數首先處理了最簡單的向空鏈表中插入元素的case。

這種狀況下,m_iHead被賦予0值。m_piData1[m_iHead]會被賦值爲要插入的這段丟失packet範圍的起始SeqNo。

若是起始SeqNo和結束SeqNo的值不一樣,m_piData2[m_iHead]還會被賦值爲結束SeqNo;若是相同,則m_piData2[m_iHead]將仍然保持構造函數中初始化的-1,以表示這段丟失packet範圍只有一個元素。

m_piNext[m_iHead]被賦值爲-1以表示這是鏈表中的最後一個元素。m_iLastInsertPos用來記錄上一次插入的位置,這裏會被賦值爲m_iHead。m_iLength表示CSndLossList中記錄的丟失packet的總格數,這裏會被設置爲這段packet的長度。而後返回m_iLength

可見m_iHead指向鏈表的頭部。m_piData1,m_piData2m_piNext這三個數組中相同位置的元素共同表示一個鏈表節點,它們分別表示一個丟失packet範圍的起始SeqNo,結束SeqNo和該節點在鏈表中next節點的位置。

2. 鏈表中已經有元素了,則將m_iLength保存在origlen中。計算要插入的這段丟失packet的起始SeqNo與鏈表中原有的頭節點的起始SeqNo字段的差值offset。而後計算要插入的這段丟失packet範圍的的可能的位置loc,這個可能的位置主要由這段丟失packet範圍的起始SeqNo與鏈表中原有的頭節點的起始SeqNo字段的差值決定。

由loc的計算方法可見,數組中的空間是被循環利用的。好比要插入的節點是向CSndLossList中插入的第二個節點,則此時m_iHead仍然爲0,而要插入的這個丟失packet範圍的起始位置小於原有的頭節點的起始SeqNo字段,則新插入的節點將被繞回到數組的尾部。

3. 處理offset小於0的狀況。這表示插入的這個丟失packet範圍的起始位置小於原有的頭節點的起始SeqNo字段值,此時則會在loc位置插入一個新的節點以描述這段丟失packet範圍。更新m_iHead和m_iLastInsertPos指向新插入的這個節點。並更新m_iLength以體現新加入的這個丟失packet範圍

向單向鏈表的頭部插入元素老是比較簡單。由此咱們也看到,這個鏈表是以節點的起始SeqNo字段值的升序排列的有序鏈表。

但這個地方貌似沒有處理新插入的這個丟失packet範圍與原有頭節點表示的丟失packet範圍存在交叉的狀況?沒錯,是沒有處理,這種狀況會在處理完全部的插入狀況以後再統一來作。

4. 處理offset大於0的狀況。這又分爲兩種狀況:

(1). seqno1 == m_piData1[loc],代表新節點的目標插入位置中原有節點保存的丟失Packet範圍的起始SeqNo與要插入的這個丟失Packets範圍的起始SeqNo相同。則此時會首先更新m_iLastInsertPos爲loc,還須要處理這樣的幾種case,

case 1:原有的範圍中只有一個元素,要插入的這個範圍有多個元素。

case 2:原有的範圍中有多個元素,要插入的這個範圍有一個元素。

case 3:原有的範圍和要插入的範圍都只有一個元素。

case 4:原有的範圍和要插入的範圍中都有多個元素,但新插入的範圍徹底包含原有的範圍

case 5:原有的範圍和要插入的範圍中都有多個元素,但原有的範圍徹底包含新插入的範圍。

case 6:原有的範圍和要插入的範圍中都有多個元素,且徹底相同。

這些case包含的packet範圍的相對關係能夠用下圖來簡單表示:


代碼的具體寫法不一樣,這些case中的一些可能會以不一樣的方式被合併成一個處理,而有些case則不須要對原有的鏈表進行任何的調整。

if block中處理的是case1和case3,else-if block中處理的是case 4,else block中處理的是case 2,case 5和case 6。其中case 3,case 2,case 5和case 6都不須要對鏈表作出調整。

以此來看,在第一個if block的內部,應該再加一個else block來直接返回0會比較好一點。

那段代碼的一種等價實現形式:

if (seqno2 != seqno1) {
                if (-1 == m_piData2[loc]) {
                    m_iLength += CSeqNo::seqlen(seqno1, seqno2) - 1;
                    m_piData2[loc] = seqno2;
                } else if (CSeqNo::seqcmp(seqno2, m_piData2[loc]) > 0) {
                    // new seq pair is longer than old pair, e.g., insert [3, 7] to [3, 5], becomes [3, 7]
                    m_iLength += CSeqNo::seqlen(m_piData2[loc], seqno2) - 1;
                    m_piData2[loc] = seqno2;
                } else {
                    return 0;
                }
            } else {
                // Do nothing if it is already there
                return 0;
            }

(2). seqno1與m_piData1[loc]不相等。這其實主要有兩種可能,一是loc位置已經有了其它的節點,但該節點所表示的範圍的起始SeqNo與要插入的這個範圍的起始SeqNo不一樣;二是loc位置尚未被插入節點。但第一種可能應該是不會出現的,於是這裏實際要處理的也就是loc位置尚未節點插入的狀況。

對於這種狀況,節點的插入位置徹底不是問題,關鍵的問題是調整鏈表中一些節點的關係。能夠看到這裏的處理過程:

查找新插入節點前面的那個節點。該查找過程的開始位置由m_piData1[m_iLastInsertPos]與seqno1的相對大小決定,若是前者較小,則從m_iLastInsertPos開始,不然,從m_iHead開始。這大概主要是想要利用空間局部性原理來提升查找的效率。而後就是經過一個循環找到新插入節點前面的那個節點。

找到的這前面的節點所表示的丟失packet範圍與要插入的節點所要表示的範圍之間的關係又有這樣的幾種case:

case 1:前面的節點表示的範圍只有一個packet。

case 2:前面的節點表示的範圍含有多個packet,但它的結束SeqNo仍然小於要插入的範圍的起始SeqNo。

case 3:前面的節點表示的範圍含有多個packet,但它的結束SeqNo大於等於要插入的範圍的起始SeqNo。

前兩個case代表兩個範圍不相交,而case 3則代表兩個範圍是相交的。對於前兩個case,則在前面的那個節點以後插入一個節點,鏈表中節點的鏈接關係作適當的調整便可。對於case 3,則須要將插入的這個範圍合併入前面的那個節點。若是前面的那個節點包含的範圍徹底覆蓋了要插入的範圍,則什麼都不作,若是不是則須要對結束SeqNo字段作一些調整。

5. offset值等於0,代表要插入的這個範圍的起始SeqNo與Head節點表示的範圍的起始SeqNo相同,這個過程則與offset大於0時,seqno1 == m_piData1[loc]中的處理基本一致。

6. 從插入新節點的位置開始,合併鏈表中與新插入的這個丟失packet範圍相交,或被包含或牢牢相鄰的節點。

7. 返回新加入CSndLossList的丟失packet的總個數。

有這個函數的整個執行過程不難看出,頭節點中將包含最老的丟失packets。

看完了插入,天然不能再也不來看一下CSndLossList::remove():

void CSndLossList::remove(int32_t seqno) {
    CGuard listguard(m_ListLock);

    if (0 == m_iLength)
        return;

    // Remove all from the head pointer to a node with a larger seq. no. or the list is empty
    int offset = CSeqNo::seqoff(m_piData1[m_iHead], seqno);
    int loc = (m_iHead + offset + m_iSize) % m_iSize;

    if (0 == offset) {
        // It is the head. Remove the head and point to the next node
        loc = (loc + 1) % m_iSize;

        if (-1 == m_piData2[m_iHead])
            loc = m_piNext[m_iHead];
        else {
            m_piData1[loc] = CSeqNo::incseq(seqno);
            if (CSeqNo::seqcmp(m_piData2[m_iHead], CSeqNo::incseq(seqno)) > 0)
                m_piData2[loc] = m_piData2[m_iHead];

            m_piData2[m_iHead] = -1;

            m_piNext[loc] = m_piNext[m_iHead];
        }

        m_piData1[m_iHead] = -1;

        if (m_iLastInsertPos == m_iHead)
            m_iLastInsertPos = -1;

        m_iHead = loc;

        m_iLength--;
    } else if (offset > 0) {
        int h = m_iHead;

        if (seqno == m_piData1[loc]) {
            // target node is not empty, remove part/all of the seqno in the node.
            int temp = loc;
            loc = (loc + 1) % m_iSize;

            if (-1 == m_piData2[temp])
                m_iHead = m_piNext[temp];
            else {
                // remove part, e.g., [3, 7] becomes [], [4, 7] after remove(3)
                m_piData1[loc] = CSeqNo::incseq(seqno);
                if (CSeqNo::seqcmp(m_piData2[temp], m_piData1[loc]) > 0)
                    m_piData2[loc] = m_piData2[temp];
                m_iHead = loc;
                m_piNext[loc] = m_piNext[temp];
                m_piNext[temp] = loc;
                m_piData2[temp] = -1;
            }
        } else {
            // target node is empty, check prior node
            int i = m_iHead;
            while ((-1 != m_piNext[i]) && (CSeqNo::seqcmp(m_piData1[m_piNext[i]], seqno) < 0))
                i = m_piNext[i];

            loc = (loc + 1) % m_iSize;

            if (-1 == m_piData2[i])
                m_iHead = m_piNext[i];
            else if (CSeqNo::seqcmp(m_piData2[i], seqno) > 0) {
                // remove part/all seqno in the prior node
                m_piData1[loc] = CSeqNo::incseq(seqno);
                if (CSeqNo::seqcmp(m_piData2[i], m_piData1[loc]) > 0)
                    m_piData2[loc] = m_piData2[i];

                m_piData2[i] = seqno;

                m_piNext[loc] = m_piNext[i];
                m_piNext[i] = loc;

                m_iHead = loc;
            } else
                m_iHead = m_piNext[i];
        }

        // Remove all nodes prior to the new head
        while (h != m_iHead) {
            if (m_piData2[h] != -1) {
                m_iLength -= CSeqNo::seqlen(m_piData1[h], m_piData2[h]);
                m_piData2[h] = -1;
            } else
                m_iLength--;

            m_piData1[h] = -1;

            if (m_iLastInsertPos == h)
                m_iLastInsertPos = -1;

            h = m_piNext[h];
        }
    }
}

先來回憶下CSndLossList的類定義中,對於這個函數的語義的說明:移除全部不大於參數值的SeqNo。這個函數的主要執行過程以下:

1. 檢查m_iLength是否爲0,若爲0,說明CSndLossList尚未加入任何SeqNo,則直接返回,不然繼續執行。

2. 計算頭節點所表示的丟失packet範圍的起始SeqNo與參數seqno的offset,及可能以seqno做爲起始SeqNo字段的節點的位置loc。

根據這個函數的語義,咱們知道,實際上是不須要處理offset小於的case的。如咱們前面所瞭解的,頭節點所表示的SeqNo範圍是CSndLossList中SeqNo值最小的一個範圍,而若是seqno小於這個範圍的起始SeqNo的話,則說明不大於seqno的全部SeqNo都已經不存在了。

3. 處理offset == 0的狀況。offset == 0,代表seqno是包含於頭節點所表示的範圍,並且仍是這個範圍的起始SeqNo。此時又主要分兩種狀況來處理:

(1). 頭節點表示的這個範圍只有一個SeqNo。

(2). 頭節點表示的範圍包含多個SeqNo。

對於狀況(1),則頭節點將向後滑動一個節點,原來頭節點的存放位置會被複位。對於狀況(2),爲了保證兩個節點的相對位置等於節點所表示的丟失packet範圍的起始SeqNo的差值這樣的一種節點間關係依然成立,須要將頭節點保存的位置後移一個位置。對於狀況(2),還會再分爲兩種狀況來處理,一是原來的頭節點中只包含2個SeqNo,則在移除seqno後只剩下了一個,此時要保持m_piData2[loc]爲-1,若包含3個及以上SeqNo的,則要複製原來的頭節點的結束SeqNo到新的位置。

其它就是適當地更新m_iLastInsertPos,m_iHead和m_iLength了。這裏彷佛補一個

m_piNext[m_iHead] = -1;

要更好一點。

4. 處理offset大於0的狀況。對於這種狀況,比較麻煩的是找到seqno具體包含在哪一個節點中。處理過程大體爲:

(1). 檢查一下,seqno與m_piData1[loc]是否相等,若相等,則要找的節點已是找到了,且seqno爲目標節點表示的丟失packet範圍的起始seqno。此時則會再分爲兩種狀況來處理,一是目標節點中只包含一個SeqNo,則使鏈表頭指向目標節點的下一個節點。

二是目標節點中包含多個SeqNo,則須要將seqno排除在目標節點範圍以外新建一個節點,將新節點保存在目標節點後面相鄰的位置,若目標節點中包含2個節點,則須要設置新節點的結束SeqNo字段爲-1,若大於等於3,則複製此字段的值,而後將原來的目標節點改造爲只包含seqno的節點,並使它指向新建的這個節點。總之就是將原來的一個節點拆分紅了兩個節點,一個節點只包含seqno,另外一個則包含原來的目標節點中其他的SeqNo。

(2). seqno與m_piData1[loc]不相等的狀況,則須要先找到起始SeqNo字段小於seqno的SeqNo值最大的那個節點。這又能夠分爲3種case來處理,

case 1:找到的節點只包含一個SeqNo,此時則使鏈表頭指向找到的節點的next節點。

case 2:找到的節點包含多個SeqNo,且seqno小於找到的節點的結束SeqNo字段,此時則須要將找到的節點裂爲鏈表中的兩個節點,一個包含的範圍爲[原節點的起始SeqNo,seqno],另外一個包含的範圍爲[seqno + 1, 原節點的結束SeqNo]。同時鏈表頭應該指向後者。

case 3:找到的節點包含多個SeqNo,且seqno大於等於找到的節點的結束SeqNo字段,此時則一樣使鏈表頭指向找到的節點的next節點

5. 移除新找到的頭節點以前全部的節點。適當地更新m_iLength等。

這個地方移除的操做看起來好羅嗦。必需要計算出offset值,而不是簡單的用比較符號,是由SeqNo的遞增規則決定的。但對於offset大於等於0的狀況,則可使用統一的過程來處理:找到起始SeqNo字段不大於seqno的 SeqNo值最大的那個節點,而後根據seqno與這個節點描述的範圍的4種關係,即seqno等於找到的節點的起始SeqNo值,seqno大於起始SeqNo值但小於結束SeqNo值,seqno等於結束SeqNo值,seqno大於結束SeqNo值,及找到的節點所描述的範圍的大小,來分類處理便可。

還有咱們前面在CUDT::packData()中看到的CSndLossList::getLostSeq():

int32_t CSndLossList::getLostSeq() {
    if (0 == m_iLength)
        return -1;

    CGuard listguard(m_ListLock);

    if (0 == m_iLength)
        return -1;

    if (m_iLastInsertPos == m_iHead)
        m_iLastInsertPos = -1;

    // return the first loss seq. no.
    int32_t seqno = m_piData1[m_iHead];

    // head moves to the next node
    if (-1 == m_piData2[m_iHead]) {
        //[3, -1] becomes [], and head moves to next node in the list
        m_piData1[m_iHead] = -1;
        m_iHead = m_piNext[m_iHead];
    } else {
        // shift to next node, e.g., [3, 7] becomes [], [4, 7]
        int loc = (m_iHead + 1) % m_iSize;

        m_piData1[loc] = CSeqNo::incseq(seqno);
        if (CSeqNo::seqcmp(m_piData2[m_iHead], m_piData1[loc]) > 0)
            m_piData2[loc] = m_piData2[m_iHead];

        m_piData1[m_iHead] = -1;
        m_piData2[m_iHead] = -1;

        m_piNext[loc] = m_piNext[m_iHead];
        m_iHead = loc;
    }

    m_iLength--;

    return seqno;
}

這個函數讀取首個丟失packet的SeqNo。

這個函數會取鏈表頭節點中最小的SeqNo,返回給調用者,而後將這個SeqNo從鏈表頭節點中移除。移除的時候又能夠分爲2種狀況:1. 頭節點描述的packet範圍只包含一個SeqNo,2. 包含多個SeqNo。

對於狀況1,則將鏈表頭節點向後移動一個節點,而後移除原來的頭節點。對於狀況2,則將原來的頭節點分裂爲兩個節點,一個只包含首個丟失packet的SeqNo,另外一個包含原來頭節點中其他的SeqNo,令鏈表頭節點指向後一個節點,並移除前一個節點。

可見,經過CSndLossList::getLostSeq()返回給調用者的Packet是會被從CSndLossList中移除出去的。

最後能夠再來看一下,通過了這些操做的蹂躪以後,這個鏈表可能的樣子。好比有4段丟失的packet,其SeqNo範圍及被插入的順序爲[8, 9],[3, 5],[12,12],[15, 19],假設緩衝區大小爲20,則看起來可能爲:

UDT中,丟失packet列表大致如此。

Done。

相關文章
相關標籤/搜索