前言
首先提出一些問題:程序員
dispatch_async
函數如何實現,分發到主隊列和全局隊列有什麼區別,必定會新建線程執行任務麼?dispatch_sync
函數如何實現,爲何說 GCD 死鎖是隊列致使的而不是線程,死鎖不是操做系統的概念麼?- 信號量是如何實現的,有哪些使用場景?
dispatch_group
的等待與通知、dispatch_once
如何實現?dispatch_source
用來作定時器如何實現,有什麼優勢和用途?dispatch_suspend
和dispatch_resume
如何實現,隊列的的暫停和計時器的暫停有區別麼?
以上問題基本都是對 GCD 經常使用 API 的追問與思考,深刻理解這些問題有助於更好地使用 GCD,好比如下代碼的執行結果是什麼?macos
- (void)viewDidLoad { [super viewDidLoad]; dispatch_queue_t queue = dispatch_queue_create("com.bestswifter.queue", nil); dispatch_sync(queue, ^{ NSLog(@"current thread = %@", [NSThread currentThread]); dispatch_sync(dispatch_get_main_queue(), ^{ NSLog(@"current thread = %@", [NSThread currentThread]); }); }); }
如下內容爲我的的學習總結,僅供參考,不必定適合新手入門。最好的學習方法仍是本身下載一份源碼並仔細閱讀學習。swift
文章主要分析了常見 API 的實現原理,因水平所限,不可避免的有理解錯誤的地方,歡迎指出。若是對具體分析不感興趣,能夠直接跳到文章末尾的「總結」部分。api
知識儲備
閱讀 GCD 源碼以前,須要瞭解一些相關知識,這樣才能在讀到源碼時不至於一臉懵逼,進而影響理解。數據結構
DISPATCH_DECL
GCD 中對變量的定義大多遵循以下格式:併發
#define DISPATCH_DECL(name) typedef struct name##_s *name##_t
好比說很是常見的 DISPATCH_DECL(dispatch_queue);
,它的展開形式是:app
typedef struct dispatch_queue_s *dispatch_queue_t;
這行代碼定義了一個 類型的指針,指向一個 類型的結構體。dispatch_queue_tdispatch_queue_s
TSD
TSD(Thread-Specific Data) 表示線程私有數據。在 C++ 中,全局變量能夠被全部線程訪問,局部變量只有函數內部能夠訪問。而 TSD 的做用就是可以在同一個線程的不一樣函數中被訪問。在不一樣線程中,雖然名字相同,可是獲取到的數據隨線程不一樣而不一樣。框架
一般,咱們能夠利用 POSIX 庫提供的 API 來實現 TSD:異步
int pthread_key_create(pthread_key_t *key, void (*destr_function) (void *))
這個函數用來建立一個 key,在線程退出時會將 key 對應的數據傳入 函數中進行清理。destr_function
咱們分別使用 get/set 方法來訪問/修改 key 對應的數據:async
int pthread_setspecific(pthread_key_t key, const void *pointer) void * pthread_getspecific(pthread_key_t key)
在 GCD 中定義了六個 key,根據名字大概能猜出各自的含義:
pthread_key_t dispatch_queue_key;
pthread_key_t dispatch_sema4_key;
pthread_key_t dispatch_cache_key;
pthread_key_t dispatch_io_key;
pthread_key_t dispatch_apply_key;
pthread_key_t dispatch_bcounter_key;
fastpath && slowpath
這是定義在 internal.h
中的兩個宏:
爲了理解所謂的快路徑和慢路徑,咱們須要先學習一點計算機基礎知識。好比這段很是簡單的代碼:
#define fastpath(x) ((typeof(x))__builtin_expect((long)(x), ~0l)) #define slowpath(x) ((typeof(x))__builtin_expect((long)(x), 0l))
if (x) return 1; else return 39;
因爲計算機並不是一次只讀取一條指令,而是讀取多條指令,因此在讀到 if 語句時也會把 return 1
讀取進來。若是 x 爲 0,那麼會從新讀取 return 39
,重讀指令相對來講比較耗時。
如過 x 有很是大的機率是 0,那麼 return 1
這條指令每次不可避免的會被讀取,而且實際上幾乎沒有機會執行, 形成了沒必要要的指令重讀。固然,最簡單的優化就是:
if (!x) return 39; else return 1;
然而對程序員來講,每次都作這樣的判斷很是燒腦,並且容易出錯。因而 GCC 提供了一個內置函數 __builtin_expect
:
long __builtin_expect (long EXP, long C)
它的返回值就是整個函數的返回值,參數 C 表明預計的值,表示程序員知道 EXP 的值極可能就是 C。好比上文中的例子能夠這樣寫:
if (__builtin_expect(x, 0)) return 1; else return 39;
雖然寫法邏輯不變,可是編譯器會把彙編代碼優化成 if(!x)
的形式。
所以,在蘋果定義的兩個宏中,fastpath(x)
依然返回 x,只是告訴編譯器 x 的值通常不爲 0,從而編譯器能夠進行優化。同理,slowpath(x)
表示 x 的值極可能爲 0,但願編譯器進行優化。
dispatch_queue_t
以 dispatch_queue_create
的源碼爲例:
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr) { // 省略 label 相關的操做 dispatch_queue_t dq; dq = _dispatch_alloc(DISPATCH_VTABLE(queue), sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_MIN_LABEL_SIZE - DISPATCH_QUEUE_CACHELINE_PAD + label_len + 1); _dispatch_queue_init(dq); if (fastpath(!attr)) { return dq; } if (fastpath(attr == DISPATCH_QUEUE_CONCURRENT)) { dq->dq_width = UINT32_MAX; dq->do_targetq = _dispatch_get_root_queue(0, false); } else { dispatch_debug_assert(!attr, "Invalid attribute"); } return dq; }
咱們知道建立隊列時, attr 屬性有三個值可選,nil
、DISPATCH_QUEUE_SERIAL
(實際上就是 nil) 或 DISPATCH_QUEUE_CONCURRENT
。第一個 if 判斷中,蘋果認爲串行隊列,或者 NULL
參數更常見,所以 !attr
的值頗有可能不爲 0,這與上文的結論一致。
第二個判斷中,參數幾乎有隻多是 DISPATCH_QUEUE_CONCURRENT
,所以 attr == DISPATCH_QUEUE_CONCURRENT
這個判斷機會不會爲 0,依然與 fastpath
的做用一致。
_dispatch_get_root_queue
會獲取一個全局隊列,它有兩個參數,分別表示優先級和是否支持 overcommit。一共有四個優先級,LOW
、DEFAULT
、HIGH
和 BACKGROUND
,所以共有 8 個全局隊列。帶有 overcommit 的隊列表示每當有任務提交時,系統都會新開一個線程處理,這樣就不會形成某個線程過載(overcommit)。
這 8 個全局隊列的序列號是 4-11,序列號爲 1 的隊列是主隊列,2 是 manager 隊列,用來管理 GCD 內部的任務(好比下文介紹的定時器),3 這個序列號暫時沒有使用。隊列 的 dq_width
被設置爲 UINT32_MAX
,表示這些隊列不限制併發數。
做爲對比,在 _dispatch_queue_init
中,併發數限制爲 1,也就是串行隊列的默認設置:
static inline void _dispatch_queue_init(dispatch_queue_t dq) {
dq->do_next = DISPATCH_OBJECT_LISTLESS; dq->do_targetq = _dispatch_get_root_queue(0, true); dq->dq_running = 0; dq->dq_width = 1; }
注意這行代碼: dq->do_targetq = _dispatch_get_root_queue(0, true);
,它涉及到 GCD 隊列與 block 的一個重要模型,target_queue
。向任何隊列中提交的 block,都會被放到它的目標隊列中執行,而普通串行隊列的目標隊列就是一個支持 overcommit 的全局隊列,全局隊列的底層則是一個線程池。
借用 objc 的文章 中的圖片來表示:
![](http://static.javashuo.com/static/loading.gif)
dispatch_async
直接上函數實現:
dispatch_async(dispatch_queue_t queue, dispatch_block_t block) {
dispatch_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
}
隊列其實就是一個用來提交 block 的對象,當 block 提交到隊列中後,將按照 「先入先出(FIFO)」 的順序進行處理。系統在 GCD 的底層會維護一個線程池,用來執行這些 block。
block 參數的類型是 dispatch_block_t
,它是一個沒有參數,沒有返回值的 block:
typedef void (^dispatch_block_t)(void);
dispatch_async
的函數很簡單,它將 block 複製了一份,而後調用另外一個函數 dispatch_async_f
:
dispatch_async_f(dispatch_queue_t queue, void *context, dispatch_function_t work);
work 參數是一個函數,在實際調用時,會把第二參數 context 做爲參數傳入,以 爲例:_dispatch_call_block_and_release
void _dispatch_call_block_and_release(void *block) { void (^b)(void) = block; b(); Block_release(b); }
省略各類分支後的 dispatch_async_f
函數實現以下:
void dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) { dispatch_continuation_t dc; if (dq->dq_width == 1) { return dispatch_barrier_async_f(dq, ctxt, func); } dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT; dc->dc_func = func; dc->dc_ctxt = ctxt; if (dq->do_targetq) { return _dispatch_async_f2(dq, dc); } _dispatch_queue_push(dq, dc); }
可見若是是串行隊列 (dq_width = 1),會調用 dispatch_barrier_async_f
函數處理,這個後文會有介紹。若是有 do_targetq
則進行轉發,不然調用 _dispatch_queue_push
入隊。
這裏的 dispatch_continuation_t
實際上是對 block 的封裝,而後調用 _dispatch_queue_push
這個宏將封裝好的 block 放入隊列中。
把這個宏展開,而後依次分析調用棧,選擇一條主幹調用線,結果以下:
_dispatch_queue_push
└──_dispatch_trace_queue_push
└──_dispatch_queue_push
└──_dispatch_queue_push_slow
└──_dispatch_queue_push_list_slow2
└──_dispatch_wakeup
└──dx_probe
隊列中保存了一個鏈表,咱們首先將新的 block 添加到鏈表尾部,而後調用 dx_probe
宏,它依賴於 vtable 數據結構,GCD 中的大部分對象,好比隊列等,都具備這個數據結構。它定義了對象在不一樣操做下該執行的方法,好比在這裏的 probe
操做下,實際上會執行 _dispatch_queue_wakeup_global
方法,調用棧以下
_dispatch_queue_wakeup_global
└──_dispatch_queue_wakeup_global2
└──_dispatch_queue_wakeup_global_slow
在 _dispatch_queue_wakeup_global_slow
咱們見到了熟悉的老朋友,pthread 線程:
static void _dispatch_queue_wakeup_global_slow(dispatch_queue_t dq, unsigned int n) { // 若是線程池已滿,則直接調用 _dispatch_worker_thread // 不然建立線程池 pthread_t pthr; while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) { if (r != EAGAIN) { (void)dispatch_assume_zero(r); } sleep(1); } r = pthread_detach(pthr); (void)dispatch_assume_zero(r); }
因而可知這裏確實使用了線程池。建立線程後會執行 _dispatch_worker_thread
回調:
_dispatch_worker_thread
└──_dispatch_worker_thread4
└──_dispatch_continuation_pop
在 pop 函數中,咱們拿到了最先加入的任務,而後執行:
static inline void _dispatch_continuation_pop(dispatch_object_t dou) { // ... _dispatch_client_callout(dc->dc_ctxt, dc->dc_func); if (dg) { dispatch_group_leave(dg); _dispatch_release(dg); } }
dispatch_async
的實現比較複雜,主要是由於其中的數據結構較多,分支流程控制比較複雜。但思路其實很簡單,用鏈表保存全部提交的 block,而後在底層線程池中,依次取出 block 並執行。
若是熟悉了相關數據結構和調用流程,接下來研究 GCD 的其餘 API 就比較輕鬆了。
dispatch_sync
同步方法的實現相對來講和異步相似,並且更簡單,調用棧以下:
dispatch_sync └──dispatch_sync_f └──_dispatch_sync_f2 └──_dispatch_sync_f_slow static void _dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) { _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore(); struct dispatch_sync_slow_s { DISPATCH_CONTINUATION_HEADER(sync_slow); } dss = { .do_vtable = (void*)DISPATCH_OBJ_SYNC_SLOW_BIT, .dc_ctxt = (void*)sema, }; _dispatch_queue_push(dq, (void *)&dss); _dispatch_thread_semaphore_wait(sema); _dispatch_put_thread_semaphore(sema); // ... }
這裏利用了線程專屬信號量,保證了每次只有一個 block 被執行。
這條調用棧有多個分支,若是向當前串行隊列提交任務就會走到上述分支,致使死鎖。若是是向其它串行隊列提交 block,則會利用原子性操做來實現,所以不會有死鎖問題。
dispatch_semaphore
關於信號量的 API 很少,主要是三個,create
、wait
和 signal
。
信號量在初始化時要指定 value,隨後內部將這個 value 存儲起來。實際操做時會存兩個 value,一個是當前的 value,一個是記錄初始 value。
信號的 wait
和 signal
是互逆的兩個操做。若是 value 大於 0,前者將 value 減一,此時若是 value 小於零就一直等待。
初始 value 必須大於等於 0,若是爲 0 並隨後調用 wait
方法,線程將被阻塞直到別的線程調用了 signal
方法。
dispatch_semaphore_wait
首先從這個函數的源碼看起:
long dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout) { long value = dispatch_atomic_dec2o(dsema, dsema_value); dispatch_atomic_acquire_barrier(); if (fastpath(value >= 0)) { return 0; } return _dispatch_semaphore_wait_slow(dsema, timeout); }
第一行的 dispatch_atomic_dec2o
是一個宏,會調用 GCC 內置的函數 __sync_sub_and_fetch
,實現減法的原子性操做。所以這一行的意思是將 dsema 的值減一,並把新的值賦給 value。
若是減一後的 value 大於等於 0 就馬上返回,沒有任何操做,不然進入等待狀態。
_dispatch_semaphore_wait_slow
函數針對不一樣的 timeout 參數,分了三種狀況考慮:
case DISPATCH_TIME_NOW: while ((orig = dsema->dsema_value) < 0) { if (dispatch_atomic_cmpxchg2o(dsema, dsema_value, orig, orig + 1)) { return KERN_OPERATION_TIMED_OUT; } }
這種狀況下會馬上判斷 dsema->dsema_value
與 orig
是否相等。若是 while 判斷成立,內部的 if 判斷必定也成立,此時會將 value 加一(也就是變爲 0) 並返回。加一的緣由是爲了抵消 wait
函數一開始的減一操做。此時函數調用方會獲得返回值 KERN_OPERATION_TIMED_OUT
,表示因爲等待時間超時而返回。
實際上 while 判斷必定會成立,由於若是 value 大於等於 0,在上一個函數 dispatch_semaphore_wait
中就已經返回了。
第二種狀況是 DISPATCH_TIME_FOREVER
這個 case:
case DISPATCH_TIME_FOREVER: do { kr = semaphore_wait(dsema->dsema_port); } while (kr == KERN_ABORTED); break;
進入 do-while 循環後會調用系統的 semaphore_wait
方法,KERN_ABORTED
表示調用者被一個與信號量系統無關的緣由喚醒。所以一旦發生這種狀況,仍是要繼續等待,直到收到 signal
調用。
在其餘狀況下(default 分支),咱們指定一個超時時間,這和 DISPATCH_TIME_FOREVER
的處理比較相似,不一樣的是咱們調用了內核提供的 semaphore_timedwait
方法能夠指定超時時間。
整個函數的框架以下:
static long _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout) { again: while ((orig = dsema->dsema_sent_ksignals)) { if (dispatch_atomic_cmpxchg2o(dsema, dsema_sent_ksignals, orig, orig - 1)) { return 0; } } switch (timeout) { default: /* semaphore_timedwait */ case DISPATCH_TIME_NOW: /* KERN_OPERATION_TIMED_OUT */ case DISPATCH_TIME_FOREVER: /* semaphore_wait */ } goto again; }
可見信號量被喚醒後,會回到最開始的地方,進入 while 循環。這個判斷條件通常都會成立,極端狀況下因爲內核存在 bug,致使 orig
和 dsema_sent_ksignals
不相等,也就是收到虛假 signal
信號時會忽略。
進入 while 循環後,if 判斷必定成立,所以返回 0,正如文檔所說,返回 0 表示成功,不然表示超時。
dispatch_semaphore_signal
這個函數的實現相對來講比較簡單,由於它不須要阻塞,只用喚醒。簡化版源碼以下:
long dispatch_semaphore_signal(dispatch_semaphore_t dsema) { long value = dispatch_atomic_inc2o(dsema, dsema_value); if (fastpath(value > 0)) { return 0; } return _dispatch_semaphore_signal_slow(dsema); }
首先會調用原子方法讓 value 加一,若是大於零就馬上返回 0,不然返回 _dispatch_semaphore_signal_slow
:
long _dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema) { (void)dispatch_atomic_inc2o(dsema, dsema_sent_ksignals); _dispatch_semaphore_create_port(&dsema->dsema_port); kern_return_t kr = semaphore_signal(dsema->dsema_port); return 1; }
它的做用僅僅是調用內核的 semaphore_signal
函數喚醒信號量,而後返回 1。這也符合文檔中的描述:「若是喚醒了線程,返回非 0,不然返回 0」。
dispatch_group
有了上面的鋪墊,group 是一個很是容易理解的概念,咱們先看看如何建立 group:
dispatch_group_t dispatch_group_create(void) { dispatch_group_t dg = _dispatch_alloc(DISPATCH_VTABLE(group), sizeof(struct dispatch_semaphore_s)); _dispatch_semaphore_init(LONG_MAX, dg); return dg; }
沒錯,group 就是一個 value 爲 LONG_MAX
的信號量。
dispatch_group_async
它僅僅是 dispatch_group_async_f
的封裝:
void dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt, dispatch_function_t func) { dispatch_continuation_t dc; dispatch_group_enter(dg); dc = _dispatch_continuation_alloc(); dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_GROUP_BIT); dc->dc_func = func; dc->dc_ctxt = ctxt; dc->dc_data = dg; _dispatch_queue_push(dq, dc); }
這個函數和 dispatch_async_f
的實現高度一致,主要的不一樣在於調用了 dispatch_group_enter
方法:
這個方法也沒作什麼,就是調用 wait
方法讓信號量的 value 減一而已。
void dispatch_group_enter(dispatch_group_t dg) { dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg; (void)dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER); }
dispatch_group_wait
這個方法用於等待 group 中全部任務執行完成,能夠理解爲信號量 wait 的封裝:
long dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout) { dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg; if (dsema->dsema_value == dsema->dsema_orig) { return 0; } if (timeout == 0) { return KERN_OPERATION_TIMED_OUT; } return _dispatch_group_wait_slow(dsema, timeout); }
若是當前 value 和原始 value 相同,代表任務已經所有完成,直接返回 0,若是 timeout 爲 0 也會馬上返回,不然調用 _dispatch_group_wait_slow
。這個方法等等待部分和 _dispatch_semaphore_signal_slow
幾乎一致,區別在於等待結束後它不是 return,而是調用 _dispatch_group_wake
去喚醒這個 group。
static long _dispatch_group_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout) { again: _dispatch_group_wake(dsema); switch (timeout) {/* 三種狀況分類 */} goto again; }
這裏咱們暫時跳過 _dispatch_group_wake
,後面會有詳細分析。只要知道這個函數在 group 中全部事件執行完後會被調用便可。
dispatch_group_notify
老習慣,這個函數僅僅是封裝了 dispatch_group_notify_f
:
void dispatch_group_notify_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt, void (*func)(void *)) { dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg; struct dispatch_sema_notify_s *dsn, *prev; dsn->dsn_queue = dq; dsn->dsn_ctxt = ctxt; dsn->dsn_func = func; prev = dispatch_atomic_xchg2o(dsema, dsema_notify_tail, dsn); if (fastpath(prev)) { prev->dsn_next = dsn; } else {/* ... */} }
這種結構的代碼咱們已經遇到屢次了,它其實就是在鏈表的尾部續上新的元素。因此 notify 方法並無作過多的處理,只是是用鏈表把全部回調通知保存起來,等待調用。
dispatch_group_leave
在介紹 dispatch_async
函數時,咱們看到任務在被執行時,還會調用 dispatch_group_leave
函數:
void dispatch_group_leave(dispatch_group_t dg) { dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg; long value = dispatch_atomic_inc2o(dsema, dsema_value); if (slowpath(value == dsema->dsema_orig)) { (void)_dispatch_group_wake(dsema); } }
當 group 的 value 變爲初始值時,表示全部任務都已執行完,開始調用 _dispatch_group_wake
處理回調。
_dispatch_group_wake
static long _dispatch_group_wake(dispatch_semaphore_t dsema) { struct dispatch_sema_notify_s *next, *head, *tail = NULL; long rval; head = dispatch_atomic_xchg2o(dsema, dsema_notify_head, NULL); if (head) { tail = dispatch_atomic_xchg2o(dsema, dsema_notify_tail, NULL); } rval = dispatch_atomic_xchg2o(dsema, dsema_group_waiters, 0); if (rval) { _dispatch_semaphore_create_port(&dsema->dsema_waiter_port); do { kern_return_t kr = semaphore_signal(dsema->dsema_waiter_port); } while (--rval); }
if (head) { // async group notify blocks do { dispatch_async_f(head->dsn_queue, head->dsn_ctxt, head->dsn_func); next = fastpath(head->dsn_next); if (!next && head != tail) { while (!(next = fastpath(head->dsn_next))) { _dispatch_hardware_pause(); } } free(head); } while ((head = next)); } return 0; }
這個函數主要分爲兩部分,首先循環調用 semaphore_signal
告知喚醒當初等待 group 的信號量,所以 dispatch_group_wait
函數得以返回。
而後獲取鏈表,依次調用 dispatch_async_f
異步執行在 notify 函數中註冊的回調。
dispatch_once
dispatch_once
僅僅是一個包裝,內部直接調用了 dispatch_once_f
:
void dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func) { struct _dispatch_once_waiter_s * volatile *vval = (struct _dispatch_once_waiter_s**)val; struct _dispatch_once_waiter_s dow = { NULL, 0 }; struct _dispatch_once_waiter_s *tail, *tmp; _dispatch_thread_semaphore_t sema; if (dispatch_atomic_cmpxchg(vval, NULL, &dow)) { _dispatch_client_callout(ctxt, func); tmp = dispatch_atomic_xchg(vval, DISPATCH_ONCE_DONE); tail = &dow; while (tail != tmp) { while (!tmp->dow_next) { _dispatch_hardware_pause(); }
sema = tmp->dow_sema; tmp = (struct _dispatch_once_waiter_s*)tmp->dow_next; _dispatch_thread_semaphore_signal(sema); } } else { dow.dow_sema = _dispatch_get_thread_semaphore(); for (;;) { tmp = *vval; if (tmp == DISPATCH_ONCE_DONE) { break; } dispatch_atomic_store_barrier(); if (dispatch_atomic_cmpxchg(vval, tmp, &dow)) { dow.dow_next = tmp; _dispatch_thread_semaphore_wait(dow.dow_sema); } } _dispatch_put_thread_semaphore(dow.dow_sema); } }
這段代碼比較長,咱們考慮三個場景:
- 第一次調用: 此時外部傳進來的 onceToken 仍是空指針,因此 vval 爲 NULL,if 判斷成立。首先執行 block,而後讓將 vval 的值設爲
DISPATCH_ONCE_DONE
表示任務已經完成,同時用 tmp 保存先前的 vval。此時,dow 也爲空,所以 while 判斷不成立,代碼執行結束。 - 同一線程第二次調用: 因爲 vval 已經變成了
DISPATCH_ONCE_DONE
,所以 if 判斷不成立,進入 else 分支的 for 循環。因爲 tmp 就是DISPATCH_ONCE_DONE
,因此循環退出,沒有作任何事。 - 多個線程同時調用: 因爲 if 判斷中是一個原子性操做,因此必然只有一個線程能進入 if 分支,其餘的進入 else 分支。因爲其餘線程在調用函數時,vval 還不是
DISPATCH_ONCE_DONE
,因此進入到 for 循環的後半部分。這裏構造了一個鏈表,鏈表的每一個節點上都調用了信號量的wait
方法並阻塞,而在 if 分支中,則會依次遍歷全部的節點並調用signal
方法,喚醒全部等待中的信號量。
dispatch_barrier_async
它調用了 dispatch_barrier_async_f
函數,實現原理也和 dispatch_async_f
相似:
void dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) { dispatch_continuation_t dc; dc = fastpath(_dispatch_continuation_alloc_cacheonly()); dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT); dc->dc_func = func; dc->dc_ctxt = ctxt; _dispatch_queue_push(dq, dc); }
區別在於 do_vtable
被設置了兩個標誌位,多了一個 DISPATCH_OBJ_BARRIER_BIT
標記。這個標記在從隊列中取出任務時被用到:
static _dispatch_thread_semaphore_t _dispatch_queue_drain(dispatch_queue_t dq) { while (dq->dq_items_tail) { /* ... */ if (!DISPATCH_OBJ_IS_VTABLE(dc) && (long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) { if (dq->dq_running > 1) { goto out; } } else { _dispatch_continuation_redirect(dq, dc); continue; } } out: /* 不完整的 drain,須要清理現場 */ return sema; // 返回空的信號量 }
這裏原來是一個循環,會拿出全部的任務,依次調用 ,最終並行處理。一旦遇到 這個標記,就會終止循環。_dispatch_continuation_redirectDISPATCH_OBJ_BARRIER_BIT
在 out
標籤後面,返回了一個空的信號量,隨後方法的調用者會把它單獨放入隊列,等待下一次執行:
void _dispatch_queue_invoke(dispatch_queue_t dq) { _dispatch_thread_semaphore_t sema = _dispatch_queue_drain(dq); if (sema) { _dispatch_thread_semaphore_signal(sema); } else if (tq) { return _dispatch_queue_push(tq, dq); } }
所以 barrier 方法能等待此前全部任務執行完之後執行 _dispatch_queue_push
,同時保證本身執行完之後才執行後續的操做。
dispatch_source
source 是一種資源,相似於 生產者/消費者模式中的生產者,而隊列則是消費者。當有新的資源(source) 產生時,他們被放到對應的隊列上被執行(消費)。
dispatch_source
最多見的用途之一就是用來實現定時器,舉一個小例子:
dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue); dispatch_source_set_timer(timer, dispatch_walltime(NULL, 0), 10*NSEC_PER_SEC, 1*NSEC_PER_SEC); //每10秒觸發timer,偏差1秒 dispatch_source_set_event_handler(timer, ^{ // 定時器觸發時執行的 block }); dispatch_resume(timer);
使用 GCD Timer 的好處在於不依賴 runloop,所以任何線程均可以使用。因爲使用了 block,不會忘記避免循環引用。此外,定時器能夠自由控制精度,隨時修改間隔時間等。
dispatch_source_create
下面從底層源碼的角度來研究這幾行代碼的做用。首先是 dispatch_source_create
函數,它和以前見到的 create 函數都差很少,對 dispatch_source_t 對象作了一些初始化工做:
dispatch_source_t ds = NULL; ds = _dispatch_alloc(DISPATCH_VTABLE(source), sizeof(struct dispatch_source_s)); _dispatch_queue_init((dispatch_queue_t)ds); ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL; ds->do_targetq = &_dispatch_mgr_q; dispatch_set_target_queue(ds, q); return ds;
這裏涉及到兩個隊列,其中 q 是用戶指定的隊列,表示事件觸發的回調在哪一個隊列執行。而 _dispatch_mgr_q
則表示由哪一個隊列來管理這個 source,mgr 是 manager 的縮寫,也是上文提到的序列號爲 2 的內部隊列。
dispatch_source_set_timer
在這個函數中,首先會有參數處理,過濾掉不符合要求的參數。隨後建立了 dispatch_set_timer_params
類型的指針 params:
struct dispatch_set_timer_params { dispatch_source_t ds; uintptr_t ident; struct dispatch_timer_source_s values; };
這個 params 負責綁定定時器對象與他的參數(存儲在 valus
屬性中),最後調用:
dispatch_barrier_async_f((dispatch_queue_t)ds, params, _dispatch_source_set_timer2);
這裏是把 source 當作隊列來使用,所以其實是調用了 _dispatch_source_set_timer2(params)
方法:
static void _dispatch_source_set_timer2(void *context) { // Called on the source queue struct dispatch_set_timer_params *params = context; dispatch_suspend(params->ds); dispatch_barrier_async_f(&_dispatch_mgr_q, params, _dispatch_source_set_timer3); }
這裏首先暫停了隊列,避免了修改的過程當中定時器被觸發。而後在 manager 隊列上執行 _dispatch_source_set_timer3(params)
:
static void _dispatch_source_set_timer3(void *context) { struct dispatch_set_timer_params *params = context; dispatch_source_t ds = params->ds; // ... _dispatch_timer_list_update(ds); dispatch_resume(ds); }
_dispatch_timer_list_update
函數的做用是根據下一次觸發時間將 timer 排序。
接下來,當初分發到 manager 隊列的 block 將要被執行,走到 _dispatch_mgr_invoke
函數,其中有以下代碼:
timeoutp = _dispatch_get_next_timer_fire(&timeout); r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, sel_timeoutp);
可見 GCD 的定時器是由系統的 select 方法實現的。
當內層的 manager 隊列被喚醒後,還會進一步喚醒外層的隊列(當初用戶指定的那個),並在隊列上執行 timer 觸發時的 block。
dispatch_resume/suspend
GCD 對象的暫停和恢復由 do_suspend_cnt
決定,暫停時經過原子操做將改屬性的值加 2,對應的在恢復時經過原子操做將該屬性減二。
它有兩個默認值:
#define DISPATCH_OBJECT_SUSPEND_LOCK 1u #define DISPATCH_OBJECT_SUSPEND_INTERVAL 2u
在喚醒隊列時有以下代碼:
void _dispatch_queue_invoke(dispatch_queue_t dq) { if (!dispatch_atomic_sub2o(dq, do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK)) { if (dq->dq_running == 0) { _dispatch_wakeup(dq); // verify that the queue is idle } } }
可見可以喚醒隊列的前提是 dq->do_suspend_cnt - 1 = 0
,也就是要求 do_suspend_cnt
的值就是 DISPATCH_OBJECT_SUSPEND_LOCK
。
觀察 8 個全局隊列和主隊列的定義就會發現,他們的 do_suspend_cnt
值確實爲 DISPATCH_OBJECT_SUSPEND_LOCK
,所以默認處於啓動狀態。
而 dispatch_source
的 create 方法中,do_suspend_cnt
的初始值爲 DISPATCH_OBJECT_SUSPEND_INTERVAL
,所以默認處於暫停狀態,須要手動開啓。
dispatch_after
dispatch_after
其實依賴於定時器的實現,函數內部調用了 dispatch_after_f
:
void dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt, dispatch_function_t func) { uint64_t delta; struct _dispatch_after_time_s *datc = NULL; dispatch_source_t ds; // 若是延遲爲 0,直接調用 dispatch_async delta = _dispatch_timeout(when); if (delta == 0) { return dispatch_async_f(queue, ctxt, func); } ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue); dispatch_assert(ds); datc = malloc(sizeof(*datc)); dispatch_assert(datc); datc->datc_ctxt = ctxt; datc->datc_func = func; datc->ds = ds;
dispatch_set_context(ds, datc); dispatch_source_set_event_handler_f(ds, _dispatch_after_timer_callback); dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, 0); dispatch_resume(ds); }
首先將延遲執行的 block 封裝在 _dispatch_after_time_s
這個結構體中,而且做爲上下文,與 timer 綁定,而後啓動 timer。
到時之後,執行 _dispatch_after_timer_callback
回調,並取出上下文中的 block:
static void _dispatch_after_timer_callback(void *ctxt) { struct _dispatch_after_time_s *datc = ctxt; _dispatch_client_callout(datc->datc_ctxt, datc->datc_func); // 清理工做 }
總結
本文主要整理了 GCD 中常見的 API 以及底層的實現原理。對於隊列來講,須要理解它的數據結構,轉發機制,以及底層的線程池模型。
dispatch_async
會把任務添加到隊列的一個鏈表中,添加完後會喚醒隊列,根據 vtable 中的函數指針,調用 wakeup 方法。在 wakeup 方法中,從線程池裏取出工做線程(若是沒有就新建),而後在工做線程中取出鏈表頭部指向的 block 並執行。
dispatch_sync
的實現略簡單一些,它不涉及線程池(所以通常都在當前線程執行),而是利用與線程綁定的信號量來實現串行。
分發到不一樣隊列時,代碼進入的分支也不同,好比 dispatch_async
到主隊列的任務由 runloop 處理,而分發到其餘隊列的任務由線程池處理。
在當前串行隊列中執行 dispatch_sync
時,因爲 dq_running
屬性(表示在運行的任務數量) 爲 1,因此如下判斷成立:
if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) { return _dispatch_barrier_sync_f_slow(dq, ctxt, func); }
在 _dispatch_barrier_sync_f_slow
函數中使用了線程對應的信號量而且調用 wait
方法,從而致使線程死鎖。
若是向其它隊列同步提交 block,最終進入 _dispatch_barrier_sync_f_invoke
,它只是保證了 block 執行的原子性,但沒有使用線程對應的信號量。
對於信號量來講,它主要使用 signal
和 wait
這兩個接口,底層分別調用了內核提供的方法。在調用 signal
方法後,先將 value 減一,若是大於零馬上返回,不然陷入等待。signal
方法將信號量加一,若是 value 大於零馬上返回,不然說明喚醒了某一個等待線程,此時由系統決定哪一個線程的等待方法能夠返回。
dispatch_group
的本質就是一個 value 很是大的信號量,等待 group 完成實際上就是等待 value 恢復初始值。而 notify 的做用是將全部註冊的回調組裝成一個鏈表,在 dispatch_async
完成時判斷 value 是否是恢復初始值,若是是則調用 dispatch_async
異步執行全部註冊的回調。
dispatch_once
經過一個靜態變量來標記 block 是否已被執行,同時使用信號量確保只有一個線程能執行,執行完 block 後會喚醒其餘全部等待的線程。
dispatch_barrier_async
改變了 block 的 vtable
標記位,當它將要被取出執行時,會等待前面的 block 都執行完,而後在下一次循環中被執行。
dispatch_source
能夠用來實現定時器。全部的 source 會被提交到用戶指定的隊列,而後提交到 manager 隊列中,按照觸發時間排好序。隨後找到最近觸發的定時器,調用內核的 select
方法等待。等待結束後,依次喚醒 manager 隊列和用戶指定隊列,最終觸發一開始設置的回調 block。
GCD 中的對象用 do_suspend_cnt
來表示是否暫停。隊列默認處於啓動狀態,而 dispatch_source
須要手動啓動。
dispatch_after
函數依賴於 dispatch_source
定時器,它只是註冊了一個定時器,而後在回調函數中執行 block。