UDT協議實現分析——UDT Socket的建立

UDT API的用法

在分析 鏈接的創建過程 以前,先來看一下UDT API的用法。在UDT網絡中,一般要有一個UDT Server監聽在某臺機器的某個UDP端口上,等待客戶端的鏈接;有一個或多個客戶端鏈接UDT Server;UDT Server接收到來自客戶端的鏈接請求後,建立另一個單獨的UDT Socket用於與該客戶端進行通訊。ios

先來看一下UDT Server的簡單的實現,UDT的開發者已經提供了一些demo程序可供參考,位於app/目錄下。編程

#include <unistd.h>
#include <cstdlib>
#include <cstring>
#include <netdb.h>

#include <iostream>
#include <udt.h>

using namespace std;

void* recvdata(void*);

struct UDTUpDown {
    UDTUpDown() {
        // use this function to initialize the UDT library
        UDT::startup();
    }
    ~UDTUpDown() {
        // use this function to release the UDT library
        UDT::cleanup();
    }
};

int main(int argc, char* argv[]) {
    if ((1 != argc) && ((2 != argc) || (0 == atoi(argv[1])))) {
        cout << "usage: appserver [server_port]" << endl;
        return 0;
    }

    // Automatically start up and clean up UDT module.
    UDTUpDown _udt_;

    addrinfo hints;
    addrinfo* res;

    memset(&hints, 0, sizeof(struct addrinfo));

    hints.ai_flags = AI_PASSIVE;
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    //hints.ai_socktype = SOCK_DGRAM;

    string service("9000");
    if (2 == argc)
        service = argv[1];

    if (0 != getaddrinfo(NULL, service.c_str(), &hints, &res)) {
        cout << "illegal port number or port is busy.\n" << endl;
        return 0;
    }

    UDTSOCKET serv = UDT::socket(res->ai_family, res->ai_socktype, res->ai_protocol);

    // UDT Options
    //UDT::setsockopt(serv, 0, UDT_CC, new CCCFactory<CUDPBlast>, sizeof(CCCFactory<CUDPBlast>));
    //UDT::setsockopt(serv, 0, UDT_MSS, new int(9000), sizeof(int));
    //UDT::setsockopt(serv, 0, UDT_RCVBUF, new int(10000000), sizeof(int));
    //UDT::setsockopt(serv, 0, UDP_RCVBUF, new int(10000000), sizeof(int));

    if (UDT::ERROR == UDT::bind(serv, res->ai_addr, res->ai_addrlen)) {
        cout << "bind: " << UDT::getlasterror().getErrorMessage() << endl;
        return 0;
    }

    freeaddrinfo(res);

    cout << "server is ready at port: " << service << endl;

    if (UDT::ERROR == UDT::listen(serv, 10)) {
        cout << "listen: " << UDT::getlasterror().getErrorMessage() << endl;
        return 0;
    }

    sockaddr_storage clientaddr;
    int addrlen = sizeof(clientaddr);

    UDTSOCKET recver;

    while (true) {
        if (UDT::INVALID_SOCK == (recver = UDT::accept(serv, (sockaddr*) &clientaddr, &addrlen))) {
            cout << "accept: " << UDT::getlasterror().getErrorMessage() << endl;
            return 0;
        }

        char clienthost[NI_MAXHOST];
        char clientservice[NI_MAXSERV];
        getnameinfo((sockaddr *) &clientaddr, addrlen, clienthost, sizeof(clienthost), clientservice,
                    sizeof(clientservice), NI_NUMERICHOST | NI_NUMERICSERV);
        cout << "new connection: " << clienthost << ":" << clientservice << endl;

        pthread_t rcvthread;
        pthread_create(&rcvthread, NULL, recvdata, new UDTSOCKET(recver));
        pthread_detach(rcvthread);
    }

    UDT::close(serv);

    return 0;
}

void* recvdata(void* usocket) {
    UDTSOCKET recver = *(UDTSOCKET*) usocket;
    delete (UDTSOCKET*) usocket;

    char* data;
    int size = 100000;
    data = new char[size];

    while (true) {
        int rsize = 0;
        int rs;
        while (rsize < size) {
            int rcv_size;
            int var_size = sizeof(int);
            UDT::getsockopt(recver, 0, UDT_RCVDATA, &rcv_size, &var_size);
            if (UDT::ERROR == (rs = UDT::recv(recver, data + rsize, size - rsize, 0))) {
                cout << "recv:" << UDT::getlasterror().getErrorMessage() << endl;
                break;
            }

            rsize += rs;
        }

        if (rsize < size)
            break;
    }

    delete[] data;

    UDT::close(recver);

    return NULL;
}

1. 如在  UDT協議實現分析——UDT初始化和銷燬  一文中提到的, 在調用任何UDT API以前,須要首先調用UDT::startup()來對庫作一個初始化,並在結束以後執行UDT::cleanup()作最後的清理。在這個示例中,是建立了一個helper類UDTUpDown來幫助作這些事情的。利用編程語言自己提供的構造-析夠機制來對UDT進行初始化和銷燬要比手動調用這些函數要可靠得多。api

