UDT協議實現分析——數據的接收

看了UDT中數據發送的部分以後,咱們轉換一個角度,來看一下接收端發生的故事。 ios

如咱們前面在 UDT協議實現分析——鏈接的創建 一文中看到的那樣,CUDT在connect()的後半場,會經過調用CRcvQueue::removeConnector()把它本身從它的CChannel的接收隊列CRcvQueue的m_pRendezvousQueue隊列中移除出去,以表示鏈接已成功創建,後面再也不經過m_pRendezvousQueue接收鏈接相關消息,並經過調用CRcvQueue::setNewEntry()及CRcvQueue::worker()的機制,將本身加進本身的CChannel的接收隊列CRcvQueue的m_pRcvUList和m_pHash中,以便於可以經過m_pRcvUList和m_pHash接收後續發送給本身的控制消息及數據消息。 數組

CUnitQueue

如在前面CRcvQueue::worker()函數的定義中所看到的那樣,一次接收過程大致爲,從CUnitQueue m_UnitQueue中獲取一個可用的CUnit unit,將接收到的數據/控制消息保存進unit,而後將根據消息的具體狀況,將消息dispatch不一樣的接收者。 函數

這裏就先來看一下CUnitQueue。先是這個類的定義(src/queue.h): ui

struct CUnit {
    CPacket m_Packet;		// packet
    int m_iFlag;			// 0: free, 1: occupied, 2: msg read but not freed (out-of-order), 3: msg dropped
};

class CUnitQueue {
    friend class CRcvQueue;
    friend class CRcvBuffer;

 public:
    CUnitQueue();
    ~CUnitQueue();

 public:

    // Functionality:
    //    Initialize the unit queue.
    // Parameters:
    //    1) [in] size: queue size
    //    2) [in] mss: maximum segament size
    //    3) [in] version: IP version
    // Returned value:
    //    0: success, -1: failure.

    int init(int size, int mss, int version);

    // Functionality:
    //    Increase (double) the unit queue size.
    // Parameters:
    //    None.
    // Returned value:
    //    0: success, -1: failure.

    int increase();

    // Functionality:
    //    Decrease (halve) the unit queue size.
    // Parameters:
    //    None.
    // Returned value:
    //    0: success, -1: failure.

    int shrink();

    // Functionality:
    //    find an available unit for incoming packet.
    // Parameters:
    //    None.
    // Returned value:
    //    Pointer to the available unit, NULL if not found.

    CUnit* getNextAvailUnit();

 private:
    struct CQEntry {
        CUnit* m_pUnit;		// unit queue
        char* m_pBuffer;		// data buffer
        int m_iSize;		// size of each queue

        CQEntry* m_pNext;
    }*m_pQEntry,			// pointer to the first unit queue
            *m_pCurrQueue,		// pointer to the current available queue
            *m_pLastQueue;		// pointer to the last unit queue

    CUnit* m_pAvailUnit;         // recent available unit

    int m_iSize;			// total size of the unit queue, in number of packets
    int m_iCount;		// total number of valid packets in the queue

    int m_iMSS;			// unit buffer size
    int m_iIPversion;		// IP version

 private:
    CUnitQueue(const CUnitQueue&);
    CUnitQueue& operator=(const CUnitQueue&);
};

一樣是一個不可複製的容器,CUnit的容器。但到底是什麼類型的容器,則還須要看一下成員函數的具體定義。先來看一下構造函數,析構函數及初始化init()函數(src/queue.cpp): this

CUnitQueue::CUnitQueue()
        : m_pQEntry(NULL),
          m_pCurrQueue(NULL),
          m_pLastQueue(NULL),
          m_iSize(0),
          m_iCount(0),
          m_iMSS(),
          m_iIPversion() {
}

CUnitQueue::~CUnitQueue() {
    CQEntry* p = m_pQEntry;

    while (p != NULL) {
        delete[] p->m_pUnit;
        delete[] p->m_pBuffer;

        CQEntry* q = p;
        if (p == m_pLastQueue)
            p = NULL;
        else
            p = p->m_pNext;
        delete q;
    }
}

int CUnitQueue::init(int size, int mss, int version) {
    CQEntry* tempq = NULL;
    CUnit* tempu = NULL;
    char* tempb = NULL;

    try {
        tempq = new CQEntry;
        tempu = new CUnit[size];
        tempb = new char[size * mss];
    } catch (...) {
        delete tempq;
        delete[] tempu;
        delete[] tempb;

        return -1;
    }

    for (int i = 0; i < size; ++i) {
        tempu[i].m_iFlag = 0;
        tempu[i].m_Packet.m_pcData = tempb + i * mss;
    }
    tempq->m_pUnit = tempu;
    tempq->m_pBuffer = tempb;
    tempq->m_iSize = size;

    m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq;
    m_pQEntry->m_pNext = m_pQEntry;

    m_pAvailUnit = m_pCurrQueue->m_pUnit;

    m_iSize = size;
    m_iMSS = mss;
    m_iIPversion = version;

    return 0;
}

構造函數沒有什麼特別須要注意的地方。來看CUnitQueue::init()的定義,它執行了這樣一些步驟: spa

1. 先是分配了一個CQEntry對象tempq,一個長度爲size的CUnit數組tempu,和長度爲(size * mss)的一段內存緩衝區tempb。 .net

2. 初始化CUnit數組tempu中的每一個元素,設置每一個元素的m_iFlag爲0,每一個元素的CPacket類型字段m_Packet的數據區指針m_pcData指向內存緩衝區tempb的適當位置,即第0個元素的CPacket類型字段m_Packet的數據區指針m_pcData指向tempb的起始位置,後面的元素的CPacket類型字段m_Packet的數據區指針m_pcData依次指向它前面的元素所指位置以後的mss字節處。 線程

3. 設置tempq的m_pUnit字段指向tempu,tempq的m_pBuffer字段指向tempb,tempq的m_iSize字段爲size。將tempq的值賦給m_pQEntry,m_pCurrQueue和m_pLastQueue。令m_pQEntry->m_pNext爲m_pQEntry,令m_pAvailUnit爲m_pCurrQueue->m_pUnit,令m_iSize,m_iMSSm_iIPversion分別爲傳入的參數size,mss,和version。 設計

UDT中執行CUnitQueue::init()時實際傳入的參數爲傳遞給CRcvQueue::init()的參數qsize, payload, version,這些參數的實際值分別爲32,s->m_pUDT->m_iPayloadSize和m.m_iIPversion。
指針

而後返回0。

基本上能夠認爲,在CUnitQueue::init()中基本上就是構造了一個有效的CQEntry。

由此不難看到CUnitQueue中主要是一個CQEntry的環形鏈表。CQEntry用於管理一段用於存放接收到的數據的內存緩衝區m_pBuffer及CUnit結構m_pUnit,前者用於實際存放接收到的消息的數據部分,後者則用於將接收到的內容組織爲CPacket。

