libco協程庫源碼解讀

    協程,又被稱爲用戶級線程,是在應用層被調度,能夠減小由於調用系統調用而阻塞的線程切換的時間.目前有不少協程的實現,因爲微信內部大量使用了其直研的的libco協程庫,因此我選擇了騰訊開源的libco協程庫進行研究,學習協程的基本思想.html

1,基本原理微信

    協程實質上能夠當作是子程序、函數。一個線程上面能夠運行多個協程,可是同一時間只能運行一個協程,協程在線程上的切換,是因爲遇到阻塞的操做,或者主動讓出線程使用權。好比,有10個協程,當前線程正在運行協程1,而後協程1執行一個recv的阻塞操做,協程的調度器可以檢測到這個操做,將協程1切換出去,將協程2調度進來執行。若是沒有協程的調度器,此時協程1將會因爲調用recv這個系統調用且數據未到達而阻塞,進行休眠,此時操做系統將會發生線程切換,調度其餘線程執行,而線程切換很是耗時,高達幾十微秒(同事測試是20us),即使新執行的線程是用戶任務相關的,用戶任務也會多了幾十微秒的線程切換的消耗。而若是使用協程,協程之間的切換隻須要幾百納秒(同事測試爲0.35us,即350納秒),耗時不多。這就是協程發揮優點的地方。網絡

    下面講解libco的源碼部分,有一篇文章:C++開源協程庫libco-原理與應用.pdf,很是深刻的講解了libco的原理,並且不枯燥,十分推薦讀者先看看這篇文章。框架

    因爲libco是非對稱的協程機制,若是從當前協程A切換到協程B,而協程B又沒有切換到下一個協程,在協程B執行結束以後,會返回到協程A執行。異步

2,libco基本框架函數

    libco中的基本框架以下(引自C/C++協程庫libco:微信怎樣漂亮地完成異步化改造):學習

協程接口層實現了協程的基本源語。co_create、co_resume等簡單接口負責協程建立於恢復。co_cond_signal類接口能夠在協程間建立一個協程信號量,可用於協程間的同步通訊。測試

系統函數Hook層負責主要負責系統中同步API到異步執行的轉換。對於經常使用的同步網絡接口,Hook層會把本次網絡請求註冊爲異步事件,而後等待事件驅動層的喚醒執行。操作系統

事件驅動層實現了一個簡單高效的異步網路框架,裏面包含了異步網絡框架所須要的事件與超時回調。對於來源於同步系統函數Hook層的請求,事件註冊與回調實質上是協程的讓出與恢復執行。線程

本文經過講解接口層的幾個主要函數,使讀者對libco協程的框架和原理有一個大概的認識,下一篇文章將會講解libco如何處理事件循環等。

下面咱們從幾個主要的協程函數一一分析。