2. 得到本地UDP端口的網絡地址。網絡

3. 調用UDT::socket()函數建立一個UDT Socket。這個Socket是UDT抽象出來的一個邏輯的Socket,咱們並不能利用這個Socket自己來收發數據。併發

4. 調用UDT::bind()將建立的UDT Socket綁定到本地端口的網絡地址。這一步將會把UDT的邏輯Socket與可以進行數據收發的系統UDP socket進行關聯,自此咱們就能夠利用UDT Socket進行收發數據了。app

5. 調用UDT::listen()告訴UDT,把這個UDT Socket作爲這個端口上的Listening Socket。一個UDP端口能夠被多個UDT Socket複用,在調用UDT::listen()以後,UDT就知道,在有其它節點鏈接這個UDP端口時,須要把相關的鏈接請求消息發送到哪一個UDT Socket的接收緩衝區了。less

對綁定到相同UDP端口的多個不一樣UDT Socket調用UDT::listen()時,UDT是如何處理的?咱們知道,一個UDP端口上最多隻能有一個listening Socket。dom

6. 調用UDT::accept()函數等待其它節點的鏈接。其它節點鏈接時,這個函數返回另一個單獨的UDT Socket,以用於與發起鏈接的節點進行通訊。socket

UDT::accept()函數返回的UDT Socket會被綁定到另外的一個不一樣的UDP端口,仍是會被綁定到listening Socket所綁定的UDP端口?編程語言

7. 使用UDT::accept()返回的UDT Socket,利用UDT::recv()與UDT::send()等函數同發起鏈接的節點進行數據傳輸。

8. 在再也不須要UDT Socket時,調用UDT::close()函數關掉它,以釋放資源。

而後再來看下UDT Client的實現:

#include <unistd.h>
#include <cstdlib>
#include <cstring>
#include <netdb.h>

#include <iostream>
#include <udt.h>

using namespace std;

void* monitor(void*);

struct UDTUpDown {
    UDTUpDown() {
        // use this function to initialize the UDT library
        UDT::startup();
    }
    ~UDTUpDown() {
        // use this function to release the UDT library
        UDT::cleanup();
    }
};

int main(int argc, char* argv[]) {
    if ((3 != argc) || (0 == atoi(argv[2]))) {
        cout << "usage: appclient server_ip server_port" << endl;
        return 0;
    }

    // Automatically start up and clean up UDT module.
    UDTUpDown _udt_;

    struct addrinfo hints, *local, *peer;

    memset(&hints, 0, sizeof(struct addrinfo));

    hints.ai_flags = AI_PASSIVE;
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    //hints.ai_socktype = SOCK_DGRAM;

    if (0 != getaddrinfo(NULL, "9000", &hints, &local)) {
        cout << "incorrect network address.\n" << endl;
        return 0;
    }

    UDTSOCKET client = UDT::socket(local->ai_family, local->ai_socktype, local->ai_protocol);

    // UDT Options
    //UDT::setsockopt(client, 0, UDT_CC, new CCCFactory<CUDPBlast>, sizeof(CCCFactory<CUDPBlast>));
    //UDT::setsockopt(client, 0, UDT_MSS, new int(9000), sizeof(int));
    //UDT::setsockopt(client, 0, UDT_SNDBUF, new int(10000000), sizeof(int));
    //UDT::setsockopt(client, 0, UDP_SNDBUF, new int(10000000), sizeof(int));
    //UDT::setsockopt(client, 0, UDT_MAXBW, new int64_t(12500000), sizeof(int));

    // for rendezvous connection, enable the code below
    /*
     UDT::setsockopt(client, 0, UDT_RENDEZVOUS, new bool(true), sizeof(bool));
     if (UDT::ERROR == UDT::bind(client, local->ai_addr, local->ai_addrlen))
     {
     cout << "bind: " << UDT::getlasterror().getErrorMessage() << endl;
     return 0;
     }
     */

    freeaddrinfo(local);

    if (0 != getaddrinfo(argv[1], argv[2], &hints, &peer)) {
        cout << "incorrect server/peer address. " << argv[1] << ":" << argv[2] << endl;
        return 0;
    }

    // connect to the server, implict bind
    if (UDT::ERROR == UDT::connect(client, peer->ai_addr, peer->ai_addrlen)) {
        cout << "connect: " << UDT::getlasterror().getErrorMessage() << endl;
        return 0;
    }

    freeaddrinfo(peer);

    int size = 100000;
    char* data = new char[size];

    pthread_create(new pthread_t, NULL, monitor, &client);

    for (int i = 0; i < 1000000; i++) {
        int ssize = 0;
        int ss;
        while (ssize < size) {
            if (UDT::ERROR == (ss = UDT::send(client, data + ssize, size - ssize, 0))) {
                cout << "send:" << UDT::getlasterror().getErrorMessage() << endl;
                break;
            }

            ssize += ss;
        }

        if (ssize < size)
            break;
    }

    UDT::close(client);
    delete[] data;
    return 0;
}