(CUnitQueue + CQEntry + CUnit)這樣的結構與(CSndBuffer + Buffer + Block)結構仍是蠻類似的。

接着來看咱們的老朋友CUnitQueue::getNextAvailUnit(),CRcvQueue::worker()正是經過調用它得到一個CUnit,用於存放接收到的消息:

CUnit* CUnitQueue::getNextAvailUnit() {
    if (m_iCount * 10 > m_iSize * 9)
        increase();

    if (m_iCount >= m_iSize)
        return NULL;

    CQEntry* entrance = m_pCurrQueue;

    do {
        for (CUnit* sentinel = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize - 1; m_pAvailUnit != sentinel;
                ++m_pAvailUnit)
            if (m_pAvailUnit->m_iFlag == 0)
                return m_pAvailUnit;

        if (m_pCurrQueue->m_pUnit->m_iFlag == 0) {
            m_pAvailUnit = m_pCurrQueue->m_pUnit;
            return m_pAvailUnit;
        }

        m_pCurrQueue = m_pCurrQueue->m_pNext;
        m_pAvailUnit = m_pCurrQueue->m_pUnit;
    } while (m_pCurrQueue != entrance);

    increase();

    return NULL;
}

這函數的執行過程爲:

1. m_iCount爲CUnitQueue中已經被用到的CUnit個數,m_iSize爲CUnitQueue中包含的CUnit總數。這裏會在CUnitQueue中CUnit的使用率超過90%時,進行容量的擴充操做increase()。

2. 再次檢查CUnitQueue中已經用到的CUnit個數是否超出CUnit的總個數,若超出則直接返回。既然有了前面第1步的保證,這裏的檢查是否還有必要呢?後面就會明白。

3. 經過一個兩層循環,來查找一個可用的CUnit。

CUnit的m_iFlag用來標記它是否已經被佔用,若m_iFlag值爲0,表示尚未被佔用,若爲1則表示已經被佔用。

外層循環用於遍歷CQEntry的循環鏈表,自m_pCurrQueue始,至m_pCurrQueue終。

內層循環則遍歷一個CQEntry中的全部CUnit,若找到可用的CUnit,會直接返回給調用者。這裏會在遍歷結束找不到可用CUnit的狀況下,再次檢查m_pCurrQueue->m_pUnit,也就是一個CQEntry的首個CUnit的狀況,讓人看上去也是有點奇怪。

4. 上一步中沒有找到可用的CUnit,這裏會再次執行increase()以擴充容量,並向調用這返回NULL。

接着再來看下用於擴充容量的increase()函數:

int CUnitQueue::increase() {
    // adjust/correct m_iCount
    int real_count = 0;
    CQEntry* p = m_pQEntry;
    while (p != NULL) {
        CUnit* u = p->m_pUnit;
        for (CUnit* end = u + p->m_iSize; u != end; ++u)
            if (u->m_iFlag != 0)
                ++real_count;

        if (p == m_pLastQueue)
            p = NULL;
        else
            p = p->m_pNext;
    }
    m_iCount = real_count;
    if (double(m_iCount) / m_iSize < 0.9)
        return -1;

    CQEntry* tempq = NULL;
    CUnit* tempu = NULL;
    char* tempb = NULL;

    // all queues have the same size
    int size = m_pQEntry->m_iSize;

    try {
        tempq = new CQEntry;
        tempu = new CUnit[size];
        tempb = new char[size * m_iMSS];
    } catch (...) {
        delete tempq;
        delete[] tempu;
        delete[] tempb;

        return -1;
    }

    for (int i = 0; i < size; ++i) {
        tempu[i].m_iFlag = 0;
        tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS;
    }
    tempq->m_pUnit = tempu;
    tempq->m_pBuffer = tempb;
    tempq->m_iSize = size;

    m_pLastQueue->m_pNext = tempq;
    m_pLastQueue = tempq;
    m_pLastQueue->m_pNext = m_pQEntry;

    m_iSize += size;

    return 0;
}

int CUnitQueue::shrink() {
    // currently queue cannot be shrunk.
    return -1;
}

這個函數的執行過程以下:

1. 計算真正的已被佔用的CUnit的個數real_count。這個計算過程所涵蓋的CQEntry自m_pQEntry始,至m_pLastQueue(包含m_pLastQueue)。

2. 將計算所得的real_count賦值給m_iCount,並基於計算所得值,再次檢查CUnit的使用率,若使用率依然低於90%,則直接返回。不然則繼續執行真正的容量擴充的動做。

3. 在原有的CQEntry環形鏈表的基礎上,新建立一個CQEntry,這包括建立一個CQEntry對象,建立CUnit的數組,建立數據緩衝區,初始化CUnit數組及正確的設置CQEntry的各字段。

這個過程與CUnitQueue::init()中建立首個有效的CQEntry對象的過程基本一致。又是大段的重複code唉。

4. 將新建立的CQEntry對象插入CQEntry對象的環形鏈表中,在m_pLastQueue以後,m_pQEntry以前。因而可知,m_pQEntry用於記錄CQEntry對象的環形鏈表的頭節點,而m_pLastQueue用於記錄CQEntry對象的環形鏈表的尾節點

5. 更新m_iSize的值並返回0。

這就是CUnitQueue類自己提供的全部東西了。奇怪的是,咱們只能看到從CUnitQueue中獲取CUnit的代碼,而看不到想CUnitQueue中還回CUnit的代碼。主要的祕密就在於CUnitQueue有將CRcvQueue和CRcvBuffer聲明爲它的friend class,這就容許這個類能夠自由地修改CUnitQueue的成員。這種嚴重破壞封裝的設計,還真是讓人頭大。

數據接收