3,主要函數源碼解析

  • co_create     首先來開一下協程建立的函數,源碼以下:
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
	if( !co_get_curr_thread_env() ) 
	{
		co_init_curr_thread_env();
	}
	stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
	*ppco = co;
	return 0;
}
void co_init_curr_thread_env()
{
	pid_t pid = GetPid();	
	g_arrCoEnvPerThread[ pid ] = (stCoRoutineEnv_t*)calloc( 1,sizeof(stCoRoutineEnv_t) );
	stCoRoutineEnv_t *env = g_arrCoEnvPerThread[ pid ];

	env->iCallStackSize = 0;
	struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );
	self->cIsMain = 1;

	env->pending_co = NULL;
	env->occupy_co = NULL;

	coctx_init( &self->ctx );

	env->pCallStack[ env->iCallStackSize++ ] = self;

	stCoEpoll_t *ev = AllocEpoll();
	SetEpoll( env,ev );
}

        co_create()的第一行判斷是當前線程初始化環境變量的判斷,若是沒進行環境初始化,那麼調用co_init_curr_thread_env() 進行環境初始化,會生成當前環境g_arrCoEnvPerThread[ GetPid() ]的第一個協程 env->pCallStack,其 cIsMain 標誌位 1,iCallStackSize表示協程層數,目前只有1層,AllocEpoll()函數中初始化當前環境env的 pstActiveList,pstTimeoutList 這兩個列表,這兩個列表分別記錄了活動協程和超時協程。環境初始化操做在一個線程中只會進行一次。在初始化完成以後,會調用co_create_env()建立一個新的協程,新協程的結構體中的env這個域始終指向當前協程環境g_arrCoEnvPerThread[ GetPid() ]。新協程建立以後,並無作什麼操做。

  • co_resume
    void co_resume( stCoRoutine_t *co )
    {
    	stCoRoutineEnv_t *env = co->env;
    	stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
    	if( !co->cStart )
    	{
    		coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
    		co->cStart = 1;
    	}
    	env->pCallStack[ env->iCallStackSize++ ] = co;
    	co_swap( lpCurrRoutine, co );
    }
    co_resume()函數是切換協程的函數,也能夠稱爲是啓動協程的函數。co_resume()函數的第一行是獲取當前線程的協程環境env,第二行獲取當前正在執行的協程,也即立刻要被切換出去的協程。接下來判斷待切換的協程co是否已經被切換過,若是沒有,那麼爲co準備上下文,cStart字段設置爲1。這裏爲co準備的上下文,就是在coctx_make()函數裏面,這個函數將函數指針CoRoutineFunc賦值給co->ctx的reg[0],未來上下文切換的時候,就能切換到reg[0]所指向的地址去執行.準備好co的上下文以後,而後將待切換的協程co入棧,置於協程環境env的協程棧的頂端,代表當前最新的協程是co。注意,這並非說協程棧中只有棧頂纔是co,可能棧中某些位置也存了co。最後,調用co_swap(),該函數將協程上下文環境切換爲co的上下文環境,並進入co指定的函數內執行,以前被切換出去的協程被掛起,直到co主動yield,讓出cpu,纔會恢復被切換出去的協程執行.注意,這裏的全部的協程都是在當前協程執行的,也就是說,全部的協程都是串行執行的,調用co_resume()以後,執行上下文就跳到co的代碼空間中去了。由於co_swap()要等co主動讓出cpu纔會返回,而co的協程內部可能會resume新的協程繼續執行下去,因此co_swap()函數調用可能要等到很長時間才能返回。
    void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
    {
     	stCoRoutineEnv_t* env = co_get_curr_thread_env();
    
    	//get curr stack sp
    	char c;
    	curr->stack_sp= &c;
    
    	if (!pending_co->cIsShareStack)
    	{
    		env->pending_co = NULL;
    		env->occupy_co = NULL;
    	}
    	else 
    	{
    		env->pending_co = pending_co;
    		//get last occupy co on the same stack mem
    		stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;
    		//set pending co to occupy thest stack mem;
    		pending_co->stack_mem->occupy_co = pending_co;
    
    		env->occupy_co = occupy_co;
    		if (occupy_co && occupy_co != pending_co)
    		{
    			save_stack_buffer(occupy_co);
    		}
    	}
    
    	//swap context
    	coctx_swap(&(curr->ctx),&(pending_co->ctx) );
    	//stack buffer may be overwrite, so get again;
    	stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
    	stCoRoutine_t* update_occupy_co =  curr_env->occupy_co;
    	stCoRoutine_t* update_pending_co = curr_env->pending_co;
    	
    	if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co)
    	{
    		//resume stack buffer
    		if (update_pending_co->save_buffer && update_pending_co->save_size > 0)
    		{
    			memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size);
    		}
    	}
    }
    在co_swap()函數代碼中,因爲libco不是共享棧的模式,即pending_co->cIsShareStack爲0,因此執行了if分支,接下來執行coctx_swap(),這是一段彙編源碼,內容就是從curr的上下文跳轉到pending_co的上下文中執行,經過回調CoRoutineFunc()函數實現,此時當前線程的cpu已經開始執行pending_co協程中的代碼,直到pending_co主動讓出cpu,才接着執行coctx_swap()下面的代碼,因爲update_occupy_co爲NULL,下面的if語句沒有執行,因此至關於coctx_swap()下面沒有代碼,直接返回到curr協程中.
  • co_yield
    co_yield()與co_yield_ct()的功能是同樣的,都是使得當前協程讓出cpu.
    void co_yield_env( stCoRoutineEnv_t *env )
    {
    	
    	stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
    	stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
    
    	env->iCallStackSize--;
    
    	co_swap( curr, last);
    }
    co_yield_env()函數中的第二行獲取當前執行的協程,也即當前協程環境的協程棧的棧頂,函數的第一行獲取協程棧的次頂,也即上一次被切換的協程last,從這裏也能夠看出,libco的協程讓出cpu,只能讓給上一次被切換出去的協程.最後一行是co_swap()函數,前面講到,該函數會進入last協程的上下文去執行代碼,也就是回到上次co_resume()函數內部的co_swap()的地方,繼續往下走.
    當協程正常結束的時候,會繼續執行CoRoutineFunc()函數,將協程的cEnd設置爲1,表示已經結束,並執行一次co_yield_env(),讓出cpu,切換回上一次被讓出的協程繼續執行.
    這裏有一點我以前不太理解,懷疑會發生棧溢出的地方,那就是在調用co_yield_env(),進入co_swap()以後,調用coctx_swap(),切換到上一次的last協程的上下文,那麼當前協程的co_swap()函數裏面的變量,都是在棧空間上面的,切換到last協程的上下文以後,那些變量依然在棧空間上面,不會被銷燬,直到回到了main函數的協程,仍是沒有被銷燬。其實這是個誤區,這些變量其實不是在棧空間上面,而是在CPU的通用寄存器裏面,當調用coctx_swap()以後,這些寄存器變量就會保存到當前協程的棧空間中去,實際上是咱們以前co_create()函數malloc出來的一片堆空間。這是由於cpu的工做寄存器數量較多,而局部變量較少,而co_swap()函數的變量都是局部變量,直接存放在cpu的工做寄存器中,而coctx_swap()的做用就是將CPU的各個通用寄存器保存到coctx_t結構的regs[1] ~ regs[6]的位置,而後將last協程的coctx_t結構的regs[1]~regs[6]的內容加載到當前的通用寄存器中,並將執行cpu的執行順序切換到last協程中去執行。
  • co_release
    co_release()的功能比較簡單,就是釋放資源
    void co_release( stCoRoutine_t *co )
    {
    	if( co->cEnd )
    	{
    		free( co );
    	}
    }
  • co_self
    co_self()函數是獲取當前正在執行的協程,只要獲取到當前協程環境的線程棧頂的協程便可。
    stCoRoutine_t *co_self()
    {
    	return GetCurrThreadCo();
    }
    stCoRoutine_t *GetCurrThreadCo( )
    {
    	stCoRoutineEnv_t *env = co_get_curr_thread_env();
    	if( !env ) return 0;
    	return GetCurrCo(env);
    }
    stCoRoutine_t *GetCurrCo( stCoRoutineEnv_t *env )
    {
    	return env->pCallStack[ env->iCallStackSize - 1 ];
    }
  • co_enable_hook_sys
    libco封裝了系統調用,在系統調用,好比send/recv/condition_wait等函數前面加了一層hook,有了這層hook就能夠在系統調用的時候不讓線程阻塞而產生線程切換,co_enable_hook_sys()函數容許協程hook,固然也能夠不容許hook,直接使用原生的系統調用。
    void co_enable_hook_sys()
    {
    	stCoRoutine_t *co = GetCurrThreadCo();
    	if( co )
    	{
    		co->cEnableSysHook = 1;
    	}
    }
相關文章
相關標籤/搜索