微信協程庫libco研究(三):協程的事件管理

前面的兩篇文章中介紹了微信的libco庫如何hook系統函數協程的建立和管理,本篇文章將介紹libco庫是進行事件管理的。java

libco庫使用一種相似時間片的技術進行輪詢,使得每一個註冊在其上的事件都有機會執行。segmentfault

1. 基礎數據結構

在上一篇文章中介紹stCoRoutineEnv_t時,咱們將stCoEpoll_t這個結構跳過了,如今咱們來仔細分析下這個數據結構。數組

struct stCoEpoll_t
{
    int iEpollFd;
    static const int _EPOLL_SIZE = 1024 * 10;
    struct stTimeout_t *pTimeout; //用於保存timeout item
    struct stTimeoutItemLink_t *pstTimeoutList; // 在後續的event_ loop中介紹
    struct stTimeoutItemLink_t *pstActiveList; 
    co_epoll_res *result; 
};

stCoEpoll_t中主要保存了epoll監聽的fd,以及註冊在其中的超時事件。
stTimeoutItem_t實際上是libco庫實現的雙向鏈表,有prev和next指針,同時保存了鏈表指針。後面在使用過程當中再介紹stTimeout_t服務器

struct stTimeoutItem_t
{
    enum
    {
        eMaxTimeout = 40 * 1000 //40s
    };
    stTimeoutItem_t *pPrev;
    stTimeoutItem_t *pNext;
    stTimeoutItemLink_t *pLink;

    unsigned long long ullExpireTime;

    OnPreparePfn_t pfnPrepare;
    OnProcessPfn_t pfnProcess;

    void *pArg; // routine 
    bool bTimeout;
};
struct stTimeoutItemLink_t
{
    stTimeoutItem_t *head;
    stTimeoutItem_t *tail;

};
struct stTimeout_t
{
    stTimeoutItemLink_t *pItems;
    int iItemSize;

    unsigned long long ullStart;
    long long llStartIdx;
};

2. 初始化

在上篇文章中,在初始化本線程的stCoRoutineEnv_t時,在co_init_curr_thread_env的最後,會調用AllocEpoll() => AllocTimeout() 方法,咱們看一下AllocTimeout中具體作了哪些事情。微信

stTimeout_t *AllocTimeout( int iSize )
{
    stTimeout_t *lp = (stTimeout_t*)calloc( 1,sizeof(stTimeout_t) );    

    lp->iItemSize = iSize;
    lp->pItems = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) * lp->iItemSize );

    lp->ullStart = GetTickMS();
    lp->llStartIdx = 0;

    return lp;
}
  1. 申請了60*1000個timeoutLink鏈表
  2. 設置當前時間爲起始時間
  3. 設置當前遊標爲0

3. 添加監聽事件

下面以一個簡單的客戶端連接服務器的例子在說明在libco中是如何添加監聽事件的。網絡