void* monitor(void* s) {
    UDTSOCKET u = *(UDTSOCKET*) s;
    UDT::TRACEINFO perf;
    cout << "SendRate(Mb/s)\tRTT(ms)\tCWnd\tPktSndPeriod(us)\tRecvACK\tRecvNAK" << endl;

    while (true) {
        sleep(1);
        if (UDT::ERROR == UDT::perfmon(u, &perf)) {
            cout << "perfmon: " << UDT::getlasterror().getErrorMessage() << endl;
            break;
        }

        cout << perf.mbpsSendRate << "\t\t" << perf.msRTT << "\t" << perf.pktCongestionWindow << "\t"
             << perf.usPktSndPeriod << "\t\t\t" << perf.pktRecvACK << "\t" << perf.pktRecvNAK << endl;
    }

    return NULL;
}

能夠看到UDT Client鏈接UDT Server併發送數據的過程大致以下:

1. 一樣須要在調用任何UDT API以前,先調用UDT::startup()來對庫作初始化,並在結束以後執行UDT::cleanup()作最後的清理。這裏一樣使用helper類UDTUpDown來幫助作這些事情。

2. 得到UDT Server的網絡地址。

3. 調用UDT::socket()函數建立一個UDT Socket。這個Socket能夠與特定的本地地址綁定,也能夠不綁定。若是綁定,則發送數據時的出口地址就是該端口,若是不綁定,出口地址則是一個不肯定的值。

4. 調用UDT::connect()鏈接UDT Server。

5. 使用UDT Socket,利用UDT::recv()與UDT::send()等函數同UDT Server進行數據傳輸。

6. 在再也不須要UDT Socket時,調用UDT::close()函數關掉它,以釋放資源。

UDT基本的收發數據的API的用法大致如上面所示。接着咱們來看,這些函數是如何實現的。

UDT Socket的建立

不管是UDT Server要listening,仍是UDT client要鏈接UDT Server,在調用UDT::startup()初始化UDT以後,首先要作的事情都是調用UDT::socket()建立UDT Socket了。咱們就來看一下建立UDT Socket的過程:

UDTSOCKET CUDTUnited::newSocket(int af, int type) {
    if ((type != SOCK_STREAM) && (type != SOCK_DGRAM))
        throw CUDTException(5, 3, 0);

    CUDTSocket* ns = NULL;

    try {
        ns = new CUDTSocket;
        ns->m_pUDT = new CUDT;
        if (AF_INET == af) {
            ns->m_pSelfAddr = (sockaddr*) (new sockaddr_in);
            ((sockaddr_in*) (ns->m_pSelfAddr))->sin_port = 0;
        } else {
            ns->m_pSelfAddr = (sockaddr*) (new sockaddr_in6);
            ((sockaddr_in6*) (ns->m_pSelfAddr))->sin6_port = 0;
        }
    } catch (...) {
        delete ns;
        throw CUDTException(3, 2, 0);
    }

    CGuard::enterCS(m_IDLock);
    ns->m_SocketID = --m_SocketID;
    CGuard::leaveCS(m_IDLock);

    ns->m_Status = INIT;
    ns->m_ListenSocket = 0;
    ns->m_pUDT->m_SocketID = ns->m_SocketID;
    ns->m_pUDT->m_iSockType = (SOCK_STREAM == type) ? UDT_STREAM : UDT_DGRAM;
    ns->m_pUDT->m_iIPversion = ns->m_iIPversion = af;
    ns->m_pUDT->m_pCache = m_pCache;

    // protect the m_Sockets structure.
    CGuard::enterCS(m_ControlLock);
    try {
        m_Sockets[ns->m_SocketID] = ns;
    } catch (...) {
        //failure and rollback
        CGuard::leaveCS(m_ControlLock);
        delete ns;
        ns = NULL;
    }
    CGuard::leaveCS(m_ControlLock);

    if (NULL == ns)
        throw CUDTException(3, 2, 0);

    return ns->m_SocketID;
}


UDTSOCKET CUDT::socket(int af, int type, int) {
    if (!s_UDTUnited.m_bGCStatus)
        s_UDTUnited.startup();

    try {
        return s_UDTUnited.newSocket(af, type);
    } catch (CUDTException& e) {
        s_UDTUnited.setError(new CUDTException(e));
        return INVALID_SOCK;
    } catch (bad_alloc&) {
        s_UDTUnited.setError(new CUDTException(3, 2, 0));
        return INVALID_SOCK;
    } catch (...) {
        s_UDTUnited.setError(new CUDTException(-1, 0, 0));
        return INVALID_SOCK;
    }
}



UDTSOCKET socket(int af, int type, int protocol) {
    return CUDT::socket(af, type, protocol);
}

