前面的兩篇文章中介紹了微信的libco庫如何hook系統函數和協程的建立和管理,本篇文章將介紹libco庫是進行事件管理的。java
libco庫使用一種相似時間片的技術進行輪詢,使得每一個註冊在其上的事件都有機會執行。segmentfault
在上一篇文章中介紹stCoRoutineEnv_t
時,咱們將stCoEpoll_t
這個結構跳過了,如今咱們來仔細分析下這個數據結構。數組
struct stCoEpoll_t { int iEpollFd; static const int _EPOLL_SIZE = 1024 * 10; struct stTimeout_t *pTimeout; //用於保存timeout item struct stTimeoutItemLink_t *pstTimeoutList; // 在後續的event_ loop中介紹 struct stTimeoutItemLink_t *pstActiveList; co_epoll_res *result; };
stCoEpoll_t
中主要保存了epoll監聽的fd,以及註冊在其中的超時事件。stTimeoutItem_t
實際上是libco庫實現的雙向鏈表,有prev和next指針,同時保存了鏈表指針。後面在使用過程當中再介紹stTimeout_t
。服務器
struct stTimeoutItem_t { enum { eMaxTimeout = 40 * 1000 //40s }; stTimeoutItem_t *pPrev; stTimeoutItem_t *pNext; stTimeoutItemLink_t *pLink; unsigned long long ullExpireTime; OnPreparePfn_t pfnPrepare; OnProcessPfn_t pfnProcess; void *pArg; // routine bool bTimeout; }; struct stTimeoutItemLink_t { stTimeoutItem_t *head; stTimeoutItem_t *tail; }; struct stTimeout_t { stTimeoutItemLink_t *pItems; int iItemSize; unsigned long long ullStart; long long llStartIdx; };
在上篇文章中,在初始化本線程的stCoRoutineEnv_t
時,在co_init_curr_thread_env
的最後,會調用AllocEpoll() => AllocTimeout()
方法,咱們看一下AllocTimeout中具體作了哪些事情。微信
stTimeout_t *AllocTimeout( int iSize ) { stTimeout_t *lp = (stTimeout_t*)calloc( 1,sizeof(stTimeout_t) ); lp->iItemSize = iSize; lp->pItems = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) * lp->iItemSize ); lp->ullStart = GetTickMS(); lp->llStartIdx = 0; return lp; }
下面以一個簡單的客戶端連接服務器的例子在說明在libco中是如何添加監聽事件的。網絡
fd = socket(PF_INET, SOCK_STREAM, 0); struct sockaddr_in addr; SetAddr(endpoint->ip, endpoint->port, addr); ret = connect(fd,(struct sockaddr*)&addr,sizeof(addr));
因爲在libco庫中hook了socket和connect的函數,所以,這個邏輯會調用poll
函數,最終將調用co_poll_inner
。下面介紹co_poll_inner
的具體邏輯。
第一步,先將epoll結構轉換成poll結構(不清楚爲何必定要轉換成poll類型,難道是爲了兼容性嗎?)數據結構
//1.struct change stPoll_t& arg = *((stPoll_t*)malloc(sizeof(stPoll_t))); memset( &arg,0,sizeof(arg) ); arg.iEpollFd = epfd; arg.fds = (pollfd*)calloc(nfds, sizeof(pollfd)); arg.nfds = nfds; stPollItem_t arr[2]; if( nfds < sizeof(arr) / sizeof(arr[0]) && !self->cIsShareStack) { arg.pPollItems = arr; } else { arg.pPollItems = (stPollItem_t*)malloc( nfds * sizeof( stPollItem_t ) ); } memset( arg.pPollItems,0,nfds * sizeof(stPollItem_t) ); arg.pfnProcess = OnPollProcessEvent; //記住這個函數,後續有用 arg.pArg = GetCurrCo( co_get_curr_thread_env() );//參數爲當前Env指針
第二步,將poll結構加入到epoll的監聽事件中
第三步,添加timeout事件框架
//3.add timeout unsigned long long now = GetTickMS(); arg.ullExpireTime = now + timeout; int ret = AddTimeout( ctx->pTimeout,&arg,now ); // 將本事件加入到timeout的指定鏈表中 int iRaiseCnt = 0; if( ret != 0 ) { co_log_err("CO_ERR: AddTimeout ret %d now %lld timeout %d arg.ullExpireTime %lld", ret,now,timeout,arg.ullExpireTime); errno = EINVAL; iRaiseCnt = -1; } else { co_yield_env( co_get_curr_thread_env() ); iRaiseCnt = arg.iRaiseCnt; }
在AllocTimeout
只初始化了60*1000(即60s)的鏈表數組,此時在AddTimeout
中,將根據本監聽事件的超時時間添加到對應的數組index中的鏈表中,是否是比較相似於java
中的HashMap
的實現方式?
這裏有個問題,若是超時時間超過了60s,那麼超時事件都會添加到當前index的前一個遊標處,至關於有可能61s,65s的超時事件都會在同一個timeout鏈表中,那麼會不會出現,因爲時間還沒到,而超時事件被處理呢?異步
AddTail( apTimeout->pItems + ( apTimeout->llStartIdx + diff ) % apTimeout->iItemSize , apItem );
添加完超時事件後,本協程調用co_yield_env
放棄執行,stRoutineEnv_t將會調用其餘的協程進行處理。socket
將事件都加入到timeout鏈表,以及註冊到epoll fd後,main 協程將調用co_eventloop
進行輪詢。
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg ) { if( !ctx->result ) { ctx->result = co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE ); } co_epoll_res *result = ctx->result; for(;;) { // 1. 調用epoll_wait int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 ); stTimeoutItemLink_t *active = (ctx->pstActiveList); stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList); // 將timeout鏈表清空 memset( timeout,0,sizeof(stTimeoutItemLink_t) ); // 處理poll事件 for(int i=0;i<ret;i++) { stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr; if( item->pfnPrepare ) { // 這個函數基本是 OnPollPreparePfn // 在pollPreaprePfn中,將poll_inner中添加的timeout事件刪除,並添加到active list中 item->pfnPrepare( item,result->events[i],active ); } else { AddTail( active,item ); } } // 2. 將stTimeout_t中的timeout事件所有添加到timeout鏈表中 unsigned long long now = GetTickMS(); TakeAllTimeout( ctx->pTimeout,now,timeout ); // 設置其爲timeout事件 stTimeoutItem_t *lp = timeout->head; while( lp ) { //printf("raise timeout %p\n",lp); lp->bTimeout = true; lp = lp->pNext; } // 3. 添加timeoutList 到 active list Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout ); // 4. 對active list進行遍歷執行 lp = active->head; while( lp ) { PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active ); // 這裏會對timeout事件進行判斷,若時間不超時,仍然會將其加入到stTimeout_t的timeout數組隊列中 if (lp->bTimeout && now < lp->ullExpireTime) { int ret = AddTimeout(ctx->pTimeout, lp, now); if (!ret) { lp->bTimeout = false; lp = active->head; continue; } } if( lp->pfnProcess ) { lp->pfnProcess( lp ); } lp = active->head; } if( pfn ) { if( -1 == pfn( arg ) ) { break; } } } }
具體步驟以下:
至此,整個libco庫的事件監聽的分析已經完成。
每一個網絡框架都會有一個相似eventloop的函數,用於輪詢註冊的io事件,libco庫也不例外,輪詢就是比較簡單粗暴,可是又是頗有效果。libco庫將socket相關的函數都進行了hook,使得調用者可使用同步的方法進行編碼,卻可以異步的執行。