協程coobjc源碼分析:co調度

這篇文章主要經過源碼分析,介紹coobjc中的co調度。這個問題搞清楚以後,co_lauch作了什麼,看起來就很簡單了。咱們先了解coroutine和scheduler這兩個關鍵的數據結構。bash

coroutine

在協程的數據結構中和調度相關的字段。markdown

  • entry: 須要執行的任務,最終指向的是co_launch(block)中的block。
  • userdata: 一個OC的類對象COCoroutine,這個對象持有coroutine這個數據結構,和它一一對應。
  • context: 是協程執行的當前上下文。
  • pre_context: 保存的是這個協程被掛起或者執行完成後須要回覆的上下文,coobjc經過切換上下文來實現函數的跳轉。
  • scheduler: co被scheduler的co_queue持有,scheduler是co調度的核心。
struct coroutine {
        coroutine_func entry;                   // Process entry.
        void *userdata;                         // Userdata.
        void *context;                          // Coroutine,
        void *pre_context;                      // Coroutine's source process's Call stack data.
        
        struct coroutine_scheduler *scheduler;  // The pointer to the scheduler.
        ...
    };
    typedef struct coroutine coroutine_t;
複製代碼

coobjc經過yield,resume,add來操做co。這三個方法在co調度的時候也會被頻繁用到。數據結構

coroutine_yield

void coroutine_yield(coroutine_t *co)
{
    if (co == NULL) {
        // if null
        co = coroutine_self();
    }
    BOOL skip = false;
    coroutine_getcontext(co->context);
    if (skip) {
        return;
    }
#pragma unused(skip)
    skip = true;
    co->status = COROUTINE_SUSPEND;
    coroutine_setcontext(co->pre_context);
}
複製代碼

這個函數的做用是掛起協程。下面提到的main指的是scheduler中main_coroutine的入口函數 coroutine_scheduler_main,im指的是coroutine_resume_im(coroutine_t *co) 這個函數會執行co的entry,main中有個for循環遍歷co_queue取出head經過im函數執行head的entry。關於scheduler下面會有詳細介紹。async

coroutine_setcontext(co->pre_context);這一行可讓程序跳轉到coroutine_getcontext(co->pre_context)這裏。一般狀況下的調用棧main()->im()->coroutine_getcontext(co->pre_context)。當yeild執行的時候,程序跳轉到im函數中coroutine_getcontext(co->pre_context)的這個位置,im函數會直接return,跳轉到main函數的for循環裏,繼續取出co_queue的head執行head的entry。當for循環再次執行到這個被掛起的co的時候,在im執行co entry 方法中,調用coroutine_setcontext(co->context),程序跳轉到 coroutine_yield()方法中的 coroutine_getcontext(co->context);這一行,此時skip是yes。coroutine_yield()函數return。回到調用coroutine_yield()的地方。yield經過保存上下文,使得被掛起的co下次可以在以前的上下文環境下繼續執行。函數

coroutine_resume

void coroutine_resume(coroutine_t *co) {
    if (!co->is_scheduler) {
        coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists();
        co->scheduler = scheduler;
        
        scheduler_queue_push(scheduler, co);
        
        if (scheduler->running_coroutine) {
            // resume a sub coroutine.
            scheduler_queue_push(scheduler, scheduler->running_coroutine);
            coroutine_yield(scheduler->running_coroutine);
        } else {
            // scheduler is idle
            coroutine_resume_im(co->scheduler->main_coroutine);
        }
    }
}

複製代碼

coroutine_resume 這個方法是把co push 到scheduler的協程隊列裏面。若是當前有協程在運行的話,那個當前運行的協程就會被掛起,push到協程隊列裏面,若是co_queue中只有新添加進來的co和被掛起的co,此時新添加進來的co處於queue的head會被main函數的for循環取出執行entry。若是當前沒有協程在運行,就會執行scheduler中main_corroutine的entry函數,這個函數是一個for循環從隊列中讀取co,執行co的entry。添加到co_quue隊列中的co最終會被執行。後面的判斷若是沒有runing_coroutine,這個時候main_coroutine被掛起,for循環不執行,須要主動觸發一次main函數的調用coroutine_resume_im(co->scheduler->main_coroutine);oop