接着回到咱們的數據接收過程。創建鏈接以後,雙方傳遞的消息中都會包含有效Target SocketID字段,其CUDT會存在於CRcvQueue的m_pHash中,於是CRcvQueue::worker()主要是經過下面的這段code來將消息dispatch給接收數據的UDT Socket:

} else if (id > 0) {
            if (NULL != (u = self->m_pHash->lookup(id))) {
                if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion)) {
                    if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) {
                        if (0 == unit->m_Packet.getFlag())
                            u->processData(unit);
                        else
                            u->processCtrl(unit->m_Packet);

                        u->checkTimers();
                        self->m_pRcvUList->update(u);
                    }
                }
            } else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id))) {
能夠看到,在這裏會先找到目標CUDT,而後在CUDT依然處於有效的鏈接狀態時,根據接收到的Packet的類型,將Packet dispatch給CUDT::processData()或 CUDT::processCtrl(),這裏研究數據packet的接收,於是主要來看CUDT::processData()的定義(src/core.cpp):

int CUDT::processData(CUnit* unit) {
    CPacket& packet = unit->m_Packet;

    // Just heard from the peer, reset the expiration count.
    m_iEXPCount = 1;
    uint64_t currtime;
    CTimer::rdtsc(currtime);
    m_ullLastRspTime = currtime;

    m_pCC->onPktReceived(&packet);
    ++m_iPktCount;
    // update time information
    m_pRcvTimeWindow->onPktArrival();

    // check if it is probing packet pair
    if (0 == (packet.m_iSeqNo & 0xF))
        m_pRcvTimeWindow->probe1Arrival();
    else if (1 == (packet.m_iSeqNo & 0xF))
        m_pRcvTimeWindow->probe2Arrival();

    ++m_llTraceRecv;
    ++m_llRecvTotal;

    int32_t offset = CSeqNo::seqoff(m_iRcvLastAck, packet.m_iSeqNo);
    if ((offset < 0) || (offset >= m_pRcvBuffer->getAvailBufSize()))
        return -1;

    if (m_pRcvBuffer->addData(unit, offset) < 0)
        return -1;

    // Loss detection.
    if (CSeqNo::seqcmp(packet.m_iSeqNo, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0) {
        // If loss found, insert them to the receiver loss list
        m_pRcvLossList->insert(CSeqNo::incseq(m_iRcvCurrSeqNo), CSeqNo::decseq(packet.m_iSeqNo));

        // pack loss list for NAK
        int32_t lossdata[2];
        lossdata[0] = CSeqNo::incseq(m_iRcvCurrSeqNo) | 0x80000000;
        lossdata[1] = CSeqNo::decseq(packet.m_iSeqNo);

        // Generate loss report immediately.
        sendCtrl(3, NULL, lossdata, (CSeqNo::incseq(m_iRcvCurrSeqNo) == CSeqNo::decseq(packet.m_iSeqNo)) ? 1 : 2);

        int loss = CSeqNo::seqlen(m_iRcvCurrSeqNo, packet.m_iSeqNo) - 2;
        m_iTraceRcvLoss += loss;
        m_iRcvLossTotal += loss;
    }

    // This is not a regular fixed size packet...
    //an irregular sized packet usually indicates the end of a message, so send an ACK immediately
    if (packet.getLength() != m_iPayloadSize)
        CTimer::rdtsc(m_ullNextACKTime);

    // Update the current largest sequence number that has been received.
    // Or it is a retransmitted packet, remove it from receiver loss list.
    if (CSeqNo::seqcmp(packet.m_iSeqNo, m_iRcvCurrSeqNo) > 0)
        m_iRcvCurrSeqNo = packet.m_iSeqNo;
    else
        m_pRcvLossList->remove(packet.m_iSeqNo);

    return 0;
}

這個函數主要執行了這樣一個過程:

1. 將m_iEXPCount置爲1。m_iEXPCount具體的含義會在後面研究UDT的定時器時再來詳細地分析。

2. 獲取當前的時間,並賦值給m_ullLastRspTime。m_ullLastRspTime具體的含義會在後面研究UDT的定時器時再來詳細地分析。

3. 執行擁塞控制器m_pCC的回調,onPktReceived(&packet)。後面研究擁塞控制時會再來分析這個回調。

4. 遞增m_iPktCount。

5. 更新時間信息,執行接收時間窗口的m_pRcvTimeWindow->onPktArrival()。

6. 檢查是不是probing packet對,並根據具體狀況分別執行m_pRcvTimeWindow->probe1Arrival()或m_pRcvTimeWindow->probe2Arrival()。

7. 遞增m_llTraceRecv和m_llRecvTotal。用於進行統計的字段,前者表示一次trace,接收到的全部數據packet的個數,後者則表示自UDT Socket鏈接成功起,接收到的總的packet個數。二者的區別在於,前者會在執行CUDT::sample()獲取trace信息以後被複位,後者則不會。

8. 計算接收到的packet的SeqNo與m_iRcvLastAck的偏移offset。m_iRcvLastAck表示上一次發送的ACK確認的SeqNo,SeqNo值小於m_iRcvLastAck的全部packet都已經被接收或者再也不須要被接收,同時m_iRcvLastAck也是處於接收窗口中的SeqNo最小的packet。在鏈接創建的過程當中,雙方會進行協商來肯定m_iRcvLastAck,會被置爲雙方約定的首個數據packet的SeqNo。

9. 檢查offset值。若offset小於0,代表接收窗口已經滑過了接收到的這個packet,若大於接收緩衝區m_pRcvBuffer的可用大小,代表接收窗口尚未滑至包含接收到的packet的範圍。對於這兩種狀況,則直接返回-1退出,不然繼續執行。

10. 將unit添加進接收緩衝區。若失敗則返回-1退出,不然繼續執行。

11. 比較接收到的packet的SeqNo與CSeqNo::incseq(m_iRcvCurrSeqNo)。m_iRcvCurrSeqNo表示接收到的最大的Sequence number,在鏈接創建過程當中,雙方會協商肯定具體值,該值爲(雙方約定的首個數據packet的SeqNo - 1)。若packet.m_iSeqNo大於CSeqNo::incseq(m_iRcvCurrSeqNo),代表packet沒有連續到達,或可能有packet丟失。此時則會將[CSeqNo::incseq(m_iRcvCurrSeqNo),CSeqNo::decseq(packet.m_iSeqNo)]區間添加進接收丟失包列表m_pRcvLossList。並向發送端發送一個NACK,即Loss Report。

而後計算丟失的packet的個數,並更新m_iTraceRcvLoss和m_iRcvLossTotal,m_iTraceRcvLoss和m_iRcvLossTotal也是用來作統計的,用來統計packet丟失的數量。

12. 若是接收到的packet的數據長度不爲m_iPayloadSize,一般意味着這是一個Msg的結束packet,則應該向發送端發送一個ACK消息,因而將當前時間讀進m_ullNextACKTime。

可見在UDT中,對ACK消息的發送都不是在接收到數據包以後當即直接來進行的,而主要是在定時器Timer中進行的。

13. 比較packet.m_iSeqNo與m_iRcvCurrSeqNo,若前者較大,則會將後者更新爲前者的值,不然,多是接收到了一個本來丟失的packet,須要將該packet從丟失接收packet列表m_pRcvLossList中移除出去。

14. 返回0。

總結一下,在CRcvQueue::worker()這一層接收到數據包的dispatch過程,主要是將數據包保存在CUDT的接收緩衝區中。若是發現有疑似丟包現象,則將丟失的packet區間保存在接收包丟失列表中,並當即發送NACK消息給發送端,若是收到的packet爲Msg的最後一個packet,則調度發送ACK消息。

接收緩衝區CRcvBuffer

這裏再來看一下接收緩衝區CRcvBuffer。這個class的定義爲(src/buffer.h):

class CRcvBuffer {
 public:
    CRcvBuffer(CUnitQueue* queue, int bufsize = 65536);
    ~CRcvBuffer();

    // Functionality:
    //    Write data into the buffer.
    // Parameters:
    //    0) [in] unit: pointer to a data unit containing new packet
    //    1) [in] offset: offset from last ACK point.
    // Returned value:
    //    0 is success, -1 if data is repeated.
    int addData(CUnit* unit, int offset);

    // Functionality:
    //    Read data into a user buffer.
    // Parameters:
    //    0) [in] data: pointer to user buffer.
    //    1) [in] len: length of user buffer.
    // Returned value:
    //    size of data read.
    int readBuffer(char* data, int len);

    // Functionality:
    //    Read data directly into file.
    // Parameters:
    //    0) [in] file: C++ file stream.
    //    1) [in] len: expected length of data to write into the file.
    // Returned value:
    //    size of data read.
    int readBufferToFile(std::fstream& ofs, int len);

    // Functionality:
    //    Update the ACK point of the buffer.
    // Parameters:
    //    0) [in] len: size of data to be acknowledged.
    // Returned value:
    //    1 if a user buffer is fulfilled, otherwise 0.
    void ackData(int len);

    // Functionality:
    //    Query how many buffer space left for data receiving.
    // Parameters:
    //    None.
    // Returned value:
    //    size of available buffer space (including user buffer) for data receiving.
    int getAvailBufSize() const;

    // Functionality:
    //    Query how many data has been continuously received (for reading).
    // Parameters:
    //    None.
    // Returned value:
    //    size of valid (continous) data for reading.
    int getRcvDataSize() const;

    // Functionality:
    //    mark the message to be dropped from the message list.
    // Parameters:
    //    0) [in] msgno: message nuumer.
    // Returned value:
    //    None.
    void dropMsg(int32_t msgno);

    // Functionality:
    //    read a message.
    // Parameters:
    //    0) [out] data: buffer to write the message into.
    //    1) [in] len: size of the buffer.
    // Returned value:
    //    actuall size of data read.
    int readMsg(char* data, int len);

    // Functionality:
    //    Query how many messages are available now.
    // Parameters:
    //    None.
    // Returned value:
    //    number of messages available for recvmsg.
    int getRcvMsgNum();

 private:
    bool scanMsg(int& start, int& end, bool& passack);

 private:
    CUnit** m_pUnit;                     // pointer to the protocol buffer
    int m_iSize;                         // size of the protocol buffer
    CUnitQueue* m_pUnitQueue;		// the shared unit queue

    int m_iStartPos;                     // the head position for I/O (inclusive)
    int m_iLastAckPos;                   // the last ACKed position (exclusive)
    // EMPTY: m_iStartPos = m_iLastAckPos   FULL: m_iStartPos = m_iLastAckPos + 1
    int m_iMaxPos;			// the furthest data position

    int m_iNotch;			// the starting read point of the first unit

 private:
    CRcvBuffer();
    CRcvBuffer(const CRcvBuffer&);
    CRcvBuffer& operator=(const CRcvBuffer&);
};

僅僅由類的定義,實在是難以看出太多的信息。

構造與析構

接着來看一下成員函數的實現,首先是構造函數和析構函數(src/buffer.cpp):

CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize)
        : m_pUnit(NULL),
          m_iSize(bufsize),
          m_pUnitQueue(queue),
          m_iStartPos(0),
          m_iLastAckPos(0),
          m_iMaxPos(0),
          m_iNotch(0) {
    m_pUnit = new CUnit*[m_iSize];
    for (int i = 0; i < m_iSize; ++i)
        m_pUnit[i] = NULL;
}

CRcvBuffer::~CRcvBuffer() {
    for (int i = 0; i < m_iSize; ++i) {
        if (NULL != m_pUnit[i]) {
            m_pUnit[i]->m_iFlag = 0;
            --m_pUnitQueue->m_iCount;
        }
    }

    delete[] m_pUnit;
}

在構造函數中主要是常見了一個CUnit指針的數組m_pUnit,長度爲m_iSize,而後初始化它。而在析構函數中則主要是更新抓到的CUnit的狀態爲未被使用狀態,並適當地更新m_pUnitQueue的已用CUnit個數字段m_iCount,而後delete掉在構造函數中分配的CUnit指針數組。

addData

而後來看CRcvBuffer::addData():

int CRcvBuffer::addData(CUnit* unit, int offset) {
    int pos = (m_iLastAckPos + offset) % m_iSize;
    if (offset > m_iMaxPos)
        m_iMaxPos = offset;

    if (NULL != m_pUnit[pos])
        return -1;

    m_pUnit[pos] = unit;

    unit->m_iFlag = 1;
    ++m_pUnitQueue->m_iCount;

    return 0;
}

這個函數的執行過程爲:

1. 計算unit在接受緩衝區的CUnit指針數組m_pUnit中的存放位置pos。

2. 若offset大於m_iMaxPos,則會將m_iMaxPos更新爲offset。

3. 將unit放進CUnit指針數組m_pUnit的pos位置處。

4. 將unit的m_iFlag置爲1,表示該unit被佔用,在CUnitQueue::getNextAvailUnit()中不能再被返回用於存放接收來的數據了。

5. 遞增m_pUnitQueue的m_iCount,m_pUnitQueue的m_iCount表示它的已被佔用CUnit的個數。

6. 返回0值。

CRcvBuffer中抓到CUnitQueue的指針,而後對後者的私有成員變量搞東搞西,也真是讓人醉了。

readBuffer

接收緩衝區CRcvBuffer就如同咱們前面分析的發送緩衝區CSndBuffer同樣,一邊有線程向裏面塞數據,另外一邊有線程從中讀取數據,向其中塞數據的正是咱們剛剛分析的CRcvQueue::worker()->...->CRcvBuffer::addData()。接着咱們就看看從中讀取數據那一邊的函數,先來看RcvBuffer::readBuffer():

int CRcvBuffer::readBuffer(char* data, int len) {
    int p = m_iStartPos;
    int lastack = m_iLastAckPos;
    int rs = len;

    while ((p != lastack) && (rs > 0)) {
        int unitsize = m_pUnit[p]->m_Packet.getLength() - m_iNotch;
        if (unitsize > rs)
            unitsize = rs;

        memcpy(data, m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
        data += unitsize;

        if ((rs > unitsize) || (rs == m_pUnit[p]->m_Packet.getLength() - m_iNotch)) {
            CUnit* tmp = m_pUnit[p];
            m_pUnit[p] = NULL;
            tmp->m_iFlag = 0;
            --m_pUnitQueue->m_iCount;

            if (++p == m_iSize)
                p = 0;

            m_iNotch = 0;
        } else
            m_iNotch += rs;

        rs -= unitsize;
    }

    m_iStartPos = p;
    return len - rs;
}

這個函數主要經過一個循環來儘量多的讀取已經接收到的完整連續的數據packet。

m_iStartPos爲下一次讀取時,讀取的起始packet在m_pUnit數組中的index,即index在m_iStartPos以前的全部packet/CUnit都已經被讀取了。

m_iLastAckPos表示最後被ACK的packet的index,即index在m_iLastAckPos以前的全部packet都已經收到了,m_iLastAckPos爲讀取的結束位置。

在傳入的數據緩衝區尚未塞滿,同時尚未讀到結束位置以前一直會嘗試着去讀取。

在循環體中會試圖去讀取一個數據packet中包含的數據。這個讀取過程爲:

1. 計算要讀取的packet中剩餘未讀取的數據的數量unitsize。一個已經徹底接收到的數據packet中的數據,可能因爲調用者請求的數據量比較小,而存在剩餘的未讀取的數據在。m_iNotch用於記錄 要讀取的packet中,上一次讀取時讀到的數據的大小。

2. 比較unitsize和rs,若unitsize大於rs,則將unitsize設置爲rs。rs表示剩餘要讀取的數據的大小,unitsize表示本輪讀取操做應該讀取的數據的大小。經過這個地方的檢查,能夠防止向data緩衝區中寫入數據時超出邊界。

3. 執行一次memcpy,將packet中的數據讀取到data緩衝區中,並更新data緩衝區的位置。

4. 若是rs大於unitsize或rs等於(m_pUnit[p]->m_Packet.getLength() - m_iNotch),代表當前的這個packet中的數據已經徹底被讀取了,此時須要將這個packet歸還給CUnitQueue m_pUnitQueue,跳到下一個CUnit/packet,即遞增p,並將m_iNotch置爲0。這個地方的if語句等價於下面的這個if語句:

if (rs >= m_pUnit[p]->m_Packet.getLength() - m_iNotch) {

或下面的這個:

if (unitsize == m_pUnit[p]->m_Packet.getLength() - m_iNotch) {

個語句看起來還要更加清晰簡潔一點呢。

5. 對於當前的這個packet中的數據沒有被讀完的狀況則是簡單的將m_iNotch加rs,此時rs應該等於unitsize。就各個變量的語義而言,這裏彷佛給m_iNotch加unitsize要更好讀一點。

6. 在循環體的結尾處會更新rs,主要是從中去除本次已經讀取的大小。

7. 在循環體外會更新m_iStartPos爲p。

8. 向調用者返回實際讀取的數據的大小。

readBufferToFile

接着咱們再來看一下CRcvBuffer::readBufferToFile():

int CRcvBuffer::readBufferToFile(fstream& ofs, int len) {
    int p = m_iStartPos;
    int lastack = m_iLastAckPos;
    int rs = len;

    while ((p != lastack) && (rs > 0)) {
        int unitsize = m_pUnit[p]->m_Packet.getLength() - m_iNotch;
        if (unitsize > rs)
            unitsize = rs;

        ofs.write(m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
        if (ofs.fail())
            break;

        if ((rs > unitsize) || (rs == m_pUnit[p]->m_Packet.getLength() - m_iNotch)) {
            CUnit* tmp = m_pUnit[p];
            m_pUnit[p] = NULL;
            tmp->m_iFlag = 0;
            --m_pUnitQueue->m_iCount;

            if (++p == m_iSize)
                p = 0;

            m_iNotch = 0;
        } else
            m_iNotch += rs;

        rs -= unitsize;
    }

    m_iStartPos = p;

    return len - rs;
}

這個函數的執行與CRcvBuffer::readBuffer()的執行極其類似。僅有的差異在於,後者是將一個packet的數據memcpy進調用者傳入的數據緩衝區,而這個函數則是將數據write進傳入的fstream,並會在失敗時跳出循環。

又是大段大段的重複code。

readMsg

而後再來看一下CRcvBuffer::readMsg():

int CRcvBuffer::readMsg(char* data, int len) {
    int p, q;
    bool passack;
    if (!scanMsg(p, q, passack))
        return 0;

    int rs = len;
    while (p != (q + 1) % m_iSize) {
        int unitsize = m_pUnit[p]->m_Packet.getLength();
        if ((rs >= 0) && (unitsize > rs))
            unitsize = rs;

        if (unitsize > 0) {
            memcpy(data, m_pUnit[p]->m_Packet.m_pcData, unitsize);
            data += unitsize;
            rs -= unitsize;
        }

        if (!passack) {
            CUnit* tmp = m_pUnit[p];
            m_pUnit[p] = NULL;
            tmp->m_iFlag = 0;
            --m_pUnitQueue->m_iCount;
        } else
            m_pUnit[p]->m_iFlag = 2;

        if (++p == m_iSize)
            p = 0;
    }

    if (!passack)
        m_iStartPos = (q + 1) % m_iSize;

    return len - rs;
}

在這個函數中主要完成了以下過程:

1. 執行scanMsg()找到下一個未讀取的Msg相關的信息,若找不到則直接返回0,不然繼續執行。

p爲Msg的起始packet的index,q爲結束packet的index。UDT的Msg與UDP的Msg有類似的地方,它們都是隻能總體的讀寫,但又有很是大的不一樣,前者的長度能夠很是長,然後者則要短許多。

2. 經過一個循環讀取index範圍在[p, q]的全部packet,循環每執行一輪,就讀取一個packet,但這裏老是整個整個packet的讀。在循環體中,一個packet的讀取過程大致爲:

(1). 獲取當前packet可讀的數據的長度unitsize。

(2). 主要是根據rs值,也就是傳入的數據緩衝區剩餘長度值,來修正unitsize,以防止後面的memcpy操做越界。

(3). 在unitsize大於0時,執行memcpy將數據複製進傳入的數據緩衝區。並更新數據緩衝區data的位置,和傳入的數據緩衝區剩餘長度值rs。

(4). 若是passack爲false,則會這個packet歸還給CUnitQueue m_pUnitQueue;若是爲true,則會將讀取的CUnit的m_iFlag置爲2。

(5). 跳到下一個CUnit/packet,即遞增p。

由這段邏輯可見,若是傳入的數據緩衝區空間不夠大的話,那麼會在把數據緩衝區塞滿以後,將相同的Msg後面的全部數據直接丟棄。

3. 若是passack爲false,則會更新m_iStartPos爲(q + 1) % m_iSize。

4. 返回讀入到數據緩衝區中的數據的長度。

這裏再來看一下CRcvBuffer::scanMsg():

bool CRcvBuffer::scanMsg(int& p, int& q, bool& passack) {
    // empty buffer
    if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0))
        return false;

    //skip all bad msgs at the beginning
    while (m_iStartPos != m_iLastAckPos) {
        if (NULL == m_pUnit[m_iStartPos]) {
            if (++m_iStartPos == m_iSize)
                m_iStartPos = 0;
            continue;
        }

        if ((1 == m_pUnit[m_iStartPos]->m_iFlag) && (m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() > 1)) {
            bool good = true;

            // look ahead for the whole message
            for (int i = m_iStartPos; i != m_iLastAckPos;) {
                if ((NULL == m_pUnit[i]) || (1 != m_pUnit[i]->m_iFlag)) {
                    good = false;
                    break;
                }

                if ((m_pUnit[i]->m_Packet.getMsgBoundary() == 1) || (m_pUnit[i]->m_Packet.getMsgBoundary() == 3))
                    break;

                if (++i == m_iSize)
                    i = 0;
            }

            if (good)
                break;
        }

        CUnit* tmp = m_pUnit[m_iStartPos];
        m_pUnit[m_iStartPos] = NULL;
        tmp->m_iFlag = 0;
        --m_pUnitQueue->m_iCount;

        if (++m_iStartPos == m_iSize)
            m_iStartPos = 0;
    }

    p = -1;                  // message head
    q = m_iStartPos;         // message tail
    passack = m_iStartPos == m_iLastAckPos;
    bool found = false;

    // looking for the first message
    for (int i = 0, n = m_iMaxPos + getRcvDataSize(); i <= n; ++i) {
        if ((NULL != m_pUnit[q]) && (1 == m_pUnit[q]->m_iFlag)) {
            switch (m_pUnit[q]->m_Packet.getMsgBoundary()) {
                case 3:  // 11
                    p = q;
                    found = true;
                    break;

                case 2:  // 10
                    p = q;
                    break;

                case 1:  // 01
                    if (p != -1)
                        found = true;
                    break;
            }
        } else {
            // a hole in this message, not valid, restart search
            p = -1;
        }

        if (found) {
            // the msg has to be ack'ed or it is allowed to read out of order, and was not read before
            if (!passack || !m_pUnit[q]->m_Packet.getMsgOrderFlag())
                break;

            found = false;
        }

        if (++q == m_iSize)
            q = 0;

        if (q == m_iLastAckPos)
            passack = true;
    }

    // no msg found
    if (!found) {
        // if the message is larger than the receiver buffer, return part of the message
        if ((p != -1) && ((q + 1) % m_iSize == p))
            found = true;
    }

    return found;
}

1. 在這個函數中會先檢查一下是不是空buffer,如果則直接返回false,表示沒有Msg可讀,不然繼續執行。

2. 經過一個兩層循環來檢查是否有一個Msg存在,在外層循環中主要來查找Msg的起始packet,而在內存循環中則主要用於查找結束packet。更具體的來看外層循環作的事情:

(1). 檢查m_pUnit[m_iStartPos]是否爲null,若爲null,則遞增m_iStartPos並進入下一輪循環,以跳過開頭一些無效的packet,不然繼續執行。

(2). 若m_iStartPos指向的packet已經被佔用,同時它的MsgBoundary大於1,則代表這是一個Msg的起始packet。此時則先進入內層循環,從m_iStartPos開始至m_iLastAckPos結束,尋找Msg的結束packet。good變量用於表示是否找到了一個完整的Msg或一箇中間沒有洞但還未徹底接收到的Msg。

在內層循環中,會檢查一下當前CUnit是否爲null或還未處於被佔用狀態,如果則代表Msg中有洞,於是將good置爲false,直接跳出內層循環,不然繼續執行內層循環後面的邏輯。而後檢查當前CUnit的MsgBoundary值是否爲1,或爲3,如果則代表找到了Msg的結束packet,則直接跳出內層循環,不然繼續執行內層循環後面的邏輯。最後遞增循環計數器。

若是good爲true,則跳出外層循環,不然繼續執行。

關於MsgBoundary的判斷,能夠對照CSndBuffer::addBuffer()的code來看。

(3). 將m_iStartPos指向的當前CUnit還回給CUnitQueue m_pUnitQueue。

(4). 遞增m_iStartPos。

內層循環會在一個Msg中有空洞時,將good置爲false,並從內層循環中跳出。因而可知,在內層循環外,檢查good值,發現good爲false時就知道應該跳過並還回index範圍在區間[m_iStartPos,i]內的全部CUnit了。但index範圍在區間[m_iStartPos,i]內的全部CUnit倒是由外層循環最後的那一段code來執行跳過和還回的。

總感受那段查找一個Msg的code應該寫成下面這樣才比較好一點:

if ((1 == m_pUnit[m_iStartPos]->m_iFlag) && (m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() = 3)) {
            break;
        } else if ((1 == m_pUnit[m_iStartPos]->m_iFlag) && (m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() = 2)) {
            bool good = true;

            // look ahead for the whole message
            for (int i = m_iStartPos + 1; i != m_iLastAckPos;) {
                if ((NULL == m_pUnit[i]) || (1 != m_pUnit[i]->m_iFlag)) {
                    good = false;
                    break;
                }

                if (m_pUnit[i]->m_Packet.getMsgBoundary() == 2 || m_pUnit[i]->m_Packet.getMsgBoundary() == 3) {
                    good = false;
                    break;
                } else if (m_pUnit[i]->m_Packet.getMsgBoundary() == 1) {
                    break;
                }

                if (++i == m_iSize)
                    i = 0;
            }

            if (good)
                break;
        }

這種寫法將單個packet Msg和多packet Msg的處理邏輯分開,明顯要清晰許多。

3. 肯定Msg的具體邊界。

在前一個步驟中,已經有跳過全部的含空洞的Msg等無效Msg,但在這裏依然作了是否爲NULL的判斷。

passack爲true表示對於Msg的查找已經到了最後Ack的那個packet了。這段code彷佛修改一下循環控制部分看起來會更清晰一點:

for (q = m_iStartPos; q != m_iLastAckPos;) {

4. 若是沒有找到Msg,同時(p != -1) && ((q + 1) % m_iSize == p),則代表一個Msg的packet把整個接收緩衝區都充爆了,仍是沒有消息的終止packet,則會讓調用端先讀取一部分。

對於接收緩衝區CRcvBuffer的研究就先到這裏。

應用程序接收數據

來看下UDT都給應用程序提供了哪些API以用於接收數據。UDT提供了以下的幾個函數用於不一樣方式的數據接收:

UDT_API int recv(UDTSOCKET u, char* buf, int len, int flags);

UDT_API int recvmsg(UDTSOCKET u, char* buf, int len);

UDT_API int64_t recvfile(UDTSOCKET u, std::fstream& ofs, int64_t& offset, int64_t size, int block = 7280000);

UDT_API int64_t recvfile2(UDTSOCKET u, const char* path, int64_t* offset, int64_t size, int block = 7280000);

recv()用於進行流式的數據接收。recvmsg()用於進行數據報式的數據接收。recvfile()和recvfile2()用來直接將流式接收的數據寫入文件,二者的執行過程基本相同,僅有的區別就在於以不一樣的形式來提供文件參數。

recv

先來看一下UDT::recv()函數:

int CUDT::recv(char* data, int len) {
    if (UDT_DGRAM == m_iSockType)
        throw CUDTException(5, 10, 0);

    // throw an exception if not connected
    if (!m_bConnected)
        throw CUDTException(2, 2, 0);
    else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
        throw CUDTException(2, 1, 0);

    if (len <= 0)
        return 0;

    CGuard recvguard(m_RecvLock);

    if (0 == m_pRcvBuffer->getRcvDataSize()) {
        if (!m_bSynRecving)
            throw CUDTException(6, 2, 0);
        else {
#ifndef WIN32
            pthread_mutex_lock(&m_RecvDataLock);
            if (m_iRcvTimeOut < 0) {
                while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
                    pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
            } else {
                uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL;
                timespec locktime;

                locktime.tv_sec = exptime / 1000000;
                locktime.tv_nsec = (exptime % 1000000) * 1000;

                while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize())) {
                    pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &locktime);
                    if (CTimer::getTime() >= exptime)
                        break;
                }
            }
            pthread_mutex_unlock(&m_RecvDataLock);
#else
            if (m_iRcvTimeOut < 0)
            {
                while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
                WaitForSingleObject(m_RecvDataCond, INFINITE);
            }
            else
            {
                uint64_t enter_time = CTimer::getTime();

                while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
                {
                    int diff = int(CTimer::getTime() - enter_time) / 1000;
                    if (diff >= m_iRcvTimeOut)
                    break;
                    WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut - diff ));
                }
            }
#endif
        }
    }

    // throw an exception if not connected
    if (!m_bConnected)
        throw CUDTException(2, 2, 0);
    else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
        throw CUDTException(2, 1, 0);

    int res = m_pRcvBuffer->readBuffer(data, len);

    if (m_pRcvBuffer->getRcvDataSize() <= 0) {
        // read is not available any more
        s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
    }

    if ((res <= 0) && (m_iRcvTimeOut >= 0))
        throw CUDTException(6, 3, 0);

    return res;
}