fd = socket(PF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr;
    SetAddr(endpoint->ip, endpoint->port, addr);
    ret = connect(fd,(struct sockaddr*)&addr,sizeof(addr));

因爲在libco庫中hook了socket和connect的函數,所以,這個邏輯會調用poll函數,最終將調用co_poll_inner。下面介紹co_poll_inner的具體邏輯。
第一步,先將epoll結構轉換成poll結構(不清楚爲何必定要轉換成poll類型,難道是爲了兼容性嗎?)數據結構

//1.struct change
    stPoll_t& arg = *((stPoll_t*)malloc(sizeof(stPoll_t)));
    memset( &arg,0,sizeof(arg) );

    arg.iEpollFd = epfd;
    arg.fds = (pollfd*)calloc(nfds, sizeof(pollfd));
    arg.nfds = nfds;

    stPollItem_t arr[2];
    if( nfds < sizeof(arr) / sizeof(arr[0]) && !self->cIsShareStack)
    {
        arg.pPollItems = arr;
    }    
    else
    {
        arg.pPollItems = (stPollItem_t*)malloc( nfds * sizeof( stPollItem_t ) );
    }
    memset( arg.pPollItems,0,nfds * sizeof(stPollItem_t) );

    arg.pfnProcess = OnPollProcessEvent; //記住這個函數,後續有用
    arg.pArg = GetCurrCo( co_get_curr_thread_env() );//參數爲當前Env指針

第二步,將poll結構加入到epoll的監聽事件中
第三步,添加timeout事件框架

//3.add timeout
    unsigned long long now = GetTickMS();
    arg.ullExpireTime = now + timeout;
    int ret = AddTimeout( ctx->pTimeout,&arg,now ); // 將本事件加入到timeout的指定鏈表中
    int iRaiseCnt = 0;
    if( ret != 0 )
    {
        co_log_err("CO_ERR: AddTimeout ret %d now %lld timeout %d arg.ullExpireTime %lld",
                ret,now,timeout,arg.ullExpireTime);
        errno = EINVAL;
        iRaiseCnt = -1;
    }
    else
    {
        co_yield_env( co_get_curr_thread_env() );
        iRaiseCnt = arg.iRaiseCnt;
    }

AllocTimeout只初始化了60*1000(即60s)的鏈表數組,此時在AddTimeout中,將根據本監聽事件的超時時間添加到對應的數組index中的鏈表中,是否是比較相似於java中的HashMap的實現方式?
這裏有個問題,若是超時時間超過了60s,那麼超時事件都會添加到當前index的前一個遊標處,至關於有可能61s,65s的超時事件都會在同一個timeout鏈表中,那麼會不會出現,因爲時間還沒到,而超時事件被處理呢?異步

AddTail( apTimeout->pItems + ( apTimeout->llStartIdx + diff ) % apTimeout->iItemSize , apItem );

添加完超時事件後,本協程調用co_yield_env放棄執行,stRoutineEnv_t將會調用其餘的協程進行處理。socket

4. 輪詢

將事件都加入到timeout鏈表,以及註冊到epoll fd後,main 協程將調用co_eventloop進行輪詢。

void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
    if( !ctx->result )
    {
        ctx->result =  co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
    }
    co_epoll_res *result = ctx->result;


    for(;;)
    {
        // 1. 調用epoll_wait
        int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );

        stTimeoutItemLink_t *active = (ctx->pstActiveList);
        stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);

        // 將timeout鏈表清空
        memset( timeout,0,sizeof(stTimeoutItemLink_t) );

        // 處理poll事件
        for(int i=0;i<ret;i++)
        {
            stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
            if( item->pfnPrepare ) 
            {
                // 這個函數基本是 OnPollPreparePfn
                // 在pollPreaprePfn中,將poll_inner中添加的timeout事件刪除,並添加到active list中
                item->pfnPrepare( item,result->events[i],active );
            }
            else
            {
                AddTail( active,item );
            }
        }


        // 2. 將stTimeout_t中的timeout事件所有添加到timeout鏈表中
        unsigned long long now = GetTickMS();
        TakeAllTimeout( ctx->pTimeout,now,timeout );

        // 設置其爲timeout事件
        stTimeoutItem_t *lp = timeout->head;
        while( lp )
        {
            //printf("raise timeout %p\n",lp);
            lp->bTimeout = true;
            lp = lp->pNext;
        }

        // 3. 添加timeoutList 到 active list
        Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );

        // 4. 對active list進行遍歷執行
        lp = active->head;
        while( lp )
        {

            PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
            // 這裏會對timeout事件進行判斷,若時間不超時,仍然會將其加入到stTimeout_t的timeout數組隊列中
            if (lp->bTimeout && now < lp->ullExpireTime) 
            {
                int ret = AddTimeout(ctx->pTimeout, lp, now);
                if (!ret) 
                {
                    lp->bTimeout = false;
                    lp = active->head;
                    continue;
                }
            }
            if( lp->pfnProcess )
            {
                lp->pfnProcess( lp );
            }

            lp = active->head;
        }
        if( pfn )
        {
            if( -1 == pfn( arg ) )
            {
                break;
            }
        }
    }
}

具體步驟以下:

  1. 調用epoll_wait等待監聽的事件
  2. 將stTimeout_t中的timeout鏈表清空
  3. 若epoll中有數據,則將對應的事件加入到stTimeout_t的active鏈表中;同時將timeout數組鏈表中刪除本事件的超時事件
  4. 遍歷timout數組鏈表,將已經超時的事件加入到timeout鏈表中
  5. 將timeout鏈表中的全部事件置爲超時事件,須要後續特殊處理;同時將timeout鏈表合併到active鏈表
  6. 遍歷active鏈表,對超時事件且當前時間未超過超時時間的,從新將其加入到timeout數組鏈表中,這就解決了上面超時時間超過60s的問題;對其餘的事件進行處理

至此,整個libco庫的事件監聽的分析已經完成。

5. 總結

每一個網絡框架都會有一個相似eventloop的函數,用於輪詢註冊的io事件,libco庫也不例外,輪詢就是比較簡單粗暴,可是又是頗有效果。libco庫將socket相關的函數都進行了hook,使得調用者可使用同步的方法進行編碼,卻可以異步的執行。

相關文章
相關標籤/搜索