前面的文章Hook系統函數 中介紹了微信使用的協程庫libco
,用於改造原有同步系統,利用協程實現系統的異步化,以支撐更大的併發,抵抗網絡抖動帶來的影響,同時代碼層面更加簡潔。git
libco庫經過僅有的幾個函數接口 co_create/co_resume/co_yield 再配合 co_poll,能夠支持同步或者異步的寫法,如線程庫同樣輕鬆。同時庫裏面提供了socket族函數的hook,使得後臺邏輯服務幾乎不用修改邏輯代碼就能夠完成異步化改造。
下面咱們來看一下libco
庫是如何實現協程的。github
在瞭解微信是如何實現協程以前,先了解一下stCoRoutime_t
的數據結構,該類型定義了協程的相關變量,具體可參見如下代碼的註釋。編程
struct stCoRoutine_t { stCoRoutineEnv_t *env; //協程的運行context pfn_co_routine_t pfn; // 協程的入口函數 void *arg; // 入口函數的參數 coctx_t ctx; // 保存了協程的上下文信息, 包括寄存器,棧的相關信息,用於恢復現場 char cStart; char cEnd; char cIsMain; char cEnableSysHook; char cIsShareStack; void *pvEnv; //char sRunStack[ 1024 * 128 ]; stStackMem_t* stack_mem; //save satck buffer while confilct on same stack_buffer; char* stack_sp; unsigned int save_size; char* save_buffer; stCoSpec_t aSpec[1024]; };
該結構體中,咱們只須要記住stCoRoutineEnv_t
,coctx_t
,pfn_co_routine_t
等幾個簡單的參數便可,其餘的參數能夠暫時忽略。其餘的信息主要是用於共享棧模式,這個模式咱們後續再討論。segmentfault
協程之於線程,至關於線程之於進程,一個進程能夠包含多個線程,而一個線程中能夠包含多個協程。線程中用於管理協程的結構體爲stCoRoutineEnv_t
,它在該線程中第一個協程建立的時候進行初始化。
每一個線程中都只有一個stCoRoutineEnv_t
實例,線程能夠經過該stCoRoutineEnv_t
實例瞭解如今有哪些協程,哪一個協程正在運行,以及下一個運行的協程是哪一個。微信
struct stCoRoutineEnv_t { stCoRoutine_t *pCallStack[ 128 ]; // 保存當前棧中的協程 int iCallStackSize; // 表示當前在運行的協程的下一個位置,即cur_co_runtine_index + 1 stCoEpoll_t *pEpoll; //用於協程時間片切換 //for copy stack log lastco and nextco stCoRoutine_t* pending_co; stCoRoutine_t* occupy_co; };
pCallStack[ 128 ]
這個表示協程棧最大爲128,當協程切換時,棧頂的協程就被pop出來了,所以一個線程能夠建立的協程數是能夠超過128個的,你們大膽用起來。網絡
void co_init_curr_thread_env() { pid_t pid = GetPid(); g_arrCoEnvPerThread[ pid ] = (stCoRoutineEnv_t*)calloc( 1,sizeof(stCoRoutineEnv_t) ); stCoRoutineEnv_t *env = g_arrCoEnvPerThread[ pid ]; env->iCallStackSize = 0; struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL ); self->cIsMain = 1; env->pending_co = NULL; env->occupy_co = NULL; coctx_init( &self->ctx ); env->pCallStack[ env->iCallStackSize++ ] = self; stCoEpoll_t *ev = AllocEpoll(); SetEpoll( env,ev ); }
初始化所作的事情主要是:數據結構
g_arrCoEnvPerThread
中對應於threadId的位置,這裏的GetPid()
實際上是getThreadId()
,你們不要被這個函數名給誤導了。Env_t
的main routine,用於運行該線程的主邏輯Env_t
信息初始化完畢後,將使用co_create_env
真正實現第一個協程的建立:
如今讓咱們來看一下co_create_env
的實現步驟:併發
stCoRoutine_t
結構體中的運行函數相關信息,函數入口和函數參數等co_create
建立和初始化協程相關的信息後,使用co_resume
將其啓動起來:異步
void co_resume( stCoRoutine_t *co ) { stCoRoutineEnv_t *env = co->env; stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ]; //獲取棧頂的協程 if( !co->cStart ) { coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 ); // 將即將運行的協程設置上下文信息 co->cStart = 1; } env->pCallStack[ env->iCallStackSize++ ] = co; co_swap( lpCurrRoutine, co ); }
co_swap
中主要作的事情是保存當前協程棧的信息,而後再切換協程上下文信息的切換,其餘共享棧的此處先不關心。socket
對應於co_resume
的co_yield
函數是爲了讓協程有主動放棄運行的權利。前面介紹到iCallStackSize指向 curIndex+1,所以,co_yield
是將當前運行的協程的上下文信息保存到curr
中,並切換到last
中執行。
void co_yield_env( stCoRoutineEnv_t *env ) { stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ]; stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ]; env->iCallStackSize--; co_swap( curr, last); }
協程上下文信息的結構體中包括了保存了上次退出時的寄存器信息,以及棧信息。此處咱們只討論32位系統的實現,你們對86的系統仍是比較熟悉一些。
struct coctx_t { #if defined(__i386__) void *regs[ 8 ]; #else void *regs[ 14 ]; #endif size_t ss_size; char *ss_sp; };
在介紹協程上下文切換前,咱們必須瞭解c函數調用時的棧幀的變化。若是這一塊不熟悉的話,須要本身先補一補課。
經過上圖,咱們把整個函數流程梳理一下,棧的維護是調用者Caller和被調用者Callee共同維護的。
push %ebp; mov %esp, %ebp
指令設置當前的棧底指針;並分配局部變量的棧空間mov %ebp, %esp;pop %ebp;
指令,將原來的ebp寄存器恢復,而後再調用ret
指令(至關於pop %eip
),並將返回地址pop到eip寄存器中瞭解這些後,咱們先看一下協程上下文coctx_t
的初始化:
int coctx_make( coctx_t *ctx,coctx_pfn_t pfn,const void *s,const void *s1 ) { //make room for coctx_param // 獲取(棧頂 - param size)的指針,棧頂和sp指針之間用於保存函數參數 char *sp = ctx->ss_sp + ctx->ss_size - sizeof(coctx_param_t); sp = (char*)((unsigned long)sp & -16L); // 用於16位對齊 coctx_param_t* param = (coctx_param_t*)sp ; param->s1 = s; param->s2 = s1; memset(ctx->regs, 0, sizeof(ctx->regs)); // 爲何要 - sizeof(void*)呢? 用於保存返回地址 ctx->regs[ kESP ] = (char*)(sp) - sizeof(void*); ctx->regs[ kEIP ] = (char*)pfn; return 0; }
這段代碼主要是作了什麼呢?
coctx_pfn_t
函數預留2個參數的大小,並4位地址對齊regs[kEIP]
中保存了pfn
的地址,regs[kESP]
中則保存了棧頂指針 - 4個字節的大小的地址。這預留的4個字節用於保存return address
。如今咱們來看下協程切換的核心coctx_swap
,這個函數是使用匯編實現的。主要分爲保存原來的棧空間,並恢復現有的棧空間兩個步驟。
先看一下執行彙編程序前的棧幀狀況。esp
寄存器指向return address
。
咱們先看一下當前棧空間的保存
//----- -------- // 32 bit // | regs[0]: ret | // | regs[1]: ebx | // | regs[2]: ecx | // | regs[3]: edx | // | regs[4]: edi | // | regs[5]: esi | // | regs[6]: ebp | // | regs[7]: eax | = esp coctx_swap: leal 4(%esp), %eax // eax = esp + 4 movl 4(%esp), %esp // esp = *(esp+4) = &cur_ctx leal 32(%esp), %esp // parm a : ®s[7] + sizeof(void*) // esp=®[7]+sizeof(void*) pushl %eax // cur_ctx->regs[ESP] = %eax = returnAddress + 4 pushl %ebp // cur_ctx->regs[EBX] = %ebp pushl %esi // cur_ctx->regs[ESI] = %esi pushl %edi // cur_ctx->regs[EDI] = %edi pushl %edx // cur_ctx->regs[EDX] = %edx pushl %ecx // cur_ctx->regs[ECX] = %ecx pushl %ebx // cur_ctx->regs[EBX] = %ebx pushl -4(%eax) // cur_ctx->regs[EIP] = return address
首先須要理解 leal
和movl
的區別,leal
是將算術值賦值給目標寄存器,movl 4(%esp)
則是將esp+4
算出來的值做爲地址,取該地址的值賦值給目標寄存器。movl 4(%esp), %esp
是將cur_ctx
的地址賦值給esp
。
下面是恢復pend_ctx
中的寄存器信息到cpu
寄存器中
movl 4(%eax), %esp //parm b -> ®s[0] // esp=&pend_ctx popl %eax //%eax= pend_ctx->regs[EIP] = pfunc_t地址 popl %ebx //%ebx = pend_ctx->regs[EBX] popl %ecx //%ecx = pend_ctx->regs[ECX] popl %edx //%edx = pend_ctx->regs[EDX] popl %edi //%edi = pend_ctx->regs[EDI] popl %esi //%esi = pend_ctx->regs[ESI] popl %ebp //%ebp = pend_ctx->regs[EBP] popl %esp //%ebp = pend_ctx->regs[ESP] 即 (char*) sp - sizeof(void*) pushl %eax //set ret func addr // return address = %eax = pfunc_t地址 xorl %eax, %eax ret // popl %eip 即跳轉到pfunc_t地址執行
若是是第一次執行coctx_swap
,則這部分彙編代碼就須要結合前面coctx_make
一塊兒來閱讀。
esp
指向pend_ctx
的地址regs
寄存器中的值恢復到cpu寄存器中,須要再看一下coctx_make
中的相關代碼,regs[kEIP]
和regs[kESP]
恢復到eip
和esp
中ret
指令至關於pop %eip
,所以eip
指向了pfunc_t
地址,從而開始執行協程設置的入口函數。若是是將原來已存在的協程恢復,則這部分代碼則須要根據前面保存寄存器信息的彙編代碼來一塊兒閱讀,將esp
恢復到原始位置,並將 eip
恢復成returnAddress
。
pushl %eax // cur_ctx->regs[ESP] = %eax = returnAddress + 4 pushl -4(%eax) // cur_ctx->regs[EIP] = return address
最後的棧以下圖所示:
理解這些代碼須要瞭解棧幀的建立和恢復,以及一些彙編的簡單代碼,若有不瞭解,須要善用google。關於協程的建立和管理就介紹到這裏,後續將繼續介紹協程的時間片以及共享棧的相關內容,敬請期待。