int CUDT::recv(UDTSOCKET u, char* buf, int len, int) {
    try {
        CUDT* udt = s_UDTUnited.lookup(u);
        return udt->recv(buf, len);
    } catch (CUDTException &e) {
        s_UDTUnited.setError(new CUDTException(e));
        return ERROR;
    } catch (...) {
        s_UDTUnited.setError(new CUDTException(-1, 0, 0));
        return ERROR;
    }
}


int recv(UDTSOCKET u, char* buf, int len, int flags) {
    return CUDT::recv(u, buf, len, flags);
}

這個API的實現結構與UDT::send()很是類似,此處再也不贅述。直接來看CUDT::recv(char* data, int len)的執行:

1. 檢查UDT Socket的類型是否爲UDT_DGRAM,如果則直接拋異常退出。這個檢查限定只有類型爲UDT_STREAM的UDT Socket才能執行recv操做。

2. 檢查CUDT的狀態,若不處於有效的鏈接狀態,或處於正在關閉狀態且沒有額外的數據能夠讀取會直接拋異常退出。這一步用於保證只有處於有效的鏈接狀態,或正在關閉但有數據未讀完的CUDT才能recv。

3. 檢查參數len,也就是數據緩衝區參數的長度是否爲一個有效的值,即大於0。若不是則沒有必要執行後續的recv動做,直接返回0值退出。