調用流程爲,UDT::socket() -> CUDT::socket() -> CUDTUnited::newSocket()。

在CUDT::socket()中,咱們看到,它會首先檢查s_UDTUnited.m_bGCStatus,若發現UDT尚未初始化完成的話,則會調用s_UDTUnited.startup()進行初始化。這個地方對UDT狀態s_UDTUnited.m_bGCStatus的檢查沒有問題,但在發現UDT沒有初始化完成時調用s_UDTUnited.startup()彷佛並不恰當。這個地方調用了s_UDTUnited.startup(),那與此次調用相對應的cleanup()又在哪調用了呢?顯然在UDT內部是沒有。如果調用cleanup()的職責老是在UDT的使用者,那倒不如在這個地方返回錯誤給調用者,徹底讓調用者來管理UDT的生命週期,以儘量地避免資源泄漏。

建立UDT Socket的工做實際都在CUDTUnited::newSocket()函數中完成。能夠看到,在這個函數中主要作了以下的這樣一些事情:

1. 建立一個CUDTSocket對象ns,並建立一個CUDT對象被ns->m_pUDT引用。初始化ns對象的Self網絡地址,端口會被設置爲0。在UDT中使用CUDTSocket和CUDT共同來描述一個Socket。每一個UDT Socket都會有其相應的CUDTSocket對象和CUDT對象。

能夠看一下CUDTSocket類的定義,來了解它都描述了UDT Socket的哪些屬性(src/api.h):

class CUDTSocket {
 public:
    CUDTSocket();
    ~CUDTSocket();

    UDTSTATUS m_Status;                       // current socket state

    uint64_t m_TimeStamp;                     // time when the socket is closed

    int m_iIPversion;                         // IP version
    sockaddr* m_pSelfAddr;                    // pointer to the local address of the socket
    sockaddr* m_pPeerAddr;                    // pointer to the peer address of the socket

    UDTSOCKET m_SocketID;                     // socket ID
    UDTSOCKET m_ListenSocket;                 // ID of the listener socket; 0 means this is an independent socket

    UDTSOCKET m_PeerID;                       // peer socket ID
    int32_t m_iISN;                      // initial sequence number, used to tell different connection from same IP:port

    CUDT* m_pUDT;                             // pointer to the UDT entity

    std::set<UDTSOCKET>* m_pQueuedSockets;    // set of connections waiting for accept()
    std::set<UDTSOCKET>* m_pAcceptSockets;    // set of accept()ed connections

    pthread_cond_t m_AcceptCond;              // used to block "accept" call
    pthread_mutex_t m_AcceptLock;             // mutex associated to m_AcceptCond

    unsigned int m_uiBackLog;                 // maximum number of connections in queue

    int m_iMuxID;                             // multiplexer ID

    pthread_mutex_t m_ControlLock;            // lock this socket exclusively for control APIs: bind/listen/connect

 private:
    CUDTSocket(const CUDTSocket&);
    CUDTSocket& operator=(const CUDTSocket&);
};

這個類只提供了構造和析構兩個成員函數。還聲明瞭私有的copy構造函數和賦值操做符函數,但沒有定義它們,以免類對象的複製。

能夠再來看一下CUDTSocket的構造函數實現(src/api.h):

CUDTSocket::CUDTSocket()
        : m_Status(INIT),
          m_TimeStamp(0),
          m_iIPversion(0),
          m_pSelfAddr(NULL),
          m_pPeerAddr(NULL),
          m_SocketID(0),
          m_ListenSocket(0),
          m_PeerID(0),
          m_iISN(0),
          m_pUDT(NULL),
          m_pQueuedSockets(NULL),
          m_pAcceptSockets(NULL),
          m_AcceptCond(),
          m_AcceptLock(),
          m_uiBackLog(0),
          m_iMuxID(-1) {
#ifndef WIN32
    pthread_mutex_init(&m_AcceptLock, NULL);
    pthread_cond_init(&m_AcceptCond, NULL);
    pthread_mutex_init(&m_ControlLock, NULL);
#else
    m_AcceptLock = CreateMutex(NULL, false, NULL);
    m_AcceptCond = CreateEvent(NULL, false, false, NULL);
    m_ControlLock = CreateMutex(NULL, false, NULL);
#endif
}

特別注意m_Status的初始化,該值被初始化爲了INIT。從狀態機的角度來看CUDTSocket,在它剛被new出來時,它處於INIT狀態。

CUDT類有兩個主要的職責,一是描述UDT Socket,包括全部的非靜態成員變量和非靜態成員函數,定義UDT Socket的大部分屬性和所能提供操做;二是提供API,包括絕大部分的static成員函數,這些函數將調用者與UDT內部的實現鏈接起來。CUDT類這樣的設計,明顯違背了OO的SRP單一職責原則,這多少仍是給代碼的閱讀帶來了必定的障礙。再來看一下CUDT的構造函數實現(src/core.cpp):