coroutine_add

void coroutine_add(coroutine_t *co) {
    if (!co->is_scheduler) {
        coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists();
        co->scheduler = scheduler;
        if (scheduler->main_coroutine->status == COROUTINE_DEAD) {
            coroutine_close_ifdead(scheduler->main_coroutine);
            coroutine_t *main_co = coroutine_create(coroutine_scheduler_main);
            main_co->is_scheduler = true;
            main_co->scheduler = scheduler;
            scheduler->main_coroutine = main_co;
        }
        scheduler_queue_push(scheduler, co);
        
        if (!scheduler->running_coroutine) {
            coroutine_resume_im(co->scheduler->main_coroutine);
        }
    }
}

複製代碼

這個方法把當前co添加到scheduler協程隊列裏面。若是main_coroutine的狀態是dead,會建立一個main_coroutine,coroutine_t *main_co = coroutine_create(coroutine_scheduler_main);這裏能夠看到main_coroutine的entry指向的是coroutine_scheduler_main這個函數下面還會講到,做用就是前面一直在說的for循環。沒有當前沒有running_coroutine會主動觸發main函數。源碼分析

scheduler

scheduler是協程調度的核心。spa

scheduler的數據結構

struct coroutine_scheduler {
        coroutine_t         *main_coroutine;
        coroutine_t         *running_coroutine;
        coroutine_list_t     coroutine_queue;
    };
    typedef struct coroutine_scheduler coroutine_scheduler_t;
    
        struct coroutine_list {
        coroutine_t *head;
        coroutine_t *tail;
    };
    typedef struct coroutine_list coroutine_list_t;
複製代碼
  • main_coroutine:它的entry指向 coroutine_scheduler_main函數,相似於線程中的runloop提供一個for循環,不斷讀取協程隊列中的head,執行head的入口函數,隊列爲空的時候main_coroutine會被掛起。
  • running_coroutine : 用來記錄當前正在運行中的協程,獲取或者掛起當前協程都會用到這個字段。
  • coroutine_queue: 是一個雙向鏈表,用來保存添加到當前scheduler的協程,當協程的入口函數執行完成後,scheduler會把它從鏈表中清除。

scheduler的建立過程。

//scheduler 的建立
coroutine_scheduler_t *coroutine_scheduler_self_create_if_not_exists(void) {
    
    if (!coroutine_scheduler_key) {
        pthread_key_create(&coroutine_scheduler_key, coroutine_scheduler_free);
    }
    
    void *schedule = pthread_getspecific(coroutine_scheduler_key);
    if (!schedule) {
        schedule = coroutine_scheduler_new();
        pthread_setspecific(coroutine_scheduler_key, schedule);
    }
    return schedule;
}
複製代碼

pthread_setspecificpthread_getspecific是線程存儲的存取函數,表面上看起來這是一個全局變量,全部線程均可以使用它,而它的值在每一個線程中都是是單獨存儲的。線程存儲的key值是pthread_key_t類型,經過pthread_key_create建立,pthread_key_create須要兩個參數第一個是字符串類型的key值,第二個參數是一個清理函數,線程釋放這個key值對應的存儲空間的的時候,這個清理函數會被調用。線程存儲的建立方式保證了每個線程中只有一個scheduler,而且提供了獲取這個scheduler的入口。線程

coroutine_scheduler_main