4. 在接收數據大小爲0,同時UDT Socket處於有效的鏈接狀態時等待數據的到達。

case 1:UDT Socket處於非m_bSynRecving狀態,則直接拋異常退出。

case 2:UDT Socket處於m_bSynRecving狀態,m_iRcvTimeOut小於0,此時只要UDT Socket依然處於有效的鏈接狀態,就無限制的等待。

case 3:UDT Socket處於m_bSynRecving狀態,m_iRcvTimeOut大於等於0,此時則最長等待m_iRcvTimeOut ms。

5. 再次檢查CUDT的狀態,若不處於有效的鏈接狀態,或處於正在關閉狀態且沒有額外的數據能夠讀取會直接拋異常退出。在這些條件下無需執行後面進一步的數據讀取操做。

6. 執行m_pRcvBuffer->readBuffer(data, len)從接收緩衝區中讀取數據。

7. 若沒有讀到數據,並且m_iRcvTimeOut則說明是讀取超時了,則拋出異常退出,不然繼續執行。

8. 返回讀到的數據的長度。

recvmsg

而後來看UDT::recvmsg():

int CUDT::recvmsg(char* data, int len) {
    if (UDT_STREAM == m_iSockType)
        throw CUDTException(5, 9, 0);

    // throw an exception if not connected
    if (!m_bConnected)
        throw CUDTException(2, 2, 0);

    if (len <= 0)
        return 0;

    CGuard recvguard(m_RecvLock);

    if (m_bBroken || m_bClosing) {
        int res = m_pRcvBuffer->readMsg(data, len);

        if (m_pRcvBuffer->getRcvMsgNum() <= 0) {
            // read is not available any more
            s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
        }

        if (0 == res)
            throw CUDTException(2, 1, 0);
        else
            return res;
    }

    if (!m_bSynRecving) {
        int res = m_pRcvBuffer->readMsg(data, len);
        if (0 == res)
            throw CUDTException(6, 2, 0);
        else
            return res;
    }

    int res = 0;
    bool timeout = false;

    do {
#ifndef WIN32
        pthread_mutex_lock(&m_RecvDataLock);

        if (m_iRcvTimeOut < 0) {
            while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len))))
                pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
        } else {
            uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL;
            timespec locktime;

            locktime.tv_sec = exptime / 1000000;
            locktime.tv_nsec = (exptime % 1000000) * 1000;

            if (pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &locktime) == ETIMEDOUT)
                timeout = true;

            res = m_pRcvBuffer->readMsg(data, len);
        }
        pthread_mutex_unlock(&m_RecvDataLock);