CUDT::CUDT() {
    m_pSndBuffer = NULL;
    m_pRcvBuffer = NULL;
    m_pSndLossList = NULL;
    m_pRcvLossList = NULL;
    m_pACKWindow = NULL;
    m_pSndTimeWindow = NULL;
    m_pRcvTimeWindow = NULL;

    m_pSndQueue = NULL;
    m_pRcvQueue = NULL;
    m_pPeerAddr = NULL;
    m_pSNode = NULL;
    m_pRNode = NULL;

    // Initilize mutex and condition variables
    initSynch();

    // Default UDT configurations
    m_iMSS = 1500;
    m_bSynSending = true;
    m_bSynRecving = true;
    m_iFlightFlagSize = 25600;
    m_iSndBufSize = 8192;
    m_iRcvBufSize = 8192;  //Rcv buffer MUST NOT be bigger than Flight Flag size
    m_Linger.l_onoff = 1;
    m_Linger.l_linger = 180;
    m_iUDPSndBufSize = 65536;
    m_iUDPRcvBufSize = m_iRcvBufSize * m_iMSS;
    m_iSockType = UDT_STREAM;
    m_iIPversion = AF_INET;
    m_bRendezvous = false;
    m_iSndTimeOut = -1;
    m_iRcvTimeOut = -1;
    m_bReuseAddr = true;
    m_llMaxBW = -1;

    m_pCCFactory = new CCCFactory<CUDTCC>;
    m_pCC = NULL;
    m_pCache = NULL;

    // Initial status
    m_bOpened = false;
    m_bListening = false;
    m_bConnecting = false;
    m_bConnected = false;
    m_bClosing = false;
    m_bShutdown = false;
    m_bBroken = false;
    m_bPeerHealth = true;
    m_ullLingerExpiration = 0;
}


void CUDT::initSynch() {
#ifndef WIN32
    pthread_mutex_init(&m_SendBlockLock, NULL);
    pthread_cond_init(&m_SendBlockCond, NULL);
    pthread_mutex_init(&m_RecvDataLock, NULL);
    pthread_cond_init(&m_RecvDataCond, NULL);
    pthread_mutex_init(&m_SendLock, NULL);
    pthread_mutex_init(&m_RecvLock, NULL);
    pthread_mutex_init(&m_AckLock, NULL);
    pthread_mutex_init(&m_ConnectionLock, NULL);
#else
    m_SendBlockLock = CreateMutex(NULL, false, NULL);
    m_SendBlockCond = CreateEvent(NULL, false, false, NULL);
    m_RecvDataLock = CreateMutex(NULL, false, NULL);
    m_RecvDataCond = CreateEvent(NULL, false, false, NULL);
    m_SendLock = CreateMutex(NULL, false, NULL);
    m_RecvLock = CreateMutex(NULL, false, NULL);
    m_AckLock = CreateMutex(NULL, false, NULL);
    m_ConnectionLock = CreateMutex(NULL, false, NULL);
#endif
}

都是成員變量的初始化,後續再來詳細瞭解這些成員變量的做用。

2. 爲Socket分配SocketID,其值爲CUDTUnited的m_SocketID遞減的結果。m_SocketID在CUDTUnited的構造函數中初始化:

CUDTUnited::CUDTUnited()
        : m_Sockets(),
          m_ControlLock(),
          m_IDLock(),
          m_SocketID(0),
          m_TLSError(),
          m_mMultiplexer(),
          m_MultiplexerLock(),
          m_pCache(NULL),
          m_bClosing(false),
          m_GCStopLock(),
          m_GCStopCond(),
          m_InitLock(),
          m_iInstanceCount(0),
          m_bGCStatus(false),
          m_GCThread(),
          m_ClosedSockets() {
    // Socket ID MUST start from a random value
    srand((unsigned int) CTimer::getTime());
    m_SocketID = 1 + (int) ((1 << 30) * (double(rand()) / RAND_MAX));

#ifndef WIN32
    pthread_mutex_init(&m_ControlLock, NULL);
    pthread_mutex_init(&m_IDLock, NULL);
    pthread_mutex_init(&m_InitLock, NULL);
#else
    m_ControlLock = CreateMutex(NULL, false, NULL);
    m_IDLock = CreateMutex(NULL, false, NULL);
    m_InitLock = CreateMutex(NULL, false, NULL);
#endif

#ifndef WIN32
    pthread_key_create(&m_TLSError, TLSDestroy);
#else
    m_TLSError = TlsAlloc();
    m_TLSLock = CreateMutex(NULL, false, NULL);
#endif

    m_pCache = new CCache<CInfoBlock>;
}

m_SocketID的初始值是一個隨機數。

3. 初始化ns及它的CUDT對象的一些成員變量。特別注意ns->m_Status的賦值,這裏該值被賦爲了INIT。從狀態機的角度來看待CUDTSocket,在執行UDT Socket建立結束執行時,它處於INIT狀態下。

