這篇文章主要經過源碼分析,介紹coobjc中的co調度。這個問題搞清楚以後,co_lauch
作了什麼,看起來就很簡單了。咱們先了解coroutine和scheduler這兩個關鍵的數據結構。bash
在協程的數據結構中和調度相關的字段。markdown
entry
: 須要執行的任務,最終指向的是co_launch(block)中的block。userdata
: 一個OC的類對象COCoroutine,這個對象持有coroutine這個數據結構,和它一一對應。context
: 是協程執行的當前上下文。pre_context
: 保存的是這個協程被掛起或者執行完成後須要回覆的上下文,coobjc經過切換上下文來實現函數的跳轉。scheduler
: co被scheduler的co_queue持有,scheduler是co調度的核心。struct coroutine { coroutine_func entry; // Process entry. void *userdata; // Userdata. void *context; // Coroutine, void *pre_context; // Coroutine's source process's Call stack data. struct coroutine_scheduler *scheduler; // The pointer to the scheduler. ... }; typedef struct coroutine coroutine_t; 複製代碼
coobjc經過yield,resume,add來操做co。這三個方法在co調度的時候也會被頻繁用到。數據結構
void coroutine_yield(coroutine_t *co) { if (co == NULL) { // if null co = coroutine_self(); } BOOL skip = false; coroutine_getcontext(co->context); if (skip) { return; } #pragma unused(skip) skip = true; co->status = COROUTINE_SUSPEND; coroutine_setcontext(co->pre_context); } 複製代碼
這個函數的做用是掛起協程。下面提到的main指的是scheduler中main_coroutine的入口函數 coroutine_scheduler_main
,im指的是coroutine_resume_im(coroutine_t *co)
這個函數會執行co的entry,main中有個for循環遍歷co_queue取出head經過im函數執行head的entry。關於scheduler下面會有詳細介紹。async
coroutine_setcontext(co->pre_context);
這一行可讓程序跳轉到coroutine_getcontext(co->pre_context)
這裏。一般狀況下的調用棧main()->im()->coroutine_getcontext(co->pre_context)
。當yeild執行的時候,程序跳轉到im函數中coroutine_getcontext(co->pre_context)
的這個位置,im函數會直接return,跳轉到main函數的for循環裏,繼續取出co_queue的head執行head的entry。當for循環再次執行到這個被掛起的co的時候,在im執行co entry 方法中,調用coroutine_setcontext(co->context)
,程序跳轉到 coroutine_yield()方法中的 coroutine_getcontext(co->context);
這一行,此時skip是yes。coroutine_yield()函數return。回到調用coroutine_yield()的地方。yield經過保存上下文,使得被掛起的co下次可以在以前的上下文環境下繼續執行。函數
void coroutine_resume(coroutine_t *co) { if (!co->is_scheduler) { coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists(); co->scheduler = scheduler; scheduler_queue_push(scheduler, co); if (scheduler->running_coroutine) { // resume a sub coroutine. scheduler_queue_push(scheduler, scheduler->running_coroutine); coroutine_yield(scheduler->running_coroutine); } else { // scheduler is idle coroutine_resume_im(co->scheduler->main_coroutine); } } } 複製代碼
coroutine_resume 這個方法是把co push 到scheduler的協程隊列裏面。若是當前有協程在運行的話,那個當前運行的協程就會被掛起,push到協程隊列裏面,若是co_queue中只有新添加進來的co和被掛起的co,此時新添加進來的co處於queue的head會被main函數的for循環取出執行entry。若是當前沒有協程在運行,就會執行scheduler中main_corroutine的entry函數,這個函數是一個for循環從隊列中讀取co,執行co的entry。添加到co_quue隊列中的co最終會被執行。後面的判斷若是沒有runing_coroutine,這個時候main_coroutine被掛起,for循環不執行,須要主動觸發一次main函數的調用coroutine_resume_im(co->scheduler->main_coroutine);
oop
void coroutine_add(coroutine_t *co) { if (!co->is_scheduler) { coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists(); co->scheduler = scheduler; if (scheduler->main_coroutine->status == COROUTINE_DEAD) { coroutine_close_ifdead(scheduler->main_coroutine); coroutine_t *main_co = coroutine_create(coroutine_scheduler_main); main_co->is_scheduler = true; main_co->scheduler = scheduler; scheduler->main_coroutine = main_co; } scheduler_queue_push(scheduler, co); if (!scheduler->running_coroutine) { coroutine_resume_im(co->scheduler->main_coroutine); } } } 複製代碼
這個方法把當前co添加到scheduler協程隊列裏面。若是main_coroutine的狀態是dead,會建立一個main_coroutine,coroutine_t *main_co = coroutine_create(coroutine_scheduler_main);
這裏能夠看到main_coroutine的entry指向的是coroutine_scheduler_main
這個函數下面還會講到,做用就是前面一直在說的for循環。沒有當前沒有running_coroutine會主動觸發main函數。源碼分析
scheduler是協程調度的核心。spa
struct coroutine_scheduler {
coroutine_t *main_coroutine;
coroutine_t *running_coroutine;
coroutine_list_t coroutine_queue;
};
typedef struct coroutine_scheduler coroutine_scheduler_t;
struct coroutine_list {
coroutine_t *head;
coroutine_t *tail;
};
typedef struct coroutine_list coroutine_list_t;
複製代碼
main_coroutine
:它的entry指向 coroutine_scheduler_main
函數,相似於線程中的runloop提供一個for循環,不斷讀取協程隊列中的head,執行head的入口函數,隊列爲空的時候main_coroutine會被掛起。running_coroutine
: 用來記錄當前正在運行中的協程,獲取或者掛起當前協程都會用到這個字段。coroutine_queue
: 是一個雙向鏈表,用來保存添加到當前scheduler的協程,當協程的入口函數執行完成後,scheduler會把它從鏈表中清除。scheduler
的建立過程。//scheduler 的建立 coroutine_scheduler_t *coroutine_scheduler_self_create_if_not_exists(void) { if (!coroutine_scheduler_key) { pthread_key_create(&coroutine_scheduler_key, coroutine_scheduler_free); } void *schedule = pthread_getspecific(coroutine_scheduler_key); if (!schedule) { schedule = coroutine_scheduler_new(); pthread_setspecific(coroutine_scheduler_key, schedule); } return schedule; } 複製代碼
pthread_setspecific
和pthread_getspecific
是線程存儲的存取函數,表面上看起來這是一個全局變量,全部線程均可以使用它,而它的值在每一個線程中都是是單獨存儲的。線程存儲的key值是pthread_key_t
類型,經過pthread_key_create
建立,pthread_key_create
須要兩個參數第一個是字符串類型的key值,第二個參數是一個清理函數,線程釋放這個key值對應的存儲空間的的時候,這個清理函數會被調用。線程存儲的建立方式保證了每個線程中只有一個scheduler,而且提供了獲取這個scheduler的入口。線程
// The main entry of the coroutine's scheduler // The scheduler is just a special coroutine, so we can use yield. void coroutine_scheduler_main(coroutine_t *scheduler_co) { coroutine_scheduler_t *scheduler = scheduler_co->scheduler; for (;;) { // Pop a coroutine from the scheduler's queue. coroutine_t *co = scheduler_queue_pop(scheduler); if (co == NULL) { // Yield the scheduler, give back cpu to origin thread. coroutine_yield(scheduler_co); // When some coroutine add to the scheduler's queue, // the scheduler will resume again, // then will resume here, continue the loop. continue; } // Set scheduler's current running coroutine. scheduler->running_coroutine = co; // Resume the coroutine coroutine_resume_im(co); // Set scheduler's current running coroutine to nil. scheduler->running_coroutine = nil; // if coroutine finished, free coroutine. if (co->status == COROUTINE_DEAD) { coroutine_close_ifdead(co); } } } 複製代碼
coroutine_scheduler_main
函數是scheduler
的runloop。一個for循環,從本身的協程隊列裏面讀取協程。當scheduler的協程隊列不爲空的時候,會從隊列中取出head執行入口函數。當隊列裏面的協程所有取出後,當前scheduler的協程隊列coroutine_queue爲空。main_coroutine會被coroutine_yield這個函數掛起。coroutine_yield會保存當前上下文,也就是說當main_coroutine下次被resume的時候,會從這裏繼續執行下去,繼續for循環。code
void coroutine_resume_im(coroutine_t *co) { switch (co->status) { case COROUTINE_READY: { co->stack_memory = coroutine_memory_malloc(co->stack_size); co->stack_top = co->stack_memory + co->stack_size - 3 * sizeof(void *); // get the pre context co->pre_context = malloc(sizeof(coroutine_ucontext_t)); BOOL skip = false; coroutine_getcontext(co->pre_context); if (skip) { // when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func. return; } #pragma unused(skip) skip = true; free(co->context); co->context = calloc(1, sizeof(coroutine_ucontext_t)); coroutine_makecontext(co->context, (IMP)coroutine_main, co, (void *)co->stack_top); // setcontext coroutine_begin(co->context); break; } case COROUTINE_SUSPEND: { BOOL skip = false; coroutine_getcontext(co->pre_context); if (skip) { // when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func. return; } #pragma unused(skip) skip = true; // setcontext coroutine_setcontext(co->context); break; } default: assert(false); break; } } 複製代碼
對於im這個函數,這篇文章主要介紹的是coobjc的調度,不作詳細的說明。咱們只須要執行這個函數在COROUTINE_READY
會執行im的entry。在COROUTINE_SUSPEND
狀態下會恢復以前的context也就是yield中斷的地方。
咱們來回顧一下co調度的整個流程。在一個線程中會建立惟一的數據構 scheduler。scheduler中包含main_co,running_co,co_queue。main_co的entry是一個for循環,在co_queue隊列裏面取出head co,設置head爲running_co,執行head的entry。當調用coroutine_resume(co)的時候。若是running_co存在,那麼running_co就會被yield掛起,main_co會從co_queue取出一個新的co執行它的entry,當for循環再次遍歷到這個 被掛起的co的時候,程序會跳轉到yield函數裏面繼續執行。若是若是runing_co不存在存在的話co會被添加到co_queue,同時resume會執行main_coroutine的entry,for循環開始。
co_launch 這個函數的功能相似於,dispatch_async。區別是co_launch,把須要執行的任務放到一個協程隊列裏面,dispatch_async是把執行任務放到一個線程隊列裏面執行。在調度層面經過coroutine_resume把co添加到scheduler的co_queue,在這個執行任務裏面,你能夠經過yield,resume來交出線程或者搶佔線程。
NS_INLINE COCoroutine * _Nonnull co_launch(void(^ _Nonnull block)(void)) { COCoroutine *co = [COCoroutine coroutineWithBlock:block onQueue:nil]; return [co resume]; } - (COCoroutine *)resume { COCoroutine *currentCo = [COCoroutine currentCoroutine]; BOOL isSubroutine = [currentCo.dispatch isEqualToDipatch:self.dispatch] ? YES : NO; [self.dispatch dispatch_async_block:^{ if (self.isResume) { return; } if (isSubroutine) { self.parent = currentCo; [currentCo addChild:self]; } self.isResume = YES; coroutine_resume(self.co); }]; return self; } 複製代碼