#else
        if (m_iRcvTimeOut < 0)
        {
            while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len))))
            WaitForSingleObject(m_RecvDataCond, INFINITE);
        }
        else
        {
            if (WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut)) == WAIT_TIMEOUT)
            timeout = true;

            res = m_pRcvBuffer->readMsg(data, len);
        }
#endif

        if (m_bBroken || m_bClosing)
            throw CUDTException(2, 1, 0);
        else if (!m_bConnected)
            throw CUDTException(2, 2, 0);
    } while ((0 == res) && !timeout);

    if (m_pRcvBuffer->getRcvMsgNum() <= 0) {
        // read is not available any more
        s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
    }

    if ((res <= 0) && (m_iRcvTimeOut >= 0))
        throw CUDTException(6, 3, 0);

    return res;
}



int CUDT::recvmsg(UDTSOCKET u, char* buf, int len) {
    try {
        CUDT* udt = s_UDTUnited.lookup(u);
        return udt->recvmsg(buf, len);
    } catch (CUDTException e) {
        s_UDTUnited.setError(new CUDTException(e));
        return ERROR;
    } catch (...) {
        s_UDTUnited.setError(new CUDTException(-1, 0, 0));
        return ERROR;
    }
}



int recvmsg(UDTSOCKET u, char* buf, int len) {
    return CUDT::recvmsg(u, buf, len);
}

