這篇文章主要經過源碼分析,介紹coobjc中的co調度。這個問題搞清楚以後,co_lauch
作了什麼,看起來就很簡單了。咱們先了解coroutine和scheduler這兩個關鍵的數據結構。bash
在協程的數據結構中和調度相關的字段。數據結構
entry
: 須要執行的任務,最終指向的是co_launch(block)中的block。userdata
: 一個OC的類對象COCoroutine,這個對象持有coroutine這個數據結構,和它一一對應。context
: 是協程執行的當前上下文。pre_context
: 保存的是這個協程被掛起或者執行完成後須要回覆的上下文,coobjc經過切換上下文來實現函數的跳轉。scheduler
: co被scheduler的co_queue持有,scheduler是co調度的核心。struct coroutine {
coroutine_func entry; // Process entry.
void *userdata; // Userdata.
void *context; // Coroutine,
void *pre_context; // Coroutine's source process's Call stack data.
struct coroutine_scheduler *scheduler; // The pointer to the scheduler.
...
};
typedef struct coroutine coroutine_t;
複製代碼
coobjc經過yield,resume,add來操做co。這三個方法在co調度的時候也會被頻繁用到。async
void coroutine_yield(coroutine_t *co)
{
if (co == NULL) {
// if null
co = coroutine_self();
}
BOOL skip = false;
coroutine_getcontext(co->context);
if (skip) {
return;
}
#pragma unused(skip)
skip = true;
co->status = COROUTINE_SUSPEND;
coroutine_setcontext(co->pre_context);
}
複製代碼
這個函數的做用是掛起協程。下面提到的main指的是scheduler中main_coroutine的入口函數 coroutine_scheduler_main
,im指的是coroutine_resume_im(coroutine_t *co)
這個函數會執行co的entry,main中有個for循環遍歷co_queue取出head經過im函數執行head的entry。關於scheduler下面會有詳細介紹。函數
coroutine_setcontext(co->pre_context);
這一行可讓程序跳轉到coroutine_getcontext(co->pre_context)
這裏。一般狀況下的調用棧main()->im()->coroutine_getcontext(co->pre_context)
。當yeild執行的時候,程序跳轉到im函數中coroutine_getcontext(co->pre_context)
的這個位置,im函數會直接return,跳轉到main函數的for循環裏,繼續取出co_queue的head執行head的entry。當for循環再次執行到這個被掛起的co的時候,在im執行co entry 方法中,調用coroutine_setcontext(co->context)
,程序跳轉到 coroutine_yield()方法中的 coroutine_getcontext(co->context);
這一行,此時skip是yes。coroutine_yield()函數return。回到調用coroutine_yield()的地方。yield經過保存上下文,使得被掛起的co下次可以在以前的上下文環境下繼續執行。oop
void coroutine_resume(coroutine_t *co) {
if (!co->is_scheduler) {
coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists();
co->scheduler = scheduler;
scheduler_queue_push(scheduler, co);
if (scheduler->running_coroutine) {
// resume a sub coroutine.
scheduler_queue_push(scheduler, scheduler->running_coroutine);
coroutine_yield(scheduler->running_coroutine);
} else {
// scheduler is idle
coroutine_resume_im(co->scheduler->main_coroutine);
}
}
}
複製代碼
coroutine_resume 這個方法是把co push 到scheduler的協程隊列裏面。若是當前有協程在運行的話,那個當前運行的協程就會被掛起,push到協程隊列裏面,若是co_queue中只有新添加進來的co和被掛起的co,此時新添加進來的co處於queue的head會被main函數的for循環取出執行entry。若是當前沒有協程在運行,就會執行scheduler中main_corroutine的entry函數,這個函數是一個for循環從隊列中讀取co,執行co的entry。添加到co_quue隊列中的co最終會被執行。後面的判斷若是沒有runing_coroutine,這個時候main_coroutine被掛起,for循環不執行,須要主動觸發一次main函數的調用coroutine_resume_im(co->scheduler->main_coroutine);
源碼分析
void coroutine_add(coroutine_t *co) {
if (!co->is_scheduler) {
coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists();
co->scheduler = scheduler;
if (scheduler->main_coroutine->status == COROUTINE_DEAD) {
coroutine_close_ifdead(scheduler->main_coroutine);
coroutine_t *main_co = coroutine_create(coroutine_scheduler_main);
main_co->is_scheduler = true;
main_co->scheduler = scheduler;
scheduler->main_coroutine = main_co;
}
scheduler_queue_push(scheduler, co);
if (!scheduler->running_coroutine) {
coroutine_resume_im(co->scheduler->main_coroutine);
}
}
}
複製代碼
這個方法把當前co添加到scheduler協程隊列裏面。若是main_coroutine的狀態是dead,會建立一個main_coroutine,coroutine_t *main_co = coroutine_create(coroutine_scheduler_main);
這裏能夠看到main_coroutine的entry指向的是coroutine_scheduler_main
這個函數下面還會講到,做用就是前面一直在說的for循環。沒有當前沒有running_coroutine會主動觸發main函數。ui
scheduler是協程調度的核心。spa
struct coroutine_scheduler {
coroutine_t *main_coroutine;
coroutine_t *running_coroutine;
coroutine_list_t coroutine_queue;
};
typedef struct coroutine_scheduler coroutine_scheduler_t;
struct coroutine_list {
coroutine_t *head;
coroutine_t *tail;
};
typedef struct coroutine_list coroutine_list_t;
複製代碼
main_coroutine
:它的entry指向 coroutine_scheduler_main
函數,相似於線程中的runloop提供一個for循環,不斷讀取協程隊列中的head,執行head的入口函數,隊列爲空的時候main_coroutine會被掛起。running_coroutine
: 用來記錄當前正在運行中的協程,獲取或者掛起當前協程都會用到這個字段。coroutine_queue
: 是一個雙向鏈表,用來保存添加到當前scheduler的協程,當協程的入口函數執行完成後,scheduler會把它從鏈表中清除。scheduler
的建立過程。//scheduler 的建立
coroutine_scheduler_t *coroutine_scheduler_self_create_if_not_exists(void) {
if (!coroutine_scheduler_key) {
pthread_key_create(&coroutine_scheduler_key, coroutine_scheduler_free);
}
void *schedule = pthread_getspecific(coroutine_scheduler_key);
if (!schedule) {
schedule = coroutine_scheduler_new();
pthread_setspecific(coroutine_scheduler_key, schedule);
}
return schedule;
}
複製代碼
pthread_setspecific
和pthread_getspecific
是線程存儲的存取函數,表面上看起來這是一個全局變量,全部線程均可以使用它,而它的值在每一個線程中都是是單獨存儲的。線程存儲的key值是pthread_key_t
類型,經過pthread_key_create
建立,pthread_key_create
須要兩個參數第一個是字符串類型的key值,第二個參數是一個清理函數,線程釋放這個key值對應的存儲空間的的時候,這個清理函數會被調用。線程存儲的建立方式保證了每個線程中只有一個scheduler,而且提供了獲取這個scheduler的入口。線程
// The main entry of the coroutine's scheduler // The scheduler is just a special coroutine, so we can use yield. void coroutine_scheduler_main(coroutine_t *scheduler_co) { coroutine_scheduler_t *scheduler = scheduler_co->scheduler; for (;;) { // Pop a coroutine from the scheduler's queue.
coroutine_t *co = scheduler_queue_pop(scheduler);
if (co == NULL) {
// Yield the scheduler, give back cpu to origin thread.
coroutine_yield(scheduler_co);
// When some coroutine add to the scheduler's queue, // the scheduler will resume again, // then will resume here, continue the loop. continue; } // Set scheduler's current running coroutine.
scheduler->running_coroutine = co;
// Resume the coroutine
coroutine_resume_im(co);
// Set scheduler's current running coroutine to nil. scheduler->running_coroutine = nil; // if coroutine finished, free coroutine. if (co->status == COROUTINE_DEAD) { coroutine_close_ifdead(co); } } } 複製代碼
coroutine_scheduler_main
函數是scheduler
的runloop。一個for循環,從本身的協程隊列裏面讀取協程。當scheduler的協程隊列不爲空的時候,會從隊列中取出head執行入口函數。當隊列裏面的協程所有取出後,當前scheduler的協程隊列coroutine_queue爲空。main_coroutine會被coroutine_yield這個函數掛起。coroutine_yield會保存當前上下文,也就是說當main_coroutine下次被resume的時候,會從這裏繼續執行下去,繼續for循環。code
void coroutine_resume_im(coroutine_t *co) {
switch (co->status) {
case COROUTINE_READY:
{
co->stack_memory = coroutine_memory_malloc(co->stack_size);
co->stack_top = co->stack_memory + co->stack_size - 3 * sizeof(void *);
// get the pre context
co->pre_context = malloc(sizeof(coroutine_ucontext_t));
BOOL skip = false;
coroutine_getcontext(co->pre_context);
if (skip) {
// when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func.
return;
}
#pragma unused(skip)
skip = true;
free(co->context);
co->context = calloc(1, sizeof(coroutine_ucontext_t));
coroutine_makecontext(co->context, (IMP)coroutine_main, co, (void *)co->stack_top);
// setcontext
coroutine_begin(co->context);
break;
}
case COROUTINE_SUSPEND:
{
BOOL skip = false;
coroutine_getcontext(co->pre_context);
if (skip) {
// when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func.
return;
}
#pragma unused(skip)
skip = true;
// setcontext
coroutine_setcontext(co->context);
break;
}
default:
assert(false);
break;
}
}
複製代碼
對於im這個函數,這篇文章主要介紹的是coobjc的調度,不作詳細的說明。咱們只須要執行這個函數在COROUTINE_READY
會執行im的entry。在COROUTINE_SUSPEND
狀態下會恢復以前的context也就是yield中斷的地方。
咱們來回顧一下co調度的整個流程。在一個線程中會建立惟一的數據構 scheduler。scheduler中包含main_co,running_co,co_queue。main_co的entry是一個for循環,在co_queue隊列裏面取出head co,設置head爲running_co,執行head的entry。當調用coroutine_resume(co)的時候。若是running_co存在,那麼running_co就會被yield掛起,main_co會從co_queue取出一個新的co執行它的entry,當for循環再次遍歷到這個 被掛起的co的時候,程序會跳轉到yield函數裏面繼續執行。若是若是runing_co不存在存在的話co會被添加到co_queue,同時resume會執行main_coroutine的entry,for循環開始。
co_launch 這個函數的功能相似於,dispatch_async。區別是co_launch,把須要執行的任務放到一個協程隊列裏面,dispatch_async是把執行任務放到一個線程隊列裏面執行。在調度層面經過coroutine_resume把co添加到scheduler的co_queue,在這個執行任務裏面,你能夠經過yield,resume來交出線程或者搶佔線程。
NS_INLINE COCoroutine * _Nonnull co_launch(void(^ _Nonnull block)(void)) {
COCoroutine *co = [COCoroutine coroutineWithBlock:block onQueue:nil];
return [co resume];
}
- (COCoroutine *)resume {
COCoroutine *currentCo = [COCoroutine currentCoroutine];
BOOL isSubroutine = [currentCo.dispatch isEqualToDipatch:self.dispatch] ? YES : NO;
[self.dispatch dispatch_async_block:^{
if (self.isResume) {
return;
}
if (isSubroutine) {
self.parent = currentCo;
[currentCo addChild:self];
}
self.isResume = YES;
coroutine_resume(self.co);
}];
return self;
}
複製代碼