4. 將ns放在std::map<UDTSOCKET, CUDTSocket*> m_Sockets中。

5. 將UDT socket的SocketID返回給調用者。

這個接口直接返回一個表示UDT Socket的類對象,而不是一個handle,以便於調用者在調用UDT API時無需每次都輸入UDT Socket handle參數,這樣的API設計相對於UDT的這種API設計,有什麼樣的優缺點?

UDT的錯誤處理機制

在前面UDT Server和UDT Client的demo程序中,咱們有看到,全部的UDT API都會在出錯時返回一個錯誤碼,好比UDT::bind()、UDT::bind2()、UDT::listen()和UDT::connect()等返回值類型爲int的函數,在出錯時返回UDT::ERROR,UDT::socket()和UDT::accept()等返回值類型爲UDTSOCKET的函數在出錯時返回UDT::INVALID_SOCK。

UDT API的調用者在檢測到它們返回了錯誤值時,經過調用UDT::getlasterror()函數來獲取關於異常更加詳細的信息。這裏咱們就來看一下這套機制的實現。

注意看CUDT::socket()函數的實現。在這個函數中,會將實際建立UDT Socket的任務委託給s_UDTUnited.newSocket(),但這個調用會被包在一個try-catch塊中。在s_UDTUnited.newSocket()函數執行的過程當中發生任何的異常,都會先被CUDT::socket()捕獲。CUDT::socket()函數在捕獲這些異常以後,會根據捕獲的異常的類型建立不一樣的CUDTException對象,並經過調用CUDTUnited::setError()函數將該CUDTException對象設置給s_UDTUnited。

咱們來看一下CUDTUnited::setError()函數的實現(src/api.cpp):

void CUDTUnited::setError(CUDTException* e) {
#ifndef WIN32
    delete (CUDTException*) pthread_getspecific(m_TLSError);
    pthread_setspecific(m_TLSError, e);
#else
    CGuard tg(m_TLSLock);
    delete (CUDTException*)TlsGetValue(m_TLSError);
    TlsSetValue(m_TLSError, e);
    m_mTLSRecord[GetCurrentThreadId()] = e;
#endif
}

在這個函數中,會首先取出線程局部存儲變量m_TLSError中保存的本線程上一次建立的 CUDTException對象,將其delete掉,並將本次建立的CUDTException對象設置進去。CUDTUnited類定義中m_TLSError的聲明(src/api.h):

private:
    pthread_key_t m_TLSError;                         // thread local error record (last error)
#ifndef WIN32
    static void TLSDestroy(void* e) {
        if (NULL != e)
            delete (CUDTException*) e;
    }
#else
    std::map<DWORD, CUDTException*> m_mTLSRecord;
    void checkTLSValue();
    pthread_mutex_t m_TLSLock;
#endif

在CUDTUnited構造函數中也能夠看到對這個對象的初始化。 CUDTUnited::TLSDestroy()函數是m_TLSError的析構函數,在m_TLSError最後被銷燬時,這個函數被調用,以便於釋放搜有還未釋放的資源。

再來看UDT API的調用者獲取上一次發生的異常的函數UDT::getlasterror():

CUDTException* CUDTUnited::getError() {
#ifndef WIN32
    if (NULL == pthread_getspecific(m_TLSError))
        pthread_setspecific(m_TLSError, new CUDTException);
    return (CUDTException*) pthread_getspecific(m_TLSError);
#else
    CGuard tg(m_TLSLock);
    if(NULL == TlsGetValue(m_TLSError))
    {
        CUDTException* e = new CUDTException;
        TlsSetValue(m_TLSError, e);
        m_mTLSRecord[GetCurrentThreadId()] = e;
    }
    return (CUDTException*)TlsGetValue(m_TLSError);
#endif
}



CUDTException& CUDT::getlasterror() {
    return *s_UDTUnited.getError();
}


ERRORINFO& getlasterror() {
    return CUDT::getlasterror();
}

調用過程爲UDT::getlasterror() -> CUDT::getlasterror() -> CUDTUnited::getError()。

主要就是從s_UDTUnited的線程局部存儲變量m_TLSError中取出前面設置的本線程上次建立的CUDTException對象返回給調用者。

總結一下,UDT的使用者在調用UDT API時,UDT API會直接調用CUDT類對應的static API函數,在CUDT類的這些static API函數中會將作實際事情的工做委託給s_UDTUnited的相應函數,但這個委託調用會被包在一個try-catch block中。s_UDTUnited的函數在遇到異常狀況時拋出異常,CUDT類的static API函數捕獲異常,根據捕獲到的異常的具體類型,建立不一樣的CUDTException對象設置給s_UDTUnited的線程局部存儲變量m_TLSError中並向UDT API調用者返回錯誤碼,UDT API的調用者檢測到錯誤碼後,經過UDT::getlasterror()獲取存儲在m_TLSError中的異常。