這個API的實現結構與UDT::recv()很是類似,此處再也不贅述。直接來看CUDT::recvmsg(char* data, int len)的執行:

1. 檢查UDT Socket的類型是否爲UDT_STREAM,如果則直接拋異常退出。這個檢查限定只有類型爲UDT_DGRAM的UDT Socket才能執行recvmsg操做。

2. 處理UDT Socket處於非Connected的狀況。處理方法是,直接拋異常退出。

3. 檢查參數len,也就是數據緩衝區參數的長度是否爲一個有效的值,即大於0。若不是則沒有必要執行後續的recvmsg動做,直接返回0值退出。

4. 處理UDT Socket依然處於Connected狀態,但同時已經在Closing或被Broken時的狀況。處理方法便是,嘗試讀取一個Msg。若讀取到了數據,就返回讀取到的數據的長度;若沒有讀取到就拋出一個異常退出。

5. 處理UDT Socket處於有效的鏈接狀態,可是不是SynRecving模式的狀況。處理方法便是,嘗試讀取一個Msg。若讀取到了數據,就返回讀取到的數據的長度;若沒有讀取到就拋出一個異常退出。

6. 處理UDT Socket處於有效的鏈接狀態,且是SynRecving模式的狀況。這種狀況會在讀取不到數據時進行等待,而後再嘗試讀取。這裏用了一個do-while循環來實現等待-讀取的邏輯。分兩種狀況來處理,一是m_iRcvTimeOut小於0,即只要UDT Socket處於有效的鏈接狀態時就無限制等待的狀況;二是m_iRcvTimeOut大於等於0,即UDT Socket處於有效的鏈接狀態的狀況下等待必定時間的狀況。

