Analysis of the UDT Protocol Implementation: the close Process

Finally, let's take a look at the close process (src/api.cpp):

int CUDTUnited::close(const UDTSOCKET u) {
    CUDTSocket* s = locate(u);
    if (NULL == s)
        throw CUDTException(5, 4, 0);

    CGuard socket_cg(s->m_ControlLock);

    if (s->m_Status == LISTENING) {
        if (s->m_pUDT->m_bBroken)
            return 0;

        s->m_TimeStamp = CTimer::getTime();
        s->m_pUDT->m_bBroken = true;

        // broadcast all "accept" waiting
#ifndef WIN32
        pthread_mutex_lock(&(s->m_AcceptLock));
        pthread_cond_broadcast(&(s->m_AcceptCond));
        pthread_mutex_unlock(&(s->m_AcceptLock));
#else
        SetEvent(s->m_AcceptCond);
#endif

        return 0;
    }

    s->m_pUDT->close();

    // synchronize with garbage collection.
    CGuard manager_cg(m_ControlLock);

    // since "s" is located before m_ControlLock, locate it again in case it became invalid
    map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);
    if ((i == m_Sockets.end()) || (i->second->m_Status == CLOSED))
        return 0;
    s = i->second;

    s->m_Status = CLOSED;

    // a socket will not be immediated removed when it is closed
    // in order to prevent other methods from accessing invalid address
    // a timer is started and the socket will be removed after approximately 1 second
    s->m_TimeStamp = CTimer::getTime();

    m_Sockets.erase(s->m_SocketID);
    m_ClosedSockets.insert(pair<UDTSOCKET, CUDTSocket*>(s->m_SocketID, s));

    CTimer::triggerEvent();

    return 0;
}



int CUDT::close(UDTSOCKET u) {
    try {
        return s_UDTUnited.close(u);
    } catch (CUDTException &e) {
        s_UDTUnited.setError(new CUDTException(e));
        return ERROR;
    } catch (...) {
        s_UDTUnited.setError(new CUDTException(-1, 0, 0));
        return ERROR;
    }
}


int close(UDTSOCKET u) {
    return CUDT::close(u);
}

There is nothing particularly noteworthy about the structure of this API implementation, so let's go straight to CUDTUnited::close(). The CUDTUnited::close() function handles two distinct cases: a listening socket used on the server side to accept connections, and a regular socket used for sending and receiving data.

For the first case, as can be seen, the function mainly sets the corresponding CUDTSocket s's m_TimeStamp to the current time, sets s->m_pUDT->m_bBroken to true, wakes up the threads waiting in accept, and returns. Neither in CUDTUnited::close() itself nor in the threads it wakes up (those executing CUDTUnited::accept()) do we see the actual final cleanup being performed; cleanup actions such as destroying the UDT Sockets that were created for new connection requests and queued for accept, or removing the current UDT Socket from the RcvQueue's listener, are done only in UDT's garbage-collection thread.
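For context, it is that garbage-collection thread which eventually frees a broken listener and the connection-request sockets still queued on it: close() only marks the socket and timestamps it, and the GC pass sweeps it roughly a second later. Below is a minimal, self-contained sketch of that "mark now, sweep later" pattern; the names (ToySocket, mark_closed, sweep) are hypothetical and only model the idea, the real logic lives in CUDTUnited's garbage-collection code in src/api.cpp.

#include <chrono>
#include <cstdint>
#include <map>

struct ToySocket {
    bool     broken = false;      // plays the role of m_pUDT->m_bBroken
    uint64_t timestamp = 0;       // plays the role of CUDTSocket::m_TimeStamp
};

static uint64_t now_us() {
    using namespace std::chrono;
    return duration_cast<microseconds>(steady_clock::now().time_since_epoch()).count();
}

// close() on a listener only marks it; nothing is freed here.
void mark_closed(ToySocket& s) {
    s.broken = true;
    s.timestamp = now_us();
    // ...and wake up any threads blocked in accept(), as in the code above...
}

// The GC pass, run periodically by a dedicated thread, does the real cleanup:
// anything marked broken for more than about a second is destroyed, and that
// is also where the queued, not-yet-accepted sockets would be torn down.
void sweep(std::map<int, ToySocket>& sockets) {
    const uint64_t cutoff = now_us() - 1000000;   // ~1 second
    for (std::map<int, ToySocket>::iterator it = sockets.begin(); it != sockets.end();) {
        if (it->second.broken && it->second.timestamp < cutoff)
            sockets.erase(it++);
        else
            ++it;
    }
}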

Then comes the second case. As can be seen, s->m_pUDT->close() is executed first, and it is not hard to imagine that this close() does quite a lot of work. After that the status is switched: the socket's status is set to CLOSED; the corresponding CUDTSocket s's m_TimeStamp is updated to the current time; the UDT Socket is removed from the table of open sockets m_Sockets and inserted into the table of closed sockets m_ClosedSockets; finally a timer event is triggered and the function returns.
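From an application's point of view, all of this sits behind the single UDT::close() call. As a quick illustrative example (not taken from the UDT distribution), a typical caller using the public API looks roughly like this:

#include <iostream>
#include <sys/socket.h>   // AF_INET, SOCK_STREAM
#include <udt.h>

int main() {
    UDT::startup();

    UDTSOCKET sock = UDT::socket(AF_INET, SOCK_STREAM, 0);
    // ... connect()/send()/recv() omitted ...

    // UDT::close() goes through CUDT::close(u) -> CUDTUnited::close(u) shown above.
    if (UDT::ERROR == UDT::close(sock))
        std::cout << "close failed: " << UDT::getlasterror().getErrorMessage() << std::endl;

    UDT::cleanup();
    return 0;
}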

Next, let's look at CUDT::close() (src/core.cpp):

void CUDT::close() {
    if (!m_bOpened)
        return;

    if (0 != m_Linger.l_onoff) {
        uint64_t entertime = CTimer::getTime();

        while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0)
                && (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL)) {
            // linger has been checked by previous close() call and has expired
            if (m_ullLingerExpiration >= entertime)
                break;

            if (!m_bSynSending) {
                // if this socket enables asynchronous sending, return immediately and let GC to close it later
                if (0 == m_ullLingerExpiration)
                    m_ullLingerExpiration = entertime + m_Linger.l_linger * 1000000ULL;

                return;
            }

#ifndef WIN32
            timespec ts;
            ts.tv_sec = 0;
            ts.tv_nsec = 1000000;
            nanosleep(&ts, NULL);
#else
            Sleep(1);
#endif
        }
    }

    // remove this socket from the snd queue
    if (m_bConnected)
        m_pSndQueue->m_pSndUList->remove(this);

    // trigger any pending IO events.
    s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_ERR, true);
    // then remove itself from all epoll monitoring
    try {
        for (set<int>::iterator i = m_sPollID.begin(); i != m_sPollID.end(); ++i)
            s_UDTUnited.m_EPoll.remove_usock(*i, m_SocketID);
    } catch (...) {
    }

    if (!m_bOpened)
        return;

    // Inform the threads handler to stop.
    m_bClosing = true;

    CGuard cg(m_ConnectionLock);

    // Signal the sender and recver if they are waiting for data.
    releaseSynch();

    if (m_bListening) {
        m_bListening = false;
        m_pRcvQueue->removeListener(this);
    } else if (m_bConnecting) {
        m_pRcvQueue->removeConnector(m_SocketID);
    }

    if (m_bConnected) {
        if (!m_bShutdown)
            sendCtrl(5);

        m_pCC->close();

        // Store current connection information.
        CInfoBlock ib;
        ib.m_iIPversion = m_iIPversion;
        CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
        ib.m_iRTT = m_iRTT;
        ib.m_iBandwidth = m_iBandwidth;
        m_pCache->update(&ib);

        m_bConnected = false;
    }

    // waiting all send and recv calls to stop
    CGuard sendguard(m_SendLock);
    CGuard recvguard(m_RecvLock);

    // CLOSED.
    m_bOpened = false;
}
As can be seen, what this function does is roughly the following:

1. Check the open flag m_bOpened. If m_bOpened is false, return immediately; otherwise continue.

2. Wait for a while so that data still sitting in the send buffer can be delivered reliably; if some data has still not been delivered reliably once that time is up, it is simply discarded. The wait is implemented as fairly high-frequency polling.
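How long close() is willing to wait here is governed by the socket's linger setting (m_Linger in the code above). As an illustration, assuming the UDT_LINGER socket option from the public API, an application could configure it before closing, for example:

#include <sys/socket.h>   // struct linger
#include <udt.h>

// Illustrative helper: ask close() to wait up to 10 seconds for unsent data.
// With l_onoff == 0 the linger loop above is skipped entirely and unsent data may be dropped.
void enable_linger(UDTSOCKET sock) {
    linger l;
    l.l_onoff = 1;
    l.l_linger = 10;   // seconds; compare with m_Linger.l_linger * 1000000ULL above
    UDT::setsockopt(sock, 0, UDT_LINGER, &l, sizeof(l));
}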

3. If the socket is in the Connected state, remove the current UDT Socket from the send queue's sender list m_pSndUList.

4. Check the value of m_bOpened once more.

5. Set m_bClosing to true.

As we saw earlier, after a connection is successfully established the UDT Socket is added to the RcvQueue's receiver list m_pRcvUList and to m_pHash, yet here, when the UDT Socket is closed, there is no code that removes it from those structures. What is going on?

Take a look at this piece of code in CRcvQueue::worker():

CRNode* ul = self->m_pRcvUList->m_pUList;
uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();
while ((NULL != ul) && (ul->m_llTimeStamp < ctime)) {
    CUDT* u = ul->m_pUDT;

    if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) {
        u->checkTimers();
        self->m_pRcvUList->update(u);
    } else {
        // the socket must be removed from Hash table first, then RcvUList
        self->m_pHash->remove(u->m_SocketID);
        self->m_pRcvUList->remove(u);
        u->m_pRNode->m_bOnList = false;
    }

    ul = self->m_pRcvUList->m_pUList;
}
Adding a UDT Socket to the RcvQueue's receiver list m_pRcvUList and to m_pHash is done automatically by the RcvQueue's worker thread, so removing a UDT Socket from m_pRcvUList and m_pHash is, naturally, also done automatically by the RcvQueue's worker thread.

As the code above shows, once the UDT Socket's m_bClosing has been set to true, the RcvQueue's worker thread will remove the UDT Socket from the receiver list m_pRcvUList and from m_pHash on its own.

6. Wake up all threads waiting on this UDT Socket's condition variables.
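This is the usual "flip the flags, then signal under the lock" pattern: blocked send()/recv() calls wake up, re-check flags such as m_bClosing and m_bBroken, and bail out. A minimal, self-contained illustration of that pattern (hypothetical names, not the UDT source itself):

#include <condition_variable>
#include <mutex>

struct RecvWaiter {
    std::mutex              lock;
    std::condition_variable cond;
    bool                    closing = false;    // plays the role of m_bClosing
    bool                    has_data = false;
};

// Called from the closing side: set the flag, then wake every waiter.
void release_synch(RecvWaiter& w) {
    {
        std::lock_guard<std::mutex> g(w.lock);
        w.closing = true;
    }
    w.cond.notify_all();
}

// A blocking receive waits until data arrives or the socket starts closing.
bool wait_for_data(RecvWaiter& w) {
    std::unique_lock<std::mutex> g(w.lock);
    w.cond.wait(g, [&] { return w.has_data || w.closing; });
    return !w.closing;   // false: woken up only because the socket is closing
}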

7. Handle a socket in the listening state: mainly set m_bListening to false and remove the current UDT Socket from the RcvQueue's listener.

Here we do finally see the listener being handled, but there is still no handling of the UDT Sockets that are sitting in the accept queue and have not yet been returned by accept.

8. Handle a socket in the m_bConnecting state: mainly remove the current UDT Socket from the connector list.

9. Handle a socket in the m_bConnected state: mainly send a shutdown message to the peer and set m_bConnected to false.

10. Set m_bOpened to false.

As can be seen, the handling in steps 7, 8 and 9 should be mutually exclusive.
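A small detail worth calling out in step 9: the literal 5 passed to sendCtrl() is the type number of the Shutdown control packet. On the peer that receives it, the effect is roughly the following (a simplified, self-contained model of the shutdown handling in CUDT::processCtrl(), not the verbatim source; the field names mirror the flags already seen in close()):

// Simplified model of the peer-side reaction to a Shutdown control packet.
struct PeerFlags {
    bool m_bShutdown;   // the peer initiated the shutdown
    bool m_bClosing;    // makes the RcvQueue worker drop the socket from m_pRcvUList/m_pHash
    bool m_bBroken;     // makes the GC thread eventually reclaim the socket
};

void on_shutdown_packet(PeerFlags& s) {
    s.m_bShutdown = true;   // so a later local close() skips sendCtrl(5), see the check above
    s.m_bClosing = true;
    s.m_bBroken = true;
    // ...then wake up any blocked send()/recv() calls, as in step 6...
}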

Done.