此處能夠看到,CUDT提供的這一層API,一個比較重要的做用大概就是作異常處理了。

在UDT中,使用CUDTException來描述全部出現的異常。能夠看一下這個類的定義(src/udt.h):

class UDT_API CUDTException {
 public:
    CUDTException(int major = 0, int minor = 0, int err = -1);
    CUDTException(const CUDTException& e);
    virtual ~CUDTException();

    // Functionality:
    //    Get the description of the exception.
    // Parameters:
    //    None.
    // Returned value:
    //    Text message for the exception description.

    virtual const char* getErrorMessage();

    // Functionality:
    //    Get the system errno for the exception.
    // Parameters:
    //    None.
    // Returned value:
    //    errno.

    virtual int getErrorCode() const;

    // Functionality:
    //    Clear the error code.
    // Parameters:
    //    None.
    // Returned value:
    //    None.

    virtual void clear();

 private:
    int m_iMajor;        // major exception categories

// 0: correct condition
// 1: network setup exception
// 2: network connection broken
// 3: memory exception
// 4: file exception
// 5: method not supported
// 6+: undefined error

    int m_iMinor;		// for specific error reasons
    int m_iErrno;		// errno returned by the system if there is any
    std::string m_strMsg;	// text error message

    std::string m_strAPI;	// the name of UDT function that returns the error
    std::string m_strDebug;  // debug information, set to the original place that causes the error

 public:
    // Error Code
    static const int SUCCESS;
    static const int ECONNSETUP;
    static const int ENOSERVER;
    static const int ECONNREJ;
    static const int ESOCKFAIL;
    static const int ESECFAIL;
    static const int ECONNFAIL;
    static const int ECONNLOST;
    static const int ENOCONN;
    static const int ERESOURCE;
    static const int ETHREAD;
    static const int ENOBUF;
    static const int EFILE;
    static const int EINVRDOFF;
    static const int ERDPERM;
    static const int EINVWROFF;
    static const int EWRPERM;
    static const int EINVOP;
    static const int EBOUNDSOCK;
    static const int ECONNSOCK;
    static const int EINVPARAM;
    static const int EINVSOCK;
    static const int EUNBOUNDSOCK;
    static const int ENOLISTEN;
    static const int ERDVNOSERV;
    static const int ERDVUNBOUND;
    static const int ESTREAMILL;
    static const int EDGRAMILL;
    static const int EDUPLISTEN;
    static const int ELARGEMSG;
    static const int EINVPOLLID;
    static const int EASYNCFAIL;
    static const int EASYNCSND;
    static const int EASYNCRCV;
    static const int ETIMEOUT;
    static const int EPEERERR;
    static const int EUNKNOWN;
};

這個class主要經過Major錯誤碼和Minor錯誤碼來描述異常狀況,若是是調用系統調用出錯了,還會用Errno值。

具體看一下這個class的實現,特別是CUDTException::getErrorMessage()函數,來了解每個錯誤碼所表明的含義:

CUDTException::CUDTException(int major, int minor, int err)
        : m_iMajor(major),
          m_iMinor(minor) {
    if (-1 == err)
#ifndef WIN32
        m_iErrno = errno;
#else
        m_iErrno = GetLastError();
#endif
        else
        m_iErrno = err;
}

CUDTException::CUDTException(const CUDTException& e)
        : m_iMajor(e.m_iMajor),
          m_iMinor(e.m_iMinor),
          m_iErrno(e.m_iErrno),
          m_strMsg() {
}

CUDTException::~CUDTException() {
}