// The main entry of the coroutine's scheduler // The scheduler is just a special coroutine, so we can use yield. void coroutine_scheduler_main(coroutine_t *scheduler_co) { coroutine_scheduler_t *scheduler = scheduler_co->scheduler; for (;;) { // Pop a coroutine from the scheduler's queue.
        coroutine_t *co = scheduler_queue_pop(scheduler);
        if (co == NULL) {
            // Yield the scheduler, give back cpu to origin thread.
            coroutine_yield(scheduler_co);
            
            // When some coroutine add to the scheduler's queue, // the scheduler will resume again, // then will resume here, continue the loop. continue; } // Set scheduler's current running coroutine.
        scheduler->running_coroutine = co;
        // Resume the coroutine
        coroutine_resume_im(co);
        
        // Set scheduler's current running coroutine to nil. scheduler->running_coroutine = nil; // if coroutine finished, free coroutine. if (co->status == COROUTINE_DEAD) { coroutine_close_ifdead(co); } } } 複製代碼

coroutine_scheduler_main函數是scheduler的runloop。一個for循環,從本身的協程隊列裏面讀取協程。當scheduler的協程隊列不爲空的時候,會從隊列中取出head執行入口函數。當隊列裏面的協程所有取出後,當前scheduler的協程隊列coroutine_queue爲空。main_coroutine會被coroutine_yield這個函數掛起。coroutine_yield會保存當前上下文,也就是說當main_coroutine下次被resume的時候,會從這裏繼續執行下去,繼續for循環。code

coroutine_resume_im

void coroutine_resume_im(coroutine_t *co) {
    switch (co->status) {
        case COROUTINE_READY:
        {
            co->stack_memory = coroutine_memory_malloc(co->stack_size);
            co->stack_top = co->stack_memory + co->stack_size - 3 * sizeof(void *);
            // get the pre context
            co->pre_context = malloc(sizeof(coroutine_ucontext_t));
            BOOL skip = false;
            coroutine_getcontext(co->pre_context);
            if (skip) {
                // when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func.
                return;
            }
#pragma unused(skip)
            skip = true;
            
            free(co->context);
            co->context = calloc(1, sizeof(coroutine_ucontext_t));
            coroutine_makecontext(co->context, (IMP)coroutine_main, co, (void *)co->stack_top);
            // setcontext
            coroutine_begin(co->context);
            
            break;
        }
        case COROUTINE_SUSPEND:
        {
            BOOL skip = false;
            coroutine_getcontext(co->pre_context);
            if (skip) {
                // when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func.
                return;
            }
#pragma unused(skip)
            skip = true;
            // setcontext
            coroutine_setcontext(co->context);
            
            break;
        }
        default:
            assert(false);
            break;
    }
}
複製代碼

對於im這個函數,這篇文章主要介紹的是coobjc的調度,不作詳細的說明。咱們只須要執行這個函數在COROUTINE_READY會執行im的entry。在COROUTINE_SUSPEND狀態下會恢復以前的context也就是yield中斷的地方。

咱們來回顧一下co調度的整個流程。在一個線程中會建立惟一的數據構 scheduler。scheduler中包含main_co,running_co,co_queue。main_co的entry是一個for循環,在co_queue隊列裏面取出head co,設置head爲running_co,執行head的entry。當調用coroutine_resume(co)的時候。若是running_co存在,那麼running_co就會被yield掛起,main_co會從co_queue取出一個新的co執行它的entry,當for循環再次遍歷到這個 被掛起的co的時候,程序會跳轉到yield函數裏面繼續執行。若是若是runing_co不存在存在的話co會被添加到co_queue,同時resume會執行main_coroutine的entry,for循環開始。

co_lauch

co_launch 這個函數的功能相似於,dispatch_async。區別是co_launch,把須要執行的任務放到一個協程隊列裏面,dispatch_async是把執行任務放到一個線程隊列裏面執行。在調度層面經過coroutine_resume把co添加到scheduler的co_queue,在這個執行任務裏面,你能夠經過yield,resume來交出線程或者搶佔線程。

NS_INLINE COCoroutine * _Nonnull  co_launch(void(^ _Nonnull block)(void)) {
    COCoroutine *co = [COCoroutine coroutineWithBlock:block onQueue:nil];
    return [co resume];
}

- (COCoroutine *)resume {
    COCoroutine *currentCo = [COCoroutine currentCoroutine];
    BOOL isSubroutine = [currentCo.dispatch isEqualToDipatch:self.dispatch] ? YES : NO;
    [self.dispatch dispatch_async_block:^{
        if (self.isResume) {
            return;
        }
        if (isSubroutine) {
            self.parent = currentCo;
            [currentCo addChild:self];
        }
        self.isResume = YES;
        coroutine_resume(self.co);
    }];
    return self;
}
複製代碼
相關文章
相關標籤/搜索