代碼來源
libco是微信網絡框架svrkit/summer的協程庫。在網絡IO操做較多的服務下,協程可以幫助提升服務的併發。在進行網絡io操做的時候,讓出cpu,服務更多的請求。數組
重要的數據結構
更大的圖,見https://drive.google.com/file/d/1hyxb-5kwo2ezX8iRj5wjh0dJNy5oZXjv/view?usp=sharing微信
協程上下文
上下文定義,實際上,在這裏對於X86_64系統,定義了14個寄存器,關於應該保存的寄存器應該有多少,在這篇文章用戶態調度要保存些什麼中有寫。網絡
struct coctx_t { #if defined(__i386__) void *regs[ 8 ]; #else // 14個寄存器 void *regs[ 14 ]; #endif size_t ss_size; // char *ss_sp; };
對於協程的抽象,有以下幾個接口:數據結構
// 將整個結構體初始化0 int coctx_init( coctx_t *ctx ) { memset( ctx,0,sizeof(*ctx)); return 0; } typedef void* (*coctx_pfn_t)( void* s, void* s2 ); // 真正的初始化協程上下文 int coctx_make( coctx_t *ctx, coctx_pfn_t pfn, const void *s, const void *s1 ) { // 棧頂的指針是從高地址到低地址 char *sp = ctx->ss_sp + ctx->ss_size; sp = (char*) ((unsigned long)sp & -16LL ); // 初始化全部寄存器內容爲0 memset(ctx->regs, 0, sizeof(ctx->regs)); ctx->regs[ kRSP ] = sp - 8; // 返回函數地址 ctx->regs[ kRETAddr] = (char*)pfn; // 設置第一個參數 ctx->regs[ kRDI ] = (char*)s; // 設置第二個參數 ctx->regs[ kRSI ] = (char*)s1; return 0; },
看看寄存器在ctx->regs的分配狀況併發
//------------- // 64 bit //low | regs[0]: r15 | // | regs[1]: r14 | // | regs[2]: r13 | // | regs[3]: r12 | // | regs[4]: r9 | // | regs[5]: r8 | // | regs[6]: rbp | // | regs[7]: rdi | // | regs[8]: rsi | // | regs[9]: ret | //ret func addr // | regs[10]: rdx | // | regs[11]: rcx | // | regs[12]: rbx | //hig | regs[13]: rsp |
咱們看看各個寄存器分配使用狀況: 框架
看看定義的協程接口:函數
int co_create( stCoRoutine_t **co,const stCoRoutineAttr_t *attr, void *(*routine)(void*),void *arg ); void co_resume( stCoRoutine_t *co ); void co_yield( stCoRoutine_t *co ); void co_yield_ct(); //ct = current thread void co_release( stCoRoutine_t *co );
咱們看一下協程定義的環境結構體:google
struct stCoRoutineEnv_t { stCoRoutine_t *pCallStack[ 128 ]; // 協程棧,棧頂元素指向當前正在運行的協程 int iCallStackSize; // 棧的大小 stCoEpoll_t *pEpoll; //for copy stack log lastco and nextco stCoRoutine_t* pending_co; stCoRoutine_t* occupy_co; };
咱們看看co_create的使用spa
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr, pfn_co_routine_t pfn,void *arg ) { // 獲取當前進程的環境,若是該進程沒有,那麼分配內存, if( !co_get_curr_thread_env() ) { // 這裏是分配stCroutine_t的空間 co_init_curr_thread_env(); } // 這裏應該初始化stCoRoutine的 stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg ); *ppco = co; return 0; } stCoRoutineEnv_t *co_get_curr_thread_env() { return g_arrCoEnvPerThread[ GetPid() ]; }
下面的代碼是用來初始化一個協程線程
struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr, pfn_co_routine_t pfn, void *arg ) { stCoRoutineAttr_t at; if ( attr ) { memcpy( &at,attr,sizeof(at) ); } if( at.stack_size <= 0 ) { // 默認使用 128 K 字節 at.stack_size = 128 * 1024; } // 最多使用8M else if( at.stack_size > 1024 * 1024 * 8 ) { at.stack_size = 1024 * 1024 * 8; } // 這裏應該是地址對齊 if( at.stack_size & 0xFFF ) { at.stack_size &= ~0xFFF; at.stack_size += 0x1000; } // 建立協程結構體 stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) ); memset( lp,0,(long)(sizeof(stCoRoutine_t))); // 設置協程的環境 lp->env = env; lp->pfn = pfn; lp->arg = arg; stStackMem_t* stack_mem = NULL; if( at.share_stack ) { stack_mem = co_get_stackmem( at.share_stack); at.stack_size = at.share_stack->stack_size; } else { stack_mem = co_alloc_stackmem(at.stack_size); } // 分配棧空間 lp->stack_mem = stack_mem; // 修改上下文的棧地址 lp->ctx.ss_sp = stack_mem->stack_buffer; // 修改棧大小 lp->ctx.ss_size = at.stack_size; lp->cStart = 0; lp->cEnd = 0; lp->cIsMain = 0; lp->cEnableSysHook = 0; lp->cIsShareStack = at.share_stack != NULL; lp->save_size = 0; lp->save_buffer = NULL; return lp; }
co_resume源碼,這個是重點,表示切換協程
void co_resume( stCoRoutine_t *co ) { // 獲取當前協程所在線程的環境 stCoRoutineEnv_t *env = co->env; // 獲取當前運行的協程 stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ]; // 若是要切換的協程沒有開始運行過 if( !co->cStart ) { // 初始化協程的上下文 coctx_make( &co->ctx, (coctx_pfn_t)CoRoutineFunc, co, 0 ); co->cStart = 1; } // 設置當前運行的協程爲co env->pCallStack[ env->iCallStackSize++ ] = co; // 切換上下文,執行co co_swap( lpCurrRoutine, co ); }
co_swap的定義
void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co);
實現:
// 交換上下文,這是關鍵 void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co) { // 獲取當前線程的環境變量 stCoRoutineEnv_t* env = co_get_curr_thread_env(); // get curr stack sp char c; // 獲取當前線程棧底指針 curr->stack_sp= &c; if (!pending_co->cIsShareStack) { env->pending_co = NULL; env->occupy_co = NULL; } else { // 設置下一個協程 env->pending_co = pending_co; //get last occupy co on the same stack mem stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co; //set pending co to occupy thest stack mem; // 設置當前棧空間的協程爲pending_co pending_co->stack_mem->occupy_co = pending_co; // 設置以前的協程,記錄下來 env->occupy_co = occupy_co; if (occupy_co && occupy_co != pending_co) { // 拷貝共享棧中的棧空間到本身的私有棧。 save_stack_buffer(occupy_co); } } // swap context // 並執行pending_co->ctx coctx_swap(&(curr->ctx),&(pending_co->ctx) ); // stack buffer may be overwrite, so get again; stCoRoutineEnv_t* curr_env = co_get_curr_thread_env(); stCoRoutine_t* update_occupy_co = curr_env->occupy_co; stCoRoutine_t* update_pending_co = curr_env->pending_co; // 切進來的協程執行完畢,要將以前在save_buffer中保存協程上下文恢復過來。 if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co) { //resume stack buffer if (update_pending_co->save_buffer && update_pending_co->save_size > 0) { memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size); } } }
棧空間的保存:
void save_stack_buffer(stCoRoutine_t* occupy_co) { ///copy out stStackMem_t* stack_mem = occupy_co->stack_mem; int len = stack_mem->stack_bp - occupy_co->stack_sp; // 以前已經保存過,那麼釋放以前保存的上下文。 if (occupy_co->save_buffer) { // 刪除釋放 free(occupy_co->save_buffer), occupy_co->save_buffer = NULL; } occupy_co->save_buffer = (char*)malloc(len); //malloc buf; occupy_co->save_size = len; // 將當前棧空間的內容拷貝過來。注意這裏的是將當前的棧空間的內容保存到 // save_buffer只能,這裏要更重的是stack_bp是何時改變的。 memcpy(occupy_co->save_buffer, occupy_co->stack_sp, len); }
上下文切換
對於這部分的代碼其實是彙編寫的,咱們看看
.globl coctx_swap #if !defined( __APPLE__ ) && !defined( __FreeBSD__ ) .type coctx_swap, @function #endif coctx_swap: #if defined(__i386__) ..... #elif defined(__x86_64__) leaq 8(%rsp),%rax leaq 112(%rdi),%rsp pushq %rax pushq %rbx pushq %rcx pushq %rdx pushq -8(%rax) //ret func addr pushq %rsi pushq %rdi pushq %rbp pushq %r8 pushq %r9 pushq %r12 pushq %r13 pushq %r14 pushq %r15 movq %rsi, %rsp popq %r15 popq %r14 popq %r13 popq %r12 popq %r9 popq %r8 popq %rbp popq %rdi popq %rsi popq %rax //ret func addr popq %rdx popq %rcx popq %rbx popq %rsp pushq %rax xorl %eax, %eax ret #endif
leaq 用於把其第一個參數的值賦值給第二個寄存器參數。第一條語句用來把 8(%rsp) 的自己的值存入到 %rax 中,注意這裏使用的並非 8(%rsp) 指向的值,而是把 8(%rsp) 表示的地址賦值給了 %rax。這一地址是父函數棧幀中除返回地址外棧幀頂的位置。
在第二條語句leaq 112(%rdi), %rsp中,%rdi 存放的是coctx_swap第一個參數的值,這一參數是指向 coctx_t 類型的指針,表示當前要切出的協程,這一類型的定義以下:
struct coctx_t { #if defined(__i386__) void *regs[ 8 ]; #else // 14個寄存器 void *regs[ 14 ]; #endif size_t ss_size; // char *ss_sp; };
於是 112(%rdi) 表示的就是第一個協程的 coctx_t 中 regs[14] 數組的下一個64位地址。而接下來的語句:
pushq %rax pushq %rbx pushq %rcx pushq %rdx pushq -8(%rax) //ret func addr pushq %rsi pushq %rdi pushq %rbp pushq %r8 pushq %r9 pushq %r12 pushq %r13 pushq %r14 pushq %r15
第一條語句 pushq %rax 用於把 %rax 的值放入到 regs[13] 中,resg[13] 用來存儲第一個協程的 %rsp 的值。這時 %rax 中的值是第一個協程 coctx_swap 父函數棧幀除返回地址外棧幀頂的地址。因爲 regs[] 中有單獨的元素存儲返回地址,棧中再保存返回地址是無心義的,於是把父棧幀中除返回地址外的棧幀頂做爲要保存的 %rsp 值是合理的。當協程恢復時,把保存的 regs[13] 的值賦值給 %rsp 便可恢復本協程 coctx_swap 父函數堆棧指針的位置。第一條語句以後的語句就是用pushq 把各CPU 寄存器的值依次從 regs 尾部向前壓入。即經過調整%rsp 把 regs[14] 看成堆棧,而後利用 pushq 把寄存器的值和返回地址存儲到 regs[14] 整個數組中。regs[14] 數組中各元素與其要存儲的寄存器對應關係以下:
//------------- // 64 bit //low | regs[0]: r15 | // | regs[1]: r14 | // | regs[2]: r13 | // | regs[3]: r12 | // | regs[4]: r9 | // | regs[5]: r8 | // | regs[6]: rbp | // | regs[7]: rdi | // | regs[8]: rsi | // | regs[9]: ret | //ret func addr, 對應 rax // | regs[10]: rdx | // | regs[11]: rcx | // | regs[12]: rbx | //hig | regs[13]: rsp |
接下來的彙編語句:
movq %rsi, %rsp popq %r15 popq %r14 popq %r13 popq %r12 popq %r9 popq %r8 popq %rbp popq %rdi popq %rsi popq %rax //ret func addr popq %rdx popq %rcx popq %rbx popq %rsp
這裏用的方法仍是經過改變%rsp 的值,把某塊內存看成棧來使用。第一句 movq %rsi, %rsp 就是讓%rsp 指向 coctx_swap 第二個參數,這一參數表示要進入的協程。而第二個參數也是coctx_t 類型的指針,即執行完 movq 語句後,%rsp 指向了第二個參數 coctx_t 中 regs[0],而以後的pop 語句就是用 regs[0-13] 中的值填充cpu 的寄存器,這裏須要注意的是popq 會使得 %rsp 的值增長而不是減小,這一點保證了會從 regs[0] 到regs[13] 依次彈出到 cpu 寄存器中。在執行完最後一句 popq %rsp 後,%rsp 已經指向了新協程要恢復的棧指針(即新協程以前調用 coctx_swap 時父函數的棧幀頂指針),因爲每一個協程都有一個本身的棧空間,能夠認爲這一語句使得%rsp 指向了要進入協程的棧空間。
coctx_swap 中最後三條語句以下:
pushq %rax xorl %eax, %eax ret
pushq %rax 用來把**%rax 的值壓入到新協程的棧中**,這時 %rax 是要進入的目標協程的返回地址,即要恢復的執行點。而後用 xorl 把 %rax 低32位清0以實現地址對齊。最後ret 語句用來彈出棧的內容,並跳轉到彈出的內容表示的地址處,而彈出的內容正好是上面 pushq %rax 時壓入的 %rax 的值,即以前保存的此協程的返回地址。即最後這三條語句實現了轉移到新協程返回地址處執行,從而完成了兩個協程的切換。能夠看出,這裏經過調整%rsp 的值來恢復新協程的棧,並利用了 ret 語句來實現修改指令寄存器 %rip 的目的,經過修改 %rip 來實現程序運行邏輯跳轉。注意%rip 的值不能直接修改,只能經過 call 或 ret 之類的指令來間接修改。