const char* CUDTException::getErrorMessage() {
    // translate "Major:Minor" code into text message.

    switch (m_iMajor) {
        case 0:
            m_strMsg = "Success";
            break;

        case 1:
            m_strMsg = "Connection setup failure";

            switch (m_iMinor) {
                case 1:
                    m_strMsg += ": connection time out";
                    break;

                case 2:
                    m_strMsg += ": connection rejected";
                    break;

                case 3:
                    m_strMsg += ": unable to create/configure UDP socket";
                    break;

                case 4:
                    m_strMsg += ": abort for security reasons";
                    break;

                default:
                    break;
            }

            break;

        case 2:
            switch (m_iMinor) {
                case 1:
                    m_strMsg = "Connection was broken";
                    break;

                case 2:
                    m_strMsg = "Connection does not exist";
                    break;

                default:
                    break;
            }

            break;

        case 3:
            m_strMsg = "System resource failure";

            switch (m_iMinor) {
                case 1:
                    m_strMsg += ": unable to create new threads";
                    break;

                case 2:
                    m_strMsg += ": unable to allocate buffers";
                    break;

                default:
                    break;
            }

            break;

        case 4:
            m_strMsg = "File system failure";

            switch (m_iMinor) {
                case 1:
                    m_strMsg += ": cannot seek read position";
                    break;

                case 2:
                    m_strMsg += ": failure in read";
                    break;

                case 3:
                    m_strMsg += ": cannot seek write position";
                    break;

                case 4:
                    m_strMsg += ": failure in write";
                    break;

                default:
                    break;
            }

            break;

        case 5:
            m_strMsg = "Operation not supported";

            switch (m_iMinor) {
                case 1:
                    m_strMsg += ": Cannot do this operation on a BOUND socket";
                    break;

                case 2:
                    m_strMsg += ": Cannot do this operation on a CONNECTED socket";
                    break;

                case 3:
                    m_strMsg += ": Bad parameters";
                    break;

                case 4:
                    m_strMsg += ": Invalid socket ID";
                    break;

                case 5:
                    m_strMsg += ": Cannot do this operation on an UNBOUND socket";
                    break;

                case 6:
                    m_strMsg += ": Socket is not in listening state";
                    break;

                case 7:
                    m_strMsg += ": Listen/accept is not supported in rendezous connection setup";
                    break;

                case 8:
                    m_strMsg += ": Cannot call connect on UNBOUND socket in rendezvous connection setup";
                    break;

                case 9:
                    m_strMsg += ": This operation is not supported in SOCK_STREAM mode";
                    break;

                case 10:
                    m_strMsg += ": This operation is not supported in SOCK_DGRAM mode";
                    break;

                case 11:
                    m_strMsg += ": Another socket is already listening on the same port";
                    break;

                case 12:
                    m_strMsg += ": Message is too large to send (it must be less than the UDT send buffer size)";
                    break;

                case 13:
                    m_strMsg += ": Invalid epoll ID";
                    break;

                default:
                    break;
            }

            break;

        case 6:
            m_strMsg = "Non-blocking call failure";

            switch (m_iMinor) {
                case 1:
                    m_strMsg += ": no buffer available for sending";
                    break;

                case 2:
                    m_strMsg += ": no data available for reading";
                    break;

                default:
                    break;
            }

            break;

        case 7:
            m_strMsg = "The peer side has signalled an error";

            break;

        default:
            m_strMsg = "Unknown error";
    }

    // Adding "errno" information
    if ((0 != m_iMajor) && (0 < m_iErrno)) {
        m_strMsg += ": ";
#ifndef WIN32
        char errmsg[1024];
        if (strerror_r(m_iErrno, errmsg, 1024) == 0)
            m_strMsg += errmsg;
#else
        LPVOID lpMsgBuf;
        FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, m_iErrno, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPTSTR)&lpMsgBuf, 0, NULL);
        m_strMsg += (char*)lpMsgBuf;
        LocalFree(lpMsgBuf);
#endif
    }

    // period
#ifndef WIN32
    m_strMsg += ".";
#endif

    return m_strMsg.c_str();
}

int CUDTException::getErrorCode() const {
    return m_iMajor * 1000 + m_iMinor;
}

void CUDTException::clear() {
    m_iMajor = 0;
    m_iMinor = 0;
    m_iErrno = 0;
}

const int CUDTException::SUCCESS = 0;
const int CUDTException::ECONNSETUP = 1000;
const int CUDTException::ENOSERVER = 1001;
const int CUDTException::ECONNREJ = 1002;
const int CUDTException::ESOCKFAIL = 1003;
const int CUDTException::ESECFAIL = 1004;
const int CUDTException::ECONNFAIL = 2000;
const int CUDTException::ECONNLOST = 2001;
const int CUDTException::ENOCONN = 2002;
const int CUDTException::ERESOURCE = 3000;
const int CUDTException::ETHREAD = 3001;
const int CUDTException::ENOBUF = 3002;
const int CUDTException::EFILE = 4000;
const int CUDTException::EINVRDOFF = 4001;
const int CUDTException::ERDPERM = 4002;
const int CUDTException::EINVWROFF = 4003;
const int CUDTException::EWRPERM = 4004;
const int CUDTException::EINVOP = 5000;
const int CUDTException::EBOUNDSOCK = 5001;
const int CUDTException::ECONNSOCK = 5002;
const int CUDTException::EINVPARAM = 5003;
const int CUDTException::EINVSOCK = 5004;
const int CUDTException::EUNBOUNDSOCK = 5005;
const int CUDTException::ENOLISTEN = 5006;
const int CUDTException::ERDVNOSERV = 5007;
const int CUDTException::ERDVUNBOUND = 5008;
const int CUDTException::ESTREAMILL = 5009;
const int CUDTException::EDGRAMILL = 5010;
const int CUDTException::EDUPLISTEN = 5011;
const int CUDTException::ELARGEMSG = 5012;
const int CUDTException::EINVPOLLID = 5013;
const int CUDTException::EASYNCFAIL = 6000;
const int CUDTException::EASYNCSND = 6001;
const int CUDTException::EASYNCRCV = 6002;
const int CUDTException::ETIMEOUT = 6003;
const int CUDTException::EPEERERR = 7000;
const int CUDTException::EUNKNOWN = -1;

Done。

相關文章
相關標籤/搜索