博客連接深刻理解GCD之dispatch_queueapi
上一篇咱們介紹了GCD的結構體,這一篇咱們着重看一下GCD中隊列的構成。隊列是咱們在使用GCD中常常接觸的技術點。bash
這兩個術語咱們能夠常常聽到,不知道有沒有人會把這兩個概念等同化。主隊列和主線程是有關聯,可是它們是兩個不一樣的概念。簡單地說,主隊列是主線程上的一個串行隊列,是系統自動爲咱們建立的。換言之,主線程是能夠執行除主隊列以外其餘隊列的任務。併發
Concurrent Programming: APIs and Challenges中的一張圖片能夠很直觀地描述GCD與線程之間的關係:app
一個線程內可能有多個隊列,這些隊列多是串行的或者是並行的,按照同步或者異步的方式工做。less
dispatch_queue_s
是隊列的結構體,能夠說咱們在GCD中接觸最多的結構體了。異步
struct dispatch_queue_vtable_s {
DISPATCH_VTABLE_HEADER(dispatch_queue_s);
};
#define DISPATCH_QUEUE_MIN_LABEL_SIZE 64
#ifdef __LP64__
#define DISPATCH_QUEUE_CACHELINE_PAD 32
#else
#define DISPATCH_QUEUE_CACHELINE_PAD 8
#endif
#define DISPATCH_QUEUE_HEADER \
uint32_t volatile dq_running; \
uint32_t dq_width; \
struct dispatch_object_s *volatile dq_items_tail; \
struct dispatch_object_s *volatile dq_items_head; \
unsigned long dq_serialnum; \
dispatch_queue_t dq_specific_q;
struct dispatch_queue_s {
DISPATCH_STRUCT_HEADER(dispatch_queue_s, dispatch_queue_vtable_s);
DISPATCH_QUEUE_HEADER;
char dq_label[DISPATCH_QUEUE_MIN_LABEL_SIZE]; // must be last
char _dq_pad[DISPATCH_QUEUE_CACHELINE_PAD]; // for static queues only
};
複製代碼
GCD中使用了不少的宏,不利於咱們理解代碼,咱們用對應的結構替換掉定義的宏,以下:async
struct dispatch_queue_s {
//第一部分:DISPATCH_STRUCT_HEADER(dispatch_queue_s, dispatch_queue_vtable_s)
const struct dispatch_queue_vtable_s *do_vtable; \ //dispatch_queue_s的操做函數:dispatch_queue_vtable_s類型的結構體
struct dispatch_queue_s *volatile do_next; \ //鏈表的next
unsigned int do_ref_cnt; \ //引用計數
unsigned int do_xref_cnt; \ //外部引用計數
unsigned int do_suspend_cnt; \ //暫停標誌,好比延時處理中,在任務到時後,計時器處理將會將該標誌位修改,而後喚醒隊列調度
struct dispatch_queue_s *do_targetq; \ //目標隊列,GCD容許咱們將一個隊列放在另外一個隊列裏執行任務
void *do_ctxt; \ //上下文,用來存儲線程池相關數據,好比用於線程掛起和喚醒的信號量、線程池尺寸等
void *do_finalizer;
//第二部分:DISPATCH_QUEUE_HEADER
uint32_t volatile dq_running; \ //是否運行中
uint32_t dq_width; \ //最大併發數:主線程/串行中這個值爲1
struct dispatch_object_s *volatile dq_items_tail; \ //鏈表尾節點
struct dispatch_object_s *volatile dq_items_head; \ //鏈表頭節點
unsigned long dq_serialnum; \ //隊列的序列號
dispatch_queue_t dq_specific_q; //specific隊列
//其餘:
char dq_label[DISPATCH_QUEUE_MIN_LABEL_SIZE]; // must be last 說明隊列的名字要少於64個字符
char _dq_pad[DISPATCH_QUEUE_CACHELINE_PAD]; // for static queues only
};
複製代碼
隊列的類型能夠分爲主隊列、管理隊列、自定義隊列、全局隊列4種類型。函數
咱們在開發過程當中可使用dispatch_get_main_queue
獲取主隊列,看一下它的定義:oop
#define dispatch_get_main_queue() (&_dispatch_main_q)
struct dispatch_queue_s _dispatch_main_q = {
#if !DISPATCH_USE_RESOLVERS
.do_vtable = &_dispatch_queue_vtable,
.do_targetq = &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
#endif
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.dq_label = "com.apple.main-thread",
.dq_running = 1,
.dq_width = 1, //說明主隊列是一個串行隊列
.dq_serialnum = 1,
};
複製代碼
它的幾個主要屬性:優化
1.do_vtable
const struct dispatch_queue_vtable_s _dispatch_queue_vtable = {
.do_type = DISPATCH_QUEUE_TYPE,
.do_kind = "queue",
.do_dispose = _dispatch_queue_dispose,
.do_invoke = NULL,
.do_probe = (void *)dummy_function_r0,
.do_debug = dispatch_queue_debug,
};
複製代碼
2.do_targetq
主隊列的目標隊列:"com.apple.root.default-overcommit-priority"這個全局隊列。這裏咱們先提早總結一下:非全局隊列的隊列類型(主隊列以及後面提到的管理隊列和自定義隊列),都須要壓入到全局隊列處理,因此須要設置do_targetq
。
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.root.default-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 7,
},
複製代碼
3.do_ref_cnt和do_xref_cnt
前面提到do_ref_cnt
和do_xref_cnt
是引用計數,主隊列的這兩個值爲DISPATCH_OBJECT_GLOBAL_REFCNT
。既然是引用計數,那想必是和GCD的內存管理有關,找到和內存管理相關的代碼:
void
dispatch_retain(dispatch_object_t dou)
{
if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
return; // global object
}
...
}
void
_dispatch_retain(dispatch_object_t dou)
{
if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
return; // global object
}
...
}
void
dispatch_release(dispatch_object_t dou)
{
if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
return;
}
...
}
void
_dispatch_release(dispatch_object_t dou)
{
if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
return; // global object
}
...
}
複製代碼
從上面幾個函數咱們能夠看出,主隊列的生命週期是伴隨着應用的,不會受retain和release的影響。
_dispatch_mgr_q
(管理隊列),是GCD的內部隊列,不對外公開。從名字上看,這個隊列應該是用來扮演管理的角色,GCD定時器就用到了管理隊列。
struct dispatch_queue_s _dispatch_mgr_q = {
.do_vtable = &_dispatch_queue_mgr_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_targetq = &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.libdispatch-manager",
.dq_width = 1,
.dq_serialnum = 2,
};
複製代碼
1.do_vtable
static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = {
.do_type = DISPATCH_QUEUE_MGR_TYPE,
.do_kind = "mgr-queue",
.do_invoke = _dispatch_mgr_thread,
.do_debug = dispatch_queue_debug,
.do_probe = _dispatch_mgr_wakeup,
};
複製代碼
2.do_targetq
管理隊列的目標隊列:"com.apple.root.high-overcommit-priority"這個全局隊列。
[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.root.high-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 9,
},
複製代碼
3.do_ref_cnt和do_xref_cnt
管理隊列的這兩個值爲DISPATCH_OBJECT_GLOBAL_REFCNT
,因此和主隊列的生命週期應該是同樣的。
咱們在開發中會使用dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
建立一個自定義的隊列。它的源代碼以下:
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
dispatch_queue_t dq;
size_t label_len;
if (!label) {
label = "";
}
label_len = strlen(label);
if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
}
// XXX switch to malloc()
dq = calloc(1ul, sizeof(struct dispatch_queue_s) -
DISPATCH_QUEUE_MIN_LABEL_SIZE - DISPATCH_QUEUE_CACHELINE_PAD +
label_len + 1);
if (slowpath(!dq)) {
return dq;
}
//隊列初始化數據
_dispatch_queue_init(dq);
strcpy(dq->dq_label, label);
if (fastpath(!attr)) {
return dq;
}
if (fastpath(attr == DISPATCH_QUEUE_CONCURRENT)) {
dq->dq_width = UINT32_MAX;
dq->do_targetq = _dispatch_get_root_queue(0, false);
} else {
dispatch_debug_assert(!attr, "Invalid attribute");
}
return dq;
}
複製代碼
1.slowpath(x)
和fastpath(x)
關於這兩個宏的定義以下:
#define fastpath(x) ((typeof(x))__builtin_expect((long)(x), ~0l))
#define slowpath(x) ((typeof(x))__builtin_expect((long)(x), 0l))
複製代碼
fastpath(x)
表示x的值通常不爲0,但願編譯器進行優化。slowpath(x)
表示x的值極可能爲0,但願編譯器進行優化。
2._dispatch_queue_init
static inline void
_dispatch_queue_init(dispatch_queue_t dq)
{
dq->do_vtable = &_dispatch_queue_vtable;
dq->do_next = DISPATCH_OBJECT_LISTLESS;
dq->do_ref_cnt = 1;
dq->do_xref_cnt = 1;
// Default target queue is overcommit!
dq->do_targetq = _dispatch_get_root_queue(0, true);
dq->dq_running = 0;
dq->dq_width = 1;
dq->dq_serialnum = dispatch_atomic_inc(&_dispatch_queue_serial_numbers) - 1;
}
複製代碼
_dispatch_queue_init
默認設置一個隊列爲串行隊列,它的目標隊列是_dispatch_get_root_queue(0, true)
。
3.do_targetq
前面對這個字段的解釋有點簡單了,do_targetq
表明目的隊列。在Concurrent Programming: APIs and Challenges提到:
While custom queues are a powerful abstraction, all blocks you schedule on them will ultimately trickle down to one of the system’s global queues and its thread pool(s).
雖然自定義隊列是一個強大的抽象,但你在隊列上安排的全部Block最終都會滲透到系統的某一個全局隊列及其線程池。
看起來自定義隊列更像是全局隊列的一個代理。在自定義隊列建立的時候默認其目標隊列爲_dispatch_get_root_queue(0, true)
。其中0
表明優先級DISPATCH_QUEUE_PRIORITY_DEFAULT
,true
表明是不是overcommit
。overcommit
參數表示該隊列在執行block時,不管系統多忙都會新開一個線程,這樣作的目的是不會形成某個線程過載。若是是自定義併發隊列的話,do_targetq
會被設置爲_dispatch_get_root_queue(0, false)
。
值得注意的是,主隊列的目標隊列也是一個全局隊列,全局隊列的底層就是普通的線程池(這個會在全局隊列中講到)。
4.dq_serialnum
dq_serialnum
是在_dispatch_queue_serial_numbers
基礎上進行原子操做加1,即從12開始累加。1到11被保留的序列號定義以下:
// skip zero
// 1 - main_q
// 2 - mgr_q
// 3 - _unused_
// 4,5,6,7,8,9,10,11 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
複製代碼
其中1用於主隊列,2用於管理隊列,3暫時沒有被使用,4~11是用於全局隊列的。因爲我找錯了看的源碼是libdispatch-187.10
很老了,後面蘋果有新增了幾個隊列。
上面說了不少全局隊列,如今咱們來看一下全局隊列是如何定義的。
dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
return NULL;
}
return _dispatch_get_root_queue(priority,
flags & DISPATCH_QUEUE_OVERCOMMIT);
}
static inline dispatch_queue_t
_dispatch_get_root_queue(long priority, bool overcommit)
{
if (overcommit) switch (priority) {
case DISPATCH_QUEUE_PRIORITY_LOW:
return &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_DEFAULT:
return &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_HIGH:
return &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
return &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY];
}
switch (priority) {
case DISPATCH_QUEUE_PRIORITY_LOW:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_DEFAULT:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_HIGH:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
return &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY];
default:
return NULL;
}
}
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_root_queues[] = {
[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
.dq_label = "com.apple.root.low-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 4,
},
[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.root.low-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 5,
},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
.dq_label = "com.apple.root.default-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 6,
},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.root.default-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 7,
},
[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
.dq_label = "com.apple.root.high-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 8,
},
[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.root.high-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 9,
},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
.dq_label = "com.apple.root.background-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 10,
},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.root.background-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 11,
},
};
複製代碼
1.do_vtable
全局隊列的do_vtable
:
static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
.do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
.do_kind = "global-queue",
.do_debug = dispatch_queue_debug,
.do_probe = _dispatch_queue_wakeup_global,
};
複製代碼
2.do_ctxt
全局隊列中有一個上下文的屬性,用來存儲線程池相關數據,好比用於線程掛起和喚醒的信號量、線程池尺寸等。
它的定義以下:
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
};
複製代碼
dispatch_sync
的源碼以下:
void
dispatch_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
if (slowpath(dq == &_dispatch_main_q)) {
return _dispatch_sync_slow(dq, work);
}
#endif
struct Block_basic *bb = (void *)work;
dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
複製代碼
若是這個隊列是主隊列,則調用_dispatch_sync_slow
,不然調用dispatch_sync_f
。點開_dispatch_sync_slow
返現,最終仍是調用了dispatch_sync_f
方法。經過_dispatch_Block_copy
或者Block_basic
完成由block到function的轉換。因此block的執行底層仍是使用function。
dispatch_sync_f
的源碼以下:
void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
//串行隊列
if (fastpath(dq->dq_width == 1)) {
return dispatch_barrier_sync_f(dq, ctxt, func);
}
//全局隊列
if (slowpath(!dq->do_targetq)) {
// the global root queues do not need strict ordering
(void)dispatch_atomic_add2o(dq, dq_running, 2);
return _dispatch_sync_f_invoke(dq, ctxt, func);
}
//併發隊列
_dispatch_sync_f2(dq, ctxt, func);
}
複製代碼
這裏分紅了三種狀況:
dispatch_barrier_sync_f
;_dispatch_sync_f_invoke
;_dispatch_sync_f2
。若是是串行隊列壓入同步任務,那麼當前任務就必須等待前面的任務執行完成後才能執行。源代碼就會調用dispatch_barrier_sync_f
函數完成上面的效果。
void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
// 1) ensure that this thread hasn't enqueued anything ahead of this call // 2) the queue is not suspended //第一步:若是串行隊列中存在其餘任務或者隊列被掛起,則直接進入_dispatch_sync_f_slow函數,等待這個隊列中的其餘任務完成(信號量的方式),而後執行這個任務。 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){ return _dispatch_barrier_sync_f_slow(dq, ctxt, func); } //第二步:檢測隊列的dq_running狀態,若是有運行,進入_dispatch_barrier_sync_f_slow,等待激活。 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) { // global queues and main queue bound to main thread always falls into // the slow case return _dispatch_barrier_sync_f_slow(dq, ctxt, func); } //第三步:有多重隊列,尋找真正的目標隊列,其實仍是回到了dispatch_sync_f方法 if (slowpath(dq->do_targetq->do_targetq)) { return _dispatch_barrier_sync_f_recurse(dq, ctxt, func); } //第四步:執行隊列裏的任務,執行後檢測隊列有無其餘任務,若是有,釋放前面的信號量(釋放信號_dispatch_barrier_sync_f2函數中)。 _dispatch_barrier_sync_f_invoke(dq, ctxt, func); } 複製代碼
看了上面的代碼註釋後,咱們來想一下同步串行隊列死鎖問題。死鎖是怎麼產生的?先看下示例代碼:
#import "DeadLock.h"
@implementation DeadLock
- (instancetype)init {
if (self = [super init]) {
// [self _mianQueueDeadLock];
[self _serialQueueDeadLock];
}
return self;
}
#pragma mark - Private
- (void)_mianQueueDeadLock {
dispatch_sync(dispatch_get_main_queue(), ^(void){
NSLog(@"這裏死鎖了");
});
}
- (void)_serialQueueDeadLock {
dispatch_queue_t queue1 = dispatch_queue_create("1serialQueue", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t queue2 = dispatch_queue_create("2serialQueue", DISPATCH_QUEUE_SERIAL);
dispatch_sync(queue1, ^{
NSLog(@"11111");
dispatch_sync(queue1, ^{//若是使用queue2就不會發生死鎖,使用queue1就會死鎖
NSLog(@"22222");
});
});
}
@end
複製代碼
以_serialQueueDeadLock
爲例:當第一次執行串行隊列任務的時候,跳到第四步,直接開始執行任務,在運行第二個dispatch_sync
時候,在任務裏面經過執行第一步(隊列在運行)向這個同步隊列中壓入信號量,而後等待信號量,進入死鎖。若是主隊列則會跳轉到第二步進入死鎖。
static void
_dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
_dispatch_function_invoke(dq, ctxt, func);
if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
_dispatch_wakeup(dq);
}
}
複製代碼
若是當前隊列是全局隊列的話,就會調用_dispatch_sync_f_invoke
。這個函數的做用:執行傳入的任務,而後根據dq_running檢測任務隊列有沒有激活,沒有激活就執行激活函數。關於激活函數_dispatch_wakeup(dq)
放在隊列的異步中講解。
_dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
// 1) ensure that this thread hasn't enqueued anything ahead of this call // 2) the queue is not suspended //第一步:併發隊列中有其餘任務或者隊列被掛起,壓入信號量,等待其餘線程釋放這個信號量 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){ return _dispatch_sync_f_slow(dq, ctxt, func); } //第二步:並行隊列沒激活,激活隊列後執行任務,最終仍是調用了_dispatch_sync_f_slow函數,只是多了一個激活函數 if (slowpath(dispatch_atomic_add2o(dq, dq_running, 2) & 1)) { return _dispatch_sync_f_slow2(dq, ctxt, func); } //第三步:隊列有多重隊列,尋找真正的目標隊列 if (slowpath(dq->do_targetq->do_targetq)) { return _dispatch_sync_f_recurse(dq, ctxt, func); } //第四步:並行隊列沒有其餘任務,調用並激活這個隊列 _dispatch_sync_f_invoke(dq, ctxt, func); } 複製代碼
經過上面的註釋,並行隊列同步執行是順序執行的。這種順序執行和操做隊列爲併發隊列沒有關係。而是由於這些操做均爲同步操做,因此每個操做放入隊列後都會被等待執行完成纔會放入下一操做,形成了這種順序執行的現象。
如今咱們整理一下隊列同步執行的流程,以下圖:
說完了同步咱們如今看一下異步。咱們使用dispatch_async
進行隊列的異步執行。
dispatch_async
的源碼以下:
void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
dispatch_async_f(dq, _dispatch_Block_copy(work),
_dispatch_call_block_and_release);
}
複製代碼
dispatch_async
主要將block從棧copy到堆上,或者增長引用計數,保證block在執行以前不會被銷燬,另外_dispatch_call_block_and_release
用於銷燬block。而後調用dispatch_async_f
。
void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
dispatch_continuation_t dc;
// No fastpath/slowpath hint because we simply don't know //串行隊列,執行dispatch_barrier_async_f,其實最後仍是執行任務入隊的操做 if (dq->dq_width == 1) { return dispatch_barrier_async_f(dq, ctxt, func); } //從線程私有數據中獲取一個dispatch_continuation_t的結構體 dc = fastpath(_dispatch_continuation_alloc_cacheonly()); if (!dc) { return _dispatch_async_f_slow(dq, ctxt, func); } dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT; dc->dc_func = func; dc->dc_ctxt = ctxt; // No fastpath/slowpath hint because we simply don't know
//有目標隊列,調用_dispatch_async_f2函數進行轉發。
if (dq->do_targetq) {
return _dispatch_async_f2(dq, dc);
}
//全局隊列直接進行入隊操做
_dispatch_queue_push(dq, dc);
}
複製代碼
從上面的源代碼中咱們能夠看出dispatch_async_f
大體分爲三種狀況:
dispatch_barrier_async_f
;_dispatch_async_f2
;雖然上面分三種狀況,可是歸根到底,它們最後執行都是_dispatch_queue_push
來進行入隊的操做。
這裏有一點須要注意下:就是dispatch_continuation_t
中do_vtable
的賦值狀況。
//串行隊列,barrier
dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
//not barrier
dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
複製代碼
在libdispatch
所有標識符有四種:
#define DISPATCH_OBJ_ASYNC_BIT 0x1 //異步
#define DISPATCH_OBJ_BARRIER_BIT 0x2 //阻塞
#define DISPATCH_OBJ_GROUP_BIT 0x4 //組
#define DISPATCH_OBJ_SYNC_SLOW_BIT 0x8 //同步慢
複製代碼
從上面咱們能夠知道串行隊列在異步執行的時候,經過DISPATCH_OBJ_BARRIER_BIT
這個標識符實現阻塞等待的。
接着咱們分析下dispatch_barrier_async_f
和_dispatch_async_f2
這兩個函數。
dispatch_barrier_async_f
的源碼以下:
void
dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_continuation_t dc;
//從線程私有數據中獲取一個dispatch_continuation_t的結構體,dispatch_continuation_t中封裝了異步執行任務。
dc = fastpath(_dispatch_continuation_alloc_cacheonly());
if (!dc) {
//return _dispatch_barrier_async_f_slow(dq, ctxt, func);
//如下是_dispatch_barrier_async_f_slow的具體實現
//若是沒有則從堆上獲取一個dispatch_continuation_t的結構體
dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
//經過do_vtable區分類型
dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
//將_dispatch_call_block_and_release做爲func方法
dc->dc_func = func;
//將傳入的block做爲上下文
dc->dc_ctxt = ctxt;
//入隊操做
_dispatch_queue_push(dq, dc);
}
dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
dc->dc_func = func;
dc->dc_ctxt = ctxt;
_dispatch_queue_push(dq, dc);
}
複製代碼
_dispatch_queue_push
是一個宏定義,它最後會變成執行_dispatch_queue_push_list
函數。
#define _dispatch_queue_push(x, y) _dispatch_queue_push_list((x), (y), (y))
#define _dispatch_queue_push_list _dispatch_trace_queue_push_list
static inline void
_dispatch_trace_queue_push_list(dispatch_queue_t dq, dispatch_object_t _head,
dispatch_object_t _tail)
{
if (slowpath(DISPATCH_QUEUE_PUSH_ENABLED())) {
struct dispatch_object_s *dou = _head._do;
do {
//主要是對dispatch_continuation_s結構體的處理,確保後面的使用。
_dispatch_trace_continuation(dq, dou, DISPATCH_QUEUE_PUSH);
} while (dou != _tail._do && (dou = dou->do_next));
}
_dispatch_queue_push_list(dq, _head, _tail);
}
static inline void
_dispatch_queue_push_list(dispatch_queue_t dq, dispatch_object_t _head,
dispatch_object_t _tail)
{
struct dispatch_object_s *prev, *head = _head._do, *tail = _tail._do;
tail->do_next = NULL;
dispatch_atomic_store_barrier();
//dispatch_atomic_xchg2o實質是調用((typeof(*(p)))__sync_swap((p), (n))),它的定義是將p設爲n並返回p操做以前的值。
//dispatch_atomic_xchg2o(dq, dq_items_tail, tail)至關於dq->dq_items_tail = tail,從新設置了隊列的尾指針
prev = fastpath(dispatch_atomic_xchg2o(dq, dq_items_tail, tail));
if (prev) {
// if we crash here with a value less than 0x1000, then we are at a
// known bug in client code for example, see _dispatch_queue_dispose
// or _dispatch_atfork_child
//prev是原先的隊尾,若是隊列中有其餘的元素,就將壓入的對象加在隊列的尾部。
prev->do_next = head;
} else {
//若是隊列爲空
_dispatch_queue_push_list_slow(dq, head);
}
}
複製代碼
_dispatch_queue_push_list_slow
若是隊列爲空,調用_dispatch_queue_push_list_slow
方法。
_dispatch_queue_push_list_slow(dispatch_queue_t dq,
struct dispatch_object_s *obj)
{
//dq->dq_items_head設置爲dc,而後喚醒這個隊列。由於此時隊列爲空,沒有任務在執行,處於休眠狀態,因此須要喚醒
_dispatch_retain(dq);
dq->dq_items_head = obj;
_dispatch_wakeup(dq);
_dispatch_release(dq);
}
複製代碼
不管是同步仍是異步中都調用了_dispatch_wakeup
,這個函數的做用就是喚醒當前隊列。
_dispatch_wakeup
的源碼:
dispatch_queue_t
_dispatch_wakeup(dispatch_object_t dou)
{
dispatch_queue_t tq;
if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
return NULL;
}
//這裏比較隱晦,這裏實際上是走全局隊列的喚醒邏輯調用_dispatch_queue_wakeup_global,若是喚醒失敗且對尾指針爲空,返回NULL;若是是管理隊列的的話,則執行_dispatch_mgr_wakeup函數
if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
return NULL;
}
// _dispatch_source_invoke() relies on this testing the whole suspend count
// word, not just the lock bit. In other words, no point taking the lock
// if the source is suspended or canceled.
if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
DISPATCH_OBJECT_SUSPEND_LOCK)) {
#if DISPATCH_COCOA_COMPAT
if (dou._dq == &_dispatch_main_q) {
//傳入主隊列,會進入到 _dispatch_queue_wakeup_main() 函數中
_dispatch_queue_wakeup_main();
}
#endif
return NULL;
}
_dispatch_retain(dou._do);
//有目標隊列,繼續向目標隊列壓入這個隊列
tq = dou._do->do_targetq;
_dispatch_queue_push(tq, dou._do);
return tq; // libdispatch does not need this, but the Instrument DTrace
// probe does
}
複製代碼
從上面的代碼能夠看出_dispatch_wakeup
分爲四種狀況:
_dispatch_queue_wakeup_main()
。_dispatch_queue_wakeup_global
。_dispatch_mgr_wakeup
,這裏主要是爲了dispatch_source而服務的。void
_dispatch_queue_wakeup_main(void)
{
kern_return_t kr;
dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
_dispatch_main_q_port_init);
//喚醒主線程,這裏已經點不進去了,關於主線的喚醒主要靠mach_port和在runloop中註冊相對應的source1
kr = _dispatch_send_wakeup_main_thread(main_q_port, 0);
switch (kr) {
case MACH_SEND_TIMEOUT:
case MACH_SEND_TIMED_OUT:
case MACH_SEND_INVALID_DEST:
break;
default:
(void)dispatch_assume_zero(kr);
break;
}
_dispatch_safe_fork = false;
}
複製代碼
上面提到dx_probe(dou._do)
這裏走的是全局隊列的喚醒。前面提到全局隊列的do_vtable
:
static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
.do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
.do_kind = "global-queue",
.do_debug = dispatch_queue_debug,
.do_probe = _dispatch_queue_wakeup_global,
};
複製代碼
_dispatch_queue_wakeup_global
的源碼:
static bool
_dispatch_queue_wakeup_global(dispatch_queue_t dq)
{
static dispatch_once_t pred;
struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
int r;
if (!dq->dq_items_tail) {
return false;
}
_dispatch_safe_fork = false;
dispatch_debug_queue(dq, __PRETTY_FUNCTION__);
dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);
#if HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_ENABLE_THREAD_POOL
//隊列上下文的dgq_kworkqueue存在,則調用pthread_workqueue_additem_np函數,該函數使用workq_kernreturn系統調用,通知workqueue增長應當執行的項目。根據該通知,XNU內核基於系統狀態判斷是否要生成線程,若是是overcommit優先級的隊列,workqueue則始終生成線程,以後線程執行_dispatch_worker_thread2函數。
//工做隊列,是一個用於建立內核線程的接口,經過它建立的內核線程來執行內核其餘模塊排列到隊列裏的工做。不一樣優先級的dispatch queue對應着對應優先級的workqueue。GCD初始化的時候,使用pthread_workqueue_create_np建立pthread_workqueue
if (qc->dgq_kworkqueue)
#endif
{
if (dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, 1)) {
pthread_workitem_handle_t wh;
unsigned int gen_cnt;
_dispatch_debug("requesting new worker thread");
r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
_dispatch_worker_thread2, dq, &wh, &gen_cnt);
(void)dispatch_assume_zero(r);
} else {
_dispatch_debug("work thread request still pending on global "
"queue: %p", dq);
}
goto out;
}
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_ENABLE_THREAD_POOL
//經過發送一個信號量使線程保活
if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
goto out;
}
pthread_t pthr;
int t_count;
do {
t_count = qc->dgq_thread_pool_size;
if (!t_count) {
_dispatch_debug("The thread pool is full: %p", dq);
goto out;
}
} while (!dispatch_atomic_cmpxchg2o(qc, dgq_thread_pool_size, t_count,
t_count - 1));//若是線程池可用則減1
//這裏說明線程池不夠用了,使用pthread建立一個線程,並執行_dispatch_worker_thread,_dispatch_worker_thread最終會調用到_dispatch_worker_thread2
while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
sleep(1);
}
//保證pthr能被自動回收掉
r = pthread_detach(pthr);
(void)dispatch_assume_zero(r);
#endif // DISPATCH_ENABLE_THREAD_POOL
out:
return false;
}
複製代碼
_dispatch_worker_thread2
的代碼以下:
_dispatch_worker_thread2(void *context)
{
struct dispatch_object_s *item;
dispatch_queue_t dq = context;
struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
if (_dispatch_thread_getspecific(dispatch_queue_key)) {
DISPATCH_CRASH("Premature thread recycling");
}
//把dq設置爲剛啓動的這個線程的TSD
_dispatch_thread_setspecific(dispatch_queue_key, dq);
qc->dgq_pending = 0;
#if DISPATCH_COCOA_COMPAT
(void)dispatch_atomic_inc(&_dispatch_worker_threads);
// ensure that high-level memory management techniques do not leak/crash
if (dispatch_begin_thread_4GC) {
dispatch_begin_thread_4GC();
}
void *pool = _dispatch_begin_NSAutoReleasePool();
#endif
#if DISPATCH_PERF_MON
uint64_t start = _dispatch_absolute_time();
#endif
//_dispatch_queue_concurrent_drain_one用來取出隊列的一個內容
while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
// 用來對取出的內容進行處理(若是是任務,則執行任務)
_dispatch_continuation_pop(item);
}
#if DISPATCH_PERF_MON
_dispatch_queue_merge_stats(start);
#endif
#if DISPATCH_COCOA_COMPAT
_dispatch_end_NSAutoReleasePool(pool);
dispatch_end_thread_4GC();
if (!dispatch_atomic_dec(&_dispatch_worker_threads) &&
dispatch_no_worker_threads_4GC) {
dispatch_no_worker_threads_4GC();
}
#endif
_dispatch_thread_setspecific(dispatch_queue_key, NULL);
_dispatch_force_cache_cleanup();
}
複製代碼
這裏有兩個比較重要的方法:
_dispatch_queue_concurrent_drain_one
這個方法用來取出隊列中的一個內容。_dispatch_continuation_pop
這個方法用來處理取出的內容_dispatch_queue_concurrent_drain_one
struct dispatch_object_s *
_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
{
struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;
// The mediator value acts both as a "lock" and a signal
head = dispatch_atomic_xchg(&dq->dq_items_head, mediator);
if (slowpath(head == NULL)) {
//隊列是空的
dispatch_atomic_cmpxchg(&dq->dq_items_head, mediator, NULL);
_dispatch_debug("no work on global work queue");
return NULL;
}
if (slowpath(head == mediator)) {
// 該線程在現線程競爭中失去了對隊列的擁有權,這意味着libdispatch的效率很糟糕,
// 這種狀況意味着在線程池中有太多的線程,這個時候應該建立一個pengding線程,
// 而後退出該線程,內核會在負載減弱的時候建立一個新的線程
_dispatch_queue_wakeup_global(dq);
return NULL;
}
// 在返回以前將head指針的do_next保存下來,若是next爲NULL,這意味着item是最後一個
next = fastpath(head->do_next);
if (slowpath(!next)) {
dq->dq_items_head = NULL;
if (dispatch_atomic_cmpxchg(&dq->dq_items_tail, head, NULL)) {
// head 和 tail頭尾指針均爲空
goto out;
}
// 此時必定有item,該線程不會等待過久.
while (!(next = head->do_next)) {
_dispatch_hardware_pause();
}
}
// 繼續調度
dq->dq_items_head = next;
_dispatch_queue_wakeup_global(dq);
out:
// 返回隊列的頭指針
return head;
}
複製代碼
_dispatch_continuation_pop
static inline void
_dispatch_continuation_pop(dispatch_object_t dou)
{
dispatch_continuation_t dc = dou._dc;
dispatch_group_t dg;
_dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dou);
//檢測是否是隊列,若是是,就進入_dispatch_queue_invoke 處理隊列
if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
return _dispatch_queue_invoke(dou._dq);
}
// Add the item back to the cache before calling the function. This
// allows the 'hot' continuation to be used for a quick callback.
//
// The ccache version is per-thread.
// Therefore, the object has not been reused yet.
// This generates better assembly.
if ((long)dc->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
_dispatch_continuation_free(dc);
}
//判斷是不是group
if ((long)dc->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
dg = dc->dc_group;
} else {
dg = NULL;
}
//是任務封裝的 dispatch_continuation_t 結構體,直接執行任務。
//到這裏咱們知道了隊列執行的時候,block被調用的時機
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
if (dg) {
//group須要進行調用dispatch_group_leave並釋放信號
dispatch_group_leave(dg);
_dispatch_release(dg);
}
}
複製代碼
從上面的函數中能夠發現,壓入隊列的不只是任務,還有多是隊列。若是是隊列,直接執行了_dispatch_queue_invoke
,不然執行dc->dc_func(dc->dc_ctxt)
。
_dispatch_queue_invoke
void
_dispatch_queue_invoke(dispatch_queue_t dq)
{
if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) &&
fastpath(dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
dispatch_atomic_acquire_barrier();
dispatch_queue_t otq = dq->do_targetq, tq = NULL;
_dispatch_queue_drain(dq);
if (dq->do_vtable->do_invoke) {
// Assume that object invoke checks it is executing on correct queue
tq = dx_invoke(dq);
} else if (slowpath(otq != dq->do_targetq)) {
// An item on the queue changed the target queue
tq = dq->do_targetq;
}
// We do not need to check the result.
// When the suspend-count lock is dropped, then the check will happen.
dispatch_atomic_release_barrier();
//dq_running減1,由於任務要麼被直接執行了,要麼被壓到target隊列了
(void)dispatch_atomic_dec2o(dq, dq_running);
if (tq) {
return _dispatch_queue_push(tq, dq);
}
}
dq->do_next = DISPATCH_OBJECT_LISTLESS;
if (!dispatch_atomic_sub2o(dq, do_suspend_cnt,
DISPATCH_OBJECT_SUSPEND_LOCK)) {
//隊列處於空閒狀態,須要喚醒
if (dq->dq_running == 0) {
_dispatch_wakeup(dq); // verify that the queue is idle
}
}
//釋放隊列
_dispatch_release(dq); // added when the queue is put on the list
}
複製代碼
如今咱們整理一下隊列異步執行的流程,以下圖:
dispatch_queue
經過結構體和鏈表,被實現爲FIFO
(先進先出)隊列。不管串行隊列和併發隊列,都是符合FIFO
的原則。二者的主要區別是:執行順序不一樣,以及開啓線程數不一樣。
dispatch_sync
函數通常都在當前線程執行,利用與線程綁定的信號量來實現串行。
Block並非直接添加到隊列上,而是先構成一個dispatch_continuation
結構體。結構體包含了這個Block還有一些上下文信息。隊列會將這些dispatch_continuation
結構體添加隊列的鏈表中。不管這些隊列是什麼類型的,最終都是和全局隊列相關的。在全局隊列執行Block的時候,libdispatch從全局隊列中取出dispatch_continuation
,調用pthread_workqueue_additem_np
函數,將該全局隊列自身、符合其優先級的workqueue信息以及dispatch_continuation
結構體的回調函數傳遞給參數。pthread_workqueue_additem_np
函數使用workq_kernreturn
系統調用,通知workqueue增長應當執行的項目。根據該同志,XNU內核基於系統狀態判斷是否要生成線程。若是是overcommit優先級的全局隊列workqueue則會始終生成線程。workqueue的線程執行pthread_workqueue
函數,該函數調用libdispatch的回調函數。在該函數中執行加入到dispatch_continuation
的Block
dispatch_async
分發到主隊列的任務由Runloop
處理,而分發到其餘隊列的任務由線程池處理。
GCD死鎖是隊列致使的而不是線程致使,緣由是_dispatch_barrier_sync_f_slow
函數中使用了線程對應的信號量而且調用wait方法,從而致使線程死鎖。
dispatch_barrier_async
適用的場景隊列必須是用DISPATCH_QUEUE_CONCURRENT
屬性建立的隊列,而使用全局併發隊列的時候,其表現就和dispatch_async
同樣。緣由:dispatch_barrier_async
若是傳入的是全局隊列,在喚醒隊列時會執行_dispatch_queue_wakeup_global
函數,其執行效果同dispatch_async
一致,而若是是自定義的隊列的時候,_dispatch_continuation_pop
中會執行dispatch_queue_invoke
。在while循環中依次取出任務並調用_dispatch_continuation_redirect
函數,使得block併發執行。當遇到DISPATCH_OBJ_BARRIER_BIT
標記時,會修改do_suspend_cnt
標誌以保證後續while循環時直接goto out。barrier block的任務執行完以後_dispatch_queue_class_invoke
會將do_suspend_cnt
重置回去,因此barrier block以後的任務會繼續執行。
隊列操做和線程開啓的關係:dispatch_sync
添加任務到隊列,不會建立新的線程。全部任務都是在當前線程中處理的。dispatch_async
添加任務到隊列分爲三種狀況:主隊列不建立線程,在主線程中串行執行;全局隊列和自定義並行隊列根據任務系統決定開闢線程個數;自定義串行隊列建立一個線程,串行進行。