在循環體的最後還會處理因爲UDT Socket狀態改變而致使等待過程被終止的狀況。

循環會在沒有接收到數據同時沒有超時的狀況下一直等待。循環體即將結束時對於狀態的檢查使得狀態的改變可以終結等待過程,而不會出現死循環。

7. 循環退出以後,發現是因爲timeout而結束的狀況就拋個異常退出。

8. 一切順利讀取了Msg的數據,則返回讀取到的數據的長度。

這個函數作的事情與UDT::recv()作的事情極其類似,可是寫法的差異看上去比較大。不一樣寫法個有好壞吧。

recvfilerecvfile2

接着再來看一下recvfile和recvfile2

int64_t CUDT::recvfile(fstream& ofs, int64_t& offset, int64_t size, int block) {
    if (UDT_DGRAM == m_iSockType)
        throw CUDTException(5, 10, 0);

    if (!m_bConnected)
        throw CUDTException(2, 2, 0);
    else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
        throw CUDTException(2, 1, 0);

    if (size <= 0)
        return 0;

    CGuard recvguard(m_RecvLock);

    int64_t torecv = size;
    int unitsize = block;
    int recvsize;

    // positioning...
    try {
        ofs.seekp((streamoff) offset);
    } catch (...) {
        throw CUDTException(4, 3);
    }

    // receiving... "recvfile" is always blocking
    while (torecv > 0) {
        if (ofs.fail()) {
            // send the sender a signal so it will not be blocked forever
            int32_t err_code = CUDTException::EFILE;
            sendCtrl(8, &err_code);

            throw CUDTException(4, 4);
        }

#ifndef WIN32
        pthread_mutex_lock(&m_RecvDataLock);
        while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
            pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
        pthread_mutex_unlock(&m_RecvDataLock);
#else
        while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
        WaitForSingleObject(m_RecvDataCond, INFINITE);
#endif

        if (!m_bConnected)
            throw CUDTException(2, 2, 0);
        else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
            throw CUDTException(2, 1, 0);

        unitsize = int((torecv >= block) ? block : torecv);
        recvsize = m_pRcvBuffer->readBufferToFile(ofs, unitsize);

        if (recvsize > 0) {
            torecv -= recvsize;
            offset += recvsize;
        }
    }

    if (m_pRcvBuffer->getRcvDataSize() <= 0) {
        // read is not available any more
        s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
    }

    return size - torecv;
}


int64_t CUDT::recvfile(UDTSOCKET u, fstream& ofs, int64_t& offset, int64_t size, int block) {
    try {
        CUDT* udt = s_UDTUnited.lookup(u);
        return udt->recvfile(ofs, offset, size, block);
    } catch (CUDTException &e) {
        s_UDTUnited.setError(new CUDTException(e));
        return ERROR;
    } catch (...) {
        s_UDTUnited.setError(new CUDTException(-1, 0, 0));
        return ERROR;
    }
}


int64_t recvfile(UDTSOCKET u, fstream& ofs, int64_t& offset, int64_t size, int block) {
    return CUDT::recvfile(u, ofs, offset, size, block);
}



int64_t recvfile2(UDTSOCKET u, const char* path, int64_t* offset, int64_t size, int block) {
    fstream ofs(path, ios::binary | ios::out);
    int64_t ret = CUDT::recvfile(u, ofs, *offset, size, block);
    ofs.close();
    return ret;
}

這個API的實現結構與UDT::recv()和CUDT::recvmsg()很是類似,此處再也不贅述。直接來看CUDT::recvfile(fstream& ofs, int64_t& offset, int64_t size, int block)的執行:

1. 與UDT::recv()同樣,確認UDT Socket的類型爲UDT_STREAM

2. 檢查CUDT的狀態,處理方式也UDT::recv()的對應部分同樣。

3. 檢查參數size,也就是要讀取的數據長度爲一個有效值,即大於0。若不是則沒有必要執行後續的recvfile動做,直接返回0值退出。

4. seek到文件的目的offset,若失敗就拋出異常退出。

5. 經過一個循環不停地讀取文件,直到讀取了指望的數據大小。在循環體中:

(1). 首先檢查ofs是否處於fail狀態,如果就向發送端發送一個Error消息,不然繼續執行。

(2). 在接收緩衝區中無數據可讀時等待,並且是不顧及m_iRcvTimeOut及m_bSynRecving模式的無限制等待,直到UDT Socket再也不處於有效的鏈接狀態。

(3). 檢查UDT Socket的狀態,是否再也不處於有效的鏈接狀態且沒有其它數據能夠讀了,如果就拋異常退出。

(4). 經過torecv和block值計算本次讀操做應該讀取的數據的大小。

(5). 由接收緩衝區向文件讀取數據。

(6). 讀到了數據時更新torecv及偏移兩offset。

這個循環僅有的退出條件就是一些異常的狀況,ofs fail,或者UDT Socket再也不處於有效的鏈接狀態。

6. 返回實際讀取到的數據的長度。固然這個地方的計算過程略顯莫名其妙,就前面讀取數據的循環而言,在讀取到請求的數據長度以前貌似是不會結束的。

應用這個函數所須要的條件與UDT::recv()比較類似。但執行的操做又差異比較大,在這個函數中貌似就是必定要讀到請求大小的數據纔會結束。

Done。

相關文章
相關標籤/搜索