深刻理解GCD之dispatch_queue

博客連接深刻理解GCD之dispatch_queueapi

前言

上一篇咱們介紹了GCD的結構體,這一篇咱們着重看一下GCD中隊列的構成。隊列是咱們在使用GCD中常常接觸的技術點。bash

關鍵點

主隊列和主線程

這兩個術語咱們能夠常常聽到,不知道有沒有人會把這兩個概念等同化。主隊列和主線程是有關聯,可是它們是兩個不一樣的概念。簡單地說,主隊列是主線程上的一個串行隊列,是系統自動爲咱們建立的。換言之,主線程是能夠執行除主隊列以外其餘隊列的任務。併發

隊列和線程

Concurrent Programming: APIs and Challenges中的一張圖片能夠很直觀地描述GCD與線程之間的關係:app

一個線程內可能有多個隊列,這些隊列多是串行的或者是並行的,按照同步或者異步的方式工做。less

隊列的定義

dispatch_queue_s是隊列的結構體,能夠說咱們在GCD中接觸最多的結構體了。異步

struct dispatch_queue_vtable_s {
	DISPATCH_VTABLE_HEADER(dispatch_queue_s);
};

#define DISPATCH_QUEUE_MIN_LABEL_SIZE 64

#ifdef __LP64__
#define DISPATCH_QUEUE_CACHELINE_PAD 32
#else
#define DISPATCH_QUEUE_CACHELINE_PAD 8
#endif

#define DISPATCH_QUEUE_HEADER \
	uint32_t volatile dq_running; \                    
	uint32_t dq_width; \			
	struct dispatch_object_s *volatile dq_items_tail; \
	struct dispatch_object_s *volatile dq_items_head; \ 
	unsigned long dq_serialnum; \
	dispatch_queue_t dq_specific_q;

struct dispatch_queue_s {
	DISPATCH_STRUCT_HEADER(dispatch_queue_s, dispatch_queue_vtable_s); 
	DISPATCH_QUEUE_HEADER;
	char dq_label[DISPATCH_QUEUE_MIN_LABEL_SIZE]; // must be last 	
	char _dq_pad[DISPATCH_QUEUE_CACHELINE_PAD]; // for static queues only
};
複製代碼

GCD中使用了不少的宏,不利於咱們理解代碼,咱們用對應的結構替換掉定義的宏,以下:async

struct dispatch_queue_s {
    //第一部分:DISPATCH_STRUCT_HEADER(dispatch_queue_s, dispatch_queue_vtable_s)
    const struct dispatch_queue_vtable_s *do_vtable; \ //dispatch_queue_s的操做函數:dispatch_queue_vtable_s類型的結構體
	struct dispatch_queue_s *volatile do_next; \ //鏈表的next
	unsigned int do_ref_cnt; \	 //引用計數
	unsigned int do_xref_cnt; \ //外部引用計數
	unsigned int do_suspend_cnt; \ //暫停標誌,好比延時處理中,在任務到時後,計時器處理將會將該標誌位修改,而後喚醒隊列調度
	struct dispatch_queue_s *do_targetq; \ //目標隊列,GCD容許咱們將一個隊列放在另外一個隊列裏執行任務
	void *do_ctxt; \ //上下文,用來存儲線程池相關數據,好比用於線程掛起和喚醒的信號量、線程池尺寸等
	void *do_finalizer;
	
	//第二部分:DISPATCH_QUEUE_HEADER
    uint32_t volatile dq_running; \ //是否運行中
	uint32_t dq_width; \ //最大併發數:主線程/串行中這個值爲1
	struct dispatch_object_s *volatile dq_items_tail; \ //鏈表尾節點
	struct dispatch_object_s *volatile dq_items_head; \ //鏈表頭節點
	unsigned long dq_serialnum; \ //隊列的序列號
	dispatch_queue_t dq_specific_q; //specific隊列
	
	//其餘:
    char dq_label[DISPATCH_QUEUE_MIN_LABEL_SIZE]; // must be last 說明隊列的名字要少於64個字符
    char _dq_pad[DISPATCH_QUEUE_CACHELINE_PAD]; // for static queues only
};
複製代碼

隊列的類型

隊列的類型能夠分爲主隊列管理隊列自定義隊列全局隊列4種類型。函數

主隊列

咱們在開發過程當中可使用dispatch_get_main_queue獲取主隊列,看一下它的定義:oop

#define dispatch_get_main_queue() (&_dispatch_main_q)

struct dispatch_queue_s _dispatch_main_q = {
#if !DISPATCH_USE_RESOLVERS
	.do_vtable = &_dispatch_queue_vtable,
	.do_targetq = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
#endif
	.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
	.dq_label = "com.apple.main-thread",
	.dq_running = 1,
	.dq_width = 1, //說明主隊列是一個串行隊列
	.dq_serialnum = 1,
};
複製代碼

它的幾個主要屬性:優化

1.do_vtable

const struct dispatch_queue_vtable_s _dispatch_queue_vtable = {
	.do_type = DISPATCH_QUEUE_TYPE,
	.do_kind = "queue",
	.do_dispose = _dispatch_queue_dispose,
	.do_invoke = NULL,
	.do_probe = (void *)dummy_function_r0,
	.do_debug = dispatch_queue_debug,
};
複製代碼

2.do_targetq

主隊列的目標隊列:"com.apple.root.default-overcommit-priority"這個全局隊列。這裏咱們先提早總結一下:非全局隊列的隊列類型(主隊列以及後面提到的管理隊列和自定義隊列),都須要壓入到全局隊列處理,因此須要設置do_targetq

[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],

		.dq_label = "com.apple.root.default-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 7,
	},
複製代碼

3.do_ref_cnt和do_xref_cnt

前面提到do_ref_cntdo_xref_cnt是引用計數,主隊列的這兩個值爲DISPATCH_OBJECT_GLOBAL_REFCNT。既然是引用計數,那想必是和GCD的內存管理有關,找到和內存管理相關的代碼:

void
dispatch_retain(dispatch_object_t dou)
{
	if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		return; // global object
	}
	
	...
}

void
_dispatch_retain(dispatch_object_t dou)
{
	if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		return; // global object
	}
	
	...
}

void
dispatch_release(dispatch_object_t dou)
{
	if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		return;
	}
	
	...
}

void
_dispatch_release(dispatch_object_t dou)
{
	if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		return; // global object
	}

    ...
}
複製代碼

從上面幾個函數咱們能夠看出,主隊列的生命週期是伴隨着應用的,不會受retain和release的影響

管理隊列

_dispatch_mgr_q(管理隊列),是GCD的內部隊列,不對外公開。從名字上看,這個隊列應該是用來扮演管理的角色,GCD定時器就用到了管理隊列。

struct dispatch_queue_s _dispatch_mgr_q = {
	.do_vtable = &_dispatch_queue_mgr_vtable,
	.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
	.do_targetq = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],

	.dq_label = "com.apple.libdispatch-manager",
	.dq_width = 1,
	.dq_serialnum = 2,
};
複製代碼

1.do_vtable

static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = {
	.do_type = DISPATCH_QUEUE_MGR_TYPE,
	.do_kind = "mgr-queue",
	.do_invoke = _dispatch_mgr_thread,
	.do_debug = dispatch_queue_debug,
	.do_probe = _dispatch_mgr_wakeup,
};
複製代碼

2.do_targetq

管理隊列的目標隊列:"com.apple.root.high-overcommit-priority"這個全局隊列。

[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],

		.dq_label = "com.apple.root.high-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 9,
	},
複製代碼

3.do_ref_cnt和do_xref_cnt

管理隊列的這兩個值爲DISPATCH_OBJECT_GLOBAL_REFCNT,因此和主隊列的生命週期應該是同樣的。

自定義隊列

咱們在開發中會使用dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)建立一個自定義的隊列。它的源代碼以下:

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
	dispatch_queue_t dq;
	size_t label_len;

	if (!label) {
		label = "";
	}

	label_len = strlen(label);
	if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
		label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
	}

	// XXX switch to malloc()
	dq = calloc(1ul, sizeof(struct dispatch_queue_s) -
			DISPATCH_QUEUE_MIN_LABEL_SIZE - DISPATCH_QUEUE_CACHELINE_PAD +
			label_len + 1);
	if (slowpath(!dq)) {
		return dq;
	}
//隊列初始化數據
	_dispatch_queue_init(dq);
	strcpy(dq->dq_label, label);

	if (fastpath(!attr)) {
		return dq;
	}
	if (fastpath(attr == DISPATCH_QUEUE_CONCURRENT)) {
		dq->dq_width = UINT32_MAX;
		dq->do_targetq = _dispatch_get_root_queue(0, false);
	} else {
		dispatch_debug_assert(!attr, "Invalid attribute");
	}
	return dq;
}
複製代碼

1.slowpath(x)fastpath(x)

關於這兩個宏的定義以下:

#define fastpath(x) ((typeof(x))__builtin_expect((long)(x), ~0l))
#define slowpath(x) ((typeof(x))__builtin_expect((long)(x), 0l))
複製代碼

fastpath(x)表示x的值通常不爲0,但願編譯器進行優化。slowpath(x)表示x的值極可能爲0,但願編譯器進行優化。

2._dispatch_queue_init

static inline void
_dispatch_queue_init(dispatch_queue_t dq)
{
	dq->do_vtable = &_dispatch_queue_vtable;
	dq->do_next = DISPATCH_OBJECT_LISTLESS;
	dq->do_ref_cnt = 1;
	dq->do_xref_cnt = 1;
	// Default target queue is overcommit!
	dq->do_targetq = _dispatch_get_root_queue(0, true);
	dq->dq_running = 0;
	dq->dq_width = 1;
	dq->dq_serialnum = dispatch_atomic_inc(&_dispatch_queue_serial_numbers) - 1;
}
複製代碼

_dispatch_queue_init默認設置一個隊列爲串行隊列,它的目標隊列是_dispatch_get_root_queue(0, true)

3.do_targetq

前面對這個字段的解釋有點簡單了,do_targetq表明目的隊列。在Concurrent Programming: APIs and Challenges提到:

While custom queues are a powerful abstraction, all blocks you schedule on them will ultimately trickle down to one of the system’s global queues and its thread pool(s).

雖然自定義隊列是一個強大的抽象,但你在隊列上安排的全部Block最終都會滲透到系統的某一個全局隊列及其線程池。

看起來自定義隊列更像是全局隊列的一個代理。在自定義隊列建立的時候默認其目標隊列爲_dispatch_get_root_queue(0, true)。其中0表明優先級DISPATCH_QUEUE_PRIORITY_DEFAULTtrue表明是不是overcommitovercommit參數表示該隊列在執行block時,不管系統多忙都會新開一個線程,這樣作的目的是不會形成某個線程過載。若是是自定義併發隊列的話,do_targetq會被設置爲_dispatch_get_root_queue(0, false)

值得注意的是,主隊列的目標隊列也是一個全局隊列,全局隊列的底層就是普通的線程池(這個會在全局隊列中講到)。

4.dq_serialnum

dq_serialnum是在_dispatch_queue_serial_numbers基礎上進行原子操做加1,即從12開始累加。1到11被保留的序列號定義以下:

// skip zero
// 1 - main_q
// 2 - mgr_q
// 3 - _unused_
// 4,5,6,7,8,9,10,11 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
複製代碼

其中1用於主隊列,2用於管理隊列,3暫時沒有被使用,4~11是用於全局隊列的。因爲我找錯了看的源碼是libdispatch-187.10很老了,後面蘋果有新增了幾個隊列。

全局隊列

上面說了不少全局隊列,如今咱們來看一下全局隊列是如何定義的。

dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
	if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
		return NULL;
	}
	return _dispatch_get_root_queue(priority,
			flags & DISPATCH_QUEUE_OVERCOMMIT);
}

static inline dispatch_queue_t
_dispatch_get_root_queue(long priority, bool overcommit)
{
	if (overcommit) switch (priority) {
	case DISPATCH_QUEUE_PRIORITY_LOW:
		return &_dispatch_root_queues[
				DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY];
	case DISPATCH_QUEUE_PRIORITY_DEFAULT:
		return &_dispatch_root_queues[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY];
	case DISPATCH_QUEUE_PRIORITY_HIGH:
		return &_dispatch_root_queues[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY];
	case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
		return &_dispatch_root_queues[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY];
	}
	switch (priority) {
	case DISPATCH_QUEUE_PRIORITY_LOW:
		return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY];
	case DISPATCH_QUEUE_PRIORITY_DEFAULT:
		return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY];
	case DISPATCH_QUEUE_PRIORITY_HIGH:
		return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY];
	case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
		return &_dispatch_root_queues[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY];
	default:
		return NULL;
	}
}

DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_root_queues[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],

		.dq_label = "com.apple.root.low-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 4,
	},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],

		.dq_label = "com.apple.root.low-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 5,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],

		.dq_label = "com.apple.root.default-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 6,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],

		.dq_label = "com.apple.root.default-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 7,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],

		.dq_label = "com.apple.root.high-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 8,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],

		.dq_label = "com.apple.root.high-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 9,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],

		.dq_label = "com.apple.root.background-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 10,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],

		.dq_label = "com.apple.root.background-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 11,
	},
};
複製代碼

1.do_vtable

全局隊列的do_vtable

static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
	.do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
	.do_kind = "global-queue",
	.do_debug = dispatch_queue_debug,
	.do_probe = _dispatch_queue_wakeup_global,
};
複製代碼

2.do_ctxt

全局隊列中有一個上下文的屬性,用來存儲線程池相關數據,好比用於線程掛起和喚醒的信號量、線程池尺寸等。

它的定義以下:

static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	},
};
複製代碼

隊列的同步

dispatch_sync

dispatch_sync的源碼以下:

void
dispatch_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
	if (slowpath(dq == &_dispatch_main_q)) {
		return _dispatch_sync_slow(dq, work);
	}
#endif
	struct Block_basic *bb = (void *)work;
	dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
複製代碼

若是這個隊列是主隊列,則調用_dispatch_sync_slow,不然調用dispatch_sync_f。點開_dispatch_sync_slow返現,最終仍是調用了dispatch_sync_f方法。經過_dispatch_Block_copy或者Block_basic完成由block到function的轉換。因此block的執行底層仍是使用function

dispatch_sync_f

dispatch_sync_f的源碼以下:

void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	//串行隊列
	if (fastpath(dq->dq_width == 1)) {
		return dispatch_barrier_sync_f(dq, ctxt, func);
	}
	//全局隊列
	if (slowpath(!dq->do_targetq)) {
		// the global root queues do not need strict ordering
		(void)dispatch_atomic_add2o(dq, dq_running, 2);
		return _dispatch_sync_f_invoke(dq, ctxt, func);
	}
	//併發隊列
	_dispatch_sync_f2(dq, ctxt, func);
}
複製代碼

這裏分紅了三種狀況:

  1. 若是是串行隊列,執行dispatch_barrier_sync_f
  2. 若是是全局隊列,執行_dispatch_sync_f_invoke
  3. 若是是並行隊列,執行_dispatch_sync_f2

dispatch_barrier_sync_f

若是是串行隊列壓入同步任務,那麼當前任務就必須等待前面的任務執行完成後才能執行。源代碼就會調用dispatch_barrier_sync_f函數完成上面的效果。

void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	// 1) ensure that this thread hasn't enqueued anything ahead of this call // 2) the queue is not suspended //第一步:若是串行隊列中存在其餘任務或者隊列被掛起,則直接進入_dispatch_sync_f_slow函數,等待這個隊列中的其餘任務完成(信號量的方式),而後執行這個任務。 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){ return _dispatch_barrier_sync_f_slow(dq, ctxt, func); } //第二步:檢測隊列的dq_running狀態,若是有運行,進入_dispatch_barrier_sync_f_slow,等待激活。 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) { // global queues and main queue bound to main thread always falls into // the slow case return _dispatch_barrier_sync_f_slow(dq, ctxt, func); } //第三步:有多重隊列,尋找真正的目標隊列,其實仍是回到了dispatch_sync_f方法 if (slowpath(dq->do_targetq->do_targetq)) { return _dispatch_barrier_sync_f_recurse(dq, ctxt, func); } //第四步:執行隊列裏的任務,執行後檢測隊列有無其餘任務,若是有,釋放前面的信號量(釋放信號_dispatch_barrier_sync_f2函數中)。 _dispatch_barrier_sync_f_invoke(dq, ctxt, func); } 複製代碼

看了上面的代碼註釋後,咱們來想一下同步串行隊列死鎖問題。死鎖是怎麼產生的?先看下示例代碼:

#import "DeadLock.h"

@implementation DeadLock

- (instancetype)init {
    if (self = [super init]) {
//        [self _mianQueueDeadLock];
        [self _serialQueueDeadLock];
    }
    
    return self;
}

#pragma mark - Private

- (void)_mianQueueDeadLock {
    dispatch_sync(dispatch_get_main_queue(), ^(void){
        NSLog(@"這裏死鎖了");
    });
}

- (void)_serialQueueDeadLock {
    dispatch_queue_t queue1 = dispatch_queue_create("1serialQueue", DISPATCH_QUEUE_SERIAL);
    dispatch_queue_t queue2 = dispatch_queue_create("2serialQueue", DISPATCH_QUEUE_SERIAL);
    
    dispatch_sync(queue1, ^{
        NSLog(@"11111");
        
        dispatch_sync(queue1, ^{//若是使用queue2就不會發生死鎖,使用queue1就會死鎖
            NSLog(@"22222");
        });
    });
}

@end
複製代碼

_serialQueueDeadLock爲例:當第一次執行串行隊列任務的時候,跳到第四步,直接開始執行任務,在運行第二個dispatch_sync時候,在任務裏面經過執行第一步(隊列在運行)向這個同步隊列中壓入信號量,而後等待信號量,進入死鎖。若是主隊列則會跳轉到第二步進入死鎖。

_dispatch_sync_f_invoke

static void
_dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_invoke(dq, ctxt, func);
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
		_dispatch_wakeup(dq);
	}
}
複製代碼

若是當前隊列是全局隊列的話,就會調用_dispatch_sync_f_invoke。這個函數的做用:執行傳入的任務,而後根據dq_running檢測任務隊列有沒有激活,沒有激活就執行激活函數。關於激活函數_dispatch_wakeup(dq)放在隊列的異步中講解。

_dispatch_sync_f2

_dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	// 1) ensure that this thread hasn't enqueued anything ahead of this call // 2) the queue is not suspended //第一步:併發隊列中有其餘任務或者隊列被掛起,壓入信號量,等待其餘線程釋放這個信號量 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){ return _dispatch_sync_f_slow(dq, ctxt, func); } //第二步:並行隊列沒激活,激活隊列後執行任務,最終仍是調用了_dispatch_sync_f_slow函數,只是多了一個激活函數 if (slowpath(dispatch_atomic_add2o(dq, dq_running, 2) & 1)) { return _dispatch_sync_f_slow2(dq, ctxt, func); } //第三步:隊列有多重隊列,尋找真正的目標隊列 if (slowpath(dq->do_targetq->do_targetq)) { return _dispatch_sync_f_recurse(dq, ctxt, func); } //第四步:並行隊列沒有其餘任務,調用並激活這個隊列 _dispatch_sync_f_invoke(dq, ctxt, func); } 複製代碼

經過上面的註釋,並行隊列同步執行是順序執行的。這種順序執行和操做隊列爲併發隊列沒有關係。而是由於這些操做均爲同步操做,因此每個操做放入隊列後都會被等待執行完成纔會放入下一操做,形成了這種順序執行的現象。

如今咱們整理一下隊列同步執行的流程,以下圖:

dispatch_sync

隊列的異步

說完了同步咱們如今看一下異步。咱們使用dispatch_async進行隊列的異步執行。

dispatch_async

dispatch_async的源碼以下:

void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_async_f(dq, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}
複製代碼

dispatch_async主要將block從棧copy到堆上,或者增長引用計數,保證block在執行以前不會被銷燬,另外_dispatch_call_block_and_release用於銷燬block。而後調用dispatch_async_f

dispatch_async_f

void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	dispatch_continuation_t dc;

	// No fastpath/slowpath hint because we simply don't know //串行隊列,執行dispatch_barrier_async_f,其實最後仍是執行任務入隊的操做 if (dq->dq_width == 1) { return dispatch_barrier_async_f(dq, ctxt, func); } //從線程私有數據中獲取一個dispatch_continuation_t的結構體 dc = fastpath(_dispatch_continuation_alloc_cacheonly()); if (!dc) { return _dispatch_async_f_slow(dq, ctxt, func); } dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT; dc->dc_func = func; dc->dc_ctxt = ctxt; // No fastpath/slowpath hint because we simply don't know
	//有目標隊列,調用_dispatch_async_f2函數進行轉發。
	if (dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}

	//全局隊列直接進行入隊操做
	_dispatch_queue_push(dq, dc);
}
複製代碼

從上面的源代碼中咱們能夠看出dispatch_async_f大體分爲三種狀況:

  1. 若是是串行隊列,調用dispatch_barrier_async_f
  2. 其餘隊列且有目標隊列,調用_dispatch_async_f2
  3. 若是是全局隊列的話,直接進行入隊操做。

雖然上面分三種狀況,可是歸根到底,它們最後執行都是_dispatch_queue_push來進行入隊的操做。

這裏有一點須要注意下:就是dispatch_continuation_tdo_vtable的賦值狀況。

//串行隊列,barrier
dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);

//not barrier
dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
複製代碼

libdispatch所有標識符有四種:

#define DISPATCH_OBJ_ASYNC_BIT 0x1 //異步
#define DISPATCH_OBJ_BARRIER_BIT 0x2 //阻塞
#define DISPATCH_OBJ_GROUP_BIT 0x4 //組
#define DISPATCH_OBJ_SYNC_SLOW_BIT 0x8 //同步慢
複製代碼

從上面咱們能夠知道串行隊列在異步執行的時候,經過DISPATCH_OBJ_BARRIER_BIT這個標識符實現阻塞等待的。

接着咱們分析下dispatch_barrier_async_f_dispatch_async_f2這兩個函數。

dispatch_barrier_async_f

dispatch_barrier_async_f的源碼以下:

void
dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc;

	//從線程私有數據中獲取一個dispatch_continuation_t的結構體,dispatch_continuation_t中封裝了異步執行任務。
	dc = fastpath(_dispatch_continuation_alloc_cacheonly());
	if (!dc) {
		//return _dispatch_barrier_async_f_slow(dq, ctxt, func);
		//如下是_dispatch_barrier_async_f_slow的具體實現
		//若是沒有則從堆上獲取一個dispatch_continuation_t的結構體
		dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
		
		//經過do_vtable區分類型
		dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
		//將_dispatch_call_block_and_release做爲func方法
		dc->dc_func = func;
		//將傳入的block做爲上下文
		dc->dc_ctxt = ctxt;
		//入隊操做
		_dispatch_queue_push(dq, dc);
	}

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	_dispatch_queue_push(dq, dc);
}
複製代碼

_dispatch_queue_push

_dispatch_queue_push是一個宏定義,它最後會變成執行_dispatch_queue_push_list函數。

#define _dispatch_queue_push(x, y) _dispatch_queue_push_list((x), (y), (y))

#define _dispatch_queue_push_list _dispatch_trace_queue_push_list

static inline void
_dispatch_trace_queue_push_list(dispatch_queue_t dq, dispatch_object_t _head,
		dispatch_object_t _tail)
{
	if (slowpath(DISPATCH_QUEUE_PUSH_ENABLED())) {
		struct dispatch_object_s *dou = _head._do;
		do {
		  //主要是對dispatch_continuation_s結構體的處理,確保後面的使用。
			_dispatch_trace_continuation(dq, dou, DISPATCH_QUEUE_PUSH);
		} while (dou != _tail._do && (dou = dou->do_next));
	}
	_dispatch_queue_push_list(dq, _head, _tail);
}

static inline void
_dispatch_queue_push_list(dispatch_queue_t dq, dispatch_object_t _head,
		dispatch_object_t _tail)
{
	struct dispatch_object_s *prev, *head = _head._do, *tail = _tail._do;

	tail->do_next = NULL;
	dispatch_atomic_store_barrier();
	//dispatch_atomic_xchg2o實質是調用((typeof(*(p)))__sync_swap((p), (n))),它的定義是將p設爲n並返回p操做以前的值。
	//dispatch_atomic_xchg2o(dq, dq_items_tail, tail)至關於dq->dq_items_tail = tail,從新設置了隊列的尾指針
	prev = fastpath(dispatch_atomic_xchg2o(dq, dq_items_tail, tail));
	if (prev) {
		// if we crash here with a value less than 0x1000, then we are at a
		// known bug in client code for example, see _dispatch_queue_dispose
		// or _dispatch_atfork_child
		//prev是原先的隊尾,若是隊列中有其餘的元素,就將壓入的對象加在隊列的尾部。
		prev->do_next = head;
	} else {
	   //若是隊列爲空
		_dispatch_queue_push_list_slow(dq, head);
	}
}
複製代碼

_dispatch_queue_push_list_slow

若是隊列爲空,調用_dispatch_queue_push_list_slow方法。

_dispatch_queue_push_list_slow(dispatch_queue_t dq,
		struct dispatch_object_s *obj)
{
	//dq->dq_items_head設置爲dc,而後喚醒這個隊列。由於此時隊列爲空,沒有任務在執行,處於休眠狀態,因此須要喚醒
	_dispatch_retain(dq);
	dq->dq_items_head = obj;
	_dispatch_wakeup(dq);
	_dispatch_release(dq);
}
複製代碼

_dispatch_wakeup

不管是同步仍是異步中都調用了_dispatch_wakeup,這個函數的做用就是喚醒當前隊列。

_dispatch_wakeup的源碼:

dispatch_queue_t
_dispatch_wakeup(dispatch_object_t dou)
{
	dispatch_queue_t tq;

	if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
		return NULL;
	}
	//這裏比較隱晦,這裏實際上是走全局隊列的喚醒邏輯調用_dispatch_queue_wakeup_global,若是喚醒失敗且對尾指針爲空,返回NULL;若是是管理隊列的的話,則執行_dispatch_mgr_wakeup函數
	if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
		return NULL;
	}

	// _dispatch_source_invoke() relies on this testing the whole suspend count
	// word, not just the lock bit. In other words, no point taking the lock
	// if the source is suspended or canceled.
	if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
			DISPATCH_OBJECT_SUSPEND_LOCK)) {
#if DISPATCH_COCOA_COMPAT
		if (dou._dq == &_dispatch_main_q) {
			//傳入主隊列,會進入到 _dispatch_queue_wakeup_main() 函數中
			_dispatch_queue_wakeup_main();
		}
#endif
		return NULL;
	}
	_dispatch_retain(dou._do);
	//有目標隊列,繼續向目標隊列壓入這個隊列
	tq = dou._do->do_targetq;
	_dispatch_queue_push(tq, dou._do);
	return tq;	// libdispatch does not need this, but the Instrument DTrace
				// probe does
}
複製代碼

從上面的代碼能夠看出_dispatch_wakeup分爲四種狀況:

  1. 主隊列調用_dispatch_queue_wakeup_main()
  2. 全局隊列調用_dispatch_queue_wakeup_global
  3. 其餘隊列像目標隊列壓入這個隊列,繼續作入隊操做。
  4. 管理隊列調用_dispatch_mgr_wakeup,這裏主要是爲了dispatch_source而服務的。

_dispatch_queue_wakeup_main

void
_dispatch_queue_wakeup_main(void)
{
	kern_return_t kr;

	dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
			_dispatch_main_q_port_init);
	//喚醒主線程,這裏已經點不進去了,關於主線的喚醒主要靠mach_port和在runloop中註冊相對應的source1
	kr = _dispatch_send_wakeup_main_thread(main_q_port, 0);

	switch (kr) {
	case MACH_SEND_TIMEOUT:
	case MACH_SEND_TIMED_OUT:
	case MACH_SEND_INVALID_DEST:
		break;
	default:
		(void)dispatch_assume_zero(kr);
		break;
	}

	_dispatch_safe_fork = false;
}
複製代碼

_dispatch_queue_wakeup_global

上面提到dx_probe(dou._do)這裏走的是全局隊列的喚醒。前面提到全局隊列的do_vtable

static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
    .do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
    .do_kind = "global-queue",
    .do_debug = dispatch_queue_debug,
    .do_probe = _dispatch_queue_wakeup_global,
};
複製代碼

_dispatch_queue_wakeup_global的源碼:

static bool
_dispatch_queue_wakeup_global(dispatch_queue_t dq)
{
	static dispatch_once_t pred;
	struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
	int r;

	if (!dq->dq_items_tail) {
		return false;
	}

	_dispatch_safe_fork = false;

	dispatch_debug_queue(dq, __PRETTY_FUNCTION__);

	dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);

#if HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_ENABLE_THREAD_POOL
	//隊列上下文的dgq_kworkqueue存在,則調用pthread_workqueue_additem_np函數,該函數使用workq_kernreturn系統調用,通知workqueue增長應當執行的項目。根據該通知,XNU內核基於系統狀態判斷是否要生成線程,若是是overcommit優先級的隊列,workqueue則始終生成線程,以後線程執行_dispatch_worker_thread2函數。
	//工做隊列,是一個用於建立內核線程的接口,經過它建立的內核線程來執行內核其餘模塊排列到隊列裏的工做。不一樣優先級的dispatch queue對應着對應優先級的workqueue。GCD初始化的時候,使用pthread_workqueue_create_np建立pthread_workqueue
	if (qc->dgq_kworkqueue)
#endif
	{
		if (dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, 1)) {
			pthread_workitem_handle_t wh;
			unsigned int gen_cnt;
			_dispatch_debug("requesting new worker thread");

			r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
					_dispatch_worker_thread2, dq, &wh, &gen_cnt);
			(void)dispatch_assume_zero(r);
		} else {
			_dispatch_debug("work thread request still pending on global "
					"queue: %p", dq);
		}
		goto out;
	}
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_ENABLE_THREAD_POOL
	//經過發送一個信號量使線程保活
	if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
		goto out;
	}

	pthread_t pthr;
	int t_count;
	do {
		t_count = qc->dgq_thread_pool_size;
		if (!t_count) {
			_dispatch_debug("The thread pool is full: %p", dq);
			goto out;
		}
	} while (!dispatch_atomic_cmpxchg2o(qc, dgq_thread_pool_size, t_count,
			t_count - 1));//若是線程池可用則減1

	//這裏說明線程池不夠用了,使用pthread建立一個線程,並執行_dispatch_worker_thread,_dispatch_worker_thread最終會調用到_dispatch_worker_thread2
	while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
		if (r != EAGAIN) {
			(void)dispatch_assume_zero(r);
		}
		sleep(1);
	}
	
	//保證pthr能被自動回收掉
	r = pthread_detach(pthr);
	(void)dispatch_assume_zero(r);
#endif // DISPATCH_ENABLE_THREAD_POOL

out:
	return false;
}
複製代碼

_dispatch_worker_thread2

_dispatch_worker_thread2的代碼以下:

_dispatch_worker_thread2(void *context)
{
	struct dispatch_object_s *item;
	dispatch_queue_t dq = context;
	struct dispatch_root_queue_context_s *qc = dq->do_ctxt;


	if (_dispatch_thread_getspecific(dispatch_queue_key)) {
		DISPATCH_CRASH("Premature thread recycling");
	}

	//把dq設置爲剛啓動的這個線程的TSD
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	qc->dgq_pending = 0;

#if DISPATCH_COCOA_COMPAT
	(void)dispatch_atomic_inc(&_dispatch_worker_threads);
	// ensure that high-level memory management techniques do not leak/crash
	if (dispatch_begin_thread_4GC) {
		dispatch_begin_thread_4GC();
	}
	void *pool = _dispatch_begin_NSAutoReleasePool();
#endif

#if DISPATCH_PERF_MON
	uint64_t start = _dispatch_absolute_time();
#endif
	//_dispatch_queue_concurrent_drain_one用來取出隊列的一個內容
	while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
		// 用來對取出的內容進行處理(若是是任務,則執行任務)
		_dispatch_continuation_pop(item);
	}
#if DISPATCH_PERF_MON
	_dispatch_queue_merge_stats(start);
#endif

#if DISPATCH_COCOA_COMPAT
	_dispatch_end_NSAutoReleasePool(pool);
	dispatch_end_thread_4GC();
	if (!dispatch_atomic_dec(&_dispatch_worker_threads) &&
			dispatch_no_worker_threads_4GC) {
		dispatch_no_worker_threads_4GC();
	}
#endif

	_dispatch_thread_setspecific(dispatch_queue_key, NULL);

	_dispatch_force_cache_cleanup();

}
複製代碼

這裏有兩個比較重要的方法:

  1. _dispatch_queue_concurrent_drain_one這個方法用來取出隊列中的一個內容。
  2. _dispatch_continuation_pop這個方法用來處理取出的內容

_dispatch_queue_concurrent_drain_one

struct dispatch_object_s *
_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
{
    struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;

    // The mediator value acts both as a "lock" and a signal
    head = dispatch_atomic_xchg(&dq->dq_items_head, mediator);

    if (slowpath(head == NULL)) {
        //隊列是空的
        dispatch_atomic_cmpxchg(&dq->dq_items_head, mediator, NULL);
        _dispatch_debug("no work on global work queue");
        return NULL;
    }

    
    if (slowpath(head == mediator)) {
        // 該線程在現線程競爭中失去了對隊列的擁有權,這意味着libdispatch的效率很糟糕,
        // 這種狀況意味着在線程池中有太多的線程,這個時候應該建立一個pengding線程,
        // 而後退出該線程,內核會在負載減弱的時候建立一個新的線程
        _dispatch_queue_wakeup_global(dq);
        return NULL;
    }

    // 在返回以前將head指針的do_next保存下來,若是next爲NULL,這意味着item是最後一個
    next = fastpath(head->do_next);

    if (slowpath(!next)) {
        dq->dq_items_head = NULL;
        
        if (dispatch_atomic_cmpxchg(&dq->dq_items_tail, head, NULL)) {
            // head 和 tail頭尾指針均爲空
            goto out;
        }

        // 此時必定有item,該線程不會等待過久.
        while (!(next = head->do_next)) {
            _dispatch_hardware_pause();
        }
    }

        // 繼續調度
    dq->dq_items_head = next;
    _dispatch_queue_wakeup_global(dq);
out:
    // 返回隊列的頭指針
    return head;
}
複製代碼

_dispatch_continuation_pop

static inline void
_dispatch_continuation_pop(dispatch_object_t dou)
{
	dispatch_continuation_t dc = dou._dc;
	dispatch_group_t dg;

	_dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dou);
	//檢測是否是隊列,若是是,就進入_dispatch_queue_invoke 處理隊列
	if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
		return _dispatch_queue_invoke(dou._dq);
	}

	// Add the item back to the cache before calling the function. This
	// allows the 'hot' continuation to be used for a quick callback.
	//
	// The ccache version is per-thread.
	// Therefore, the object has not been reused yet.
	// This generates better assembly.
	if ((long)dc->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
		_dispatch_continuation_free(dc);
	}
	//判斷是不是group
	if ((long)dc->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
		dg = dc->dc_group;
	} else {
		dg = NULL;
	}
	//是任務封裝的 dispatch_continuation_t 結構體,直接執行任務。
	//到這裏咱們知道了隊列執行的時候,block被調用的時機
	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
	if (dg) {
		//group須要進行調用dispatch_group_leave並釋放信號
		dispatch_group_leave(dg);
		_dispatch_release(dg);
	}
}
複製代碼

從上面的函數中能夠發現,壓入隊列的不只是任務,還有多是隊列。若是是隊列,直接執行了_dispatch_queue_invoke,不然執行dc->dc_func(dc->dc_ctxt)

_dispatch_queue_invoke

void
_dispatch_queue_invoke(dispatch_queue_t dq)
{
	if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) &&
			fastpath(dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
		dispatch_atomic_acquire_barrier();
		dispatch_queue_t otq = dq->do_targetq, tq = NULL;
		_dispatch_queue_drain(dq);
		if (dq->do_vtable->do_invoke) {
			// Assume that object invoke checks it is executing on correct queue
			tq = dx_invoke(dq);
		} else if (slowpath(otq != dq->do_targetq)) {
			// An item on the queue changed the target queue
			tq = dq->do_targetq;
		}
		// We do not need to check the result.
		// When the suspend-count lock is dropped, then the check will happen.
		dispatch_atomic_release_barrier();
		//dq_running減1,由於任務要麼被直接執行了,要麼被壓到target隊列了
		(void)dispatch_atomic_dec2o(dq, dq_running);
		if (tq) {
			return _dispatch_queue_push(tq, dq);
		}
	}

	dq->do_next = DISPATCH_OBJECT_LISTLESS;
	if (!dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_LOCK)) {
		//隊列處於空閒狀態,須要喚醒
		if (dq->dq_running == 0) {
			_dispatch_wakeup(dq); // verify that the queue is idle
		}
	}
	//釋放隊列
	_dispatch_release(dq); // added when the queue is put on the list
}
複製代碼

如今咱們整理一下隊列異步執行的流程,以下圖:

dispatch_async

總結

  1. dispatch_queue經過結構體和鏈表,被實現爲FIFO(先進先出)隊列。不管串行隊列和併發隊列,都是符合FIFO的原則。二者的主要區別是:執行順序不一樣,以及開啓線程數不一樣。

  2. dispatch_sync函數通常都在當前線程執行,利用與線程綁定的信號量來實現串行。

  3. Block並非直接添加到隊列上,而是先構成一個dispatch_continuation結構體。結構體包含了這個Block還有一些上下文信息。隊列會將這些dispatch_continuation結構體添加隊列的鏈表中。不管這些隊列是什麼類型的,最終都是和全局隊列相關的。在全局隊列執行Block的時候,libdispatch從全局隊列中取出dispatch_continuation,調用pthread_workqueue_additem_np函數,將該全局隊列自身、符合其優先級的workqueue信息以及dispatch_continuation結構體的回調函數傳遞給參數。pthread_workqueue_additem_np函數使用workq_kernreturn系統調用,通知workqueue增長應當執行的項目。根據該同志,XNU內核基於系統狀態判斷是否要生成線程。若是是overcommit優先級的全局隊列workqueue則會始終生成線程。workqueue的線程執行pthread_workqueue函數,該函數調用libdispatch的回調函數。在該函數中執行加入到dispatch_continuation的Block

  4. dispatch_async分發到主隊列的任務由Runloop處理,而分發到其餘隊列的任務由線程池處理。

  5. GCD死鎖是隊列致使的而不是線程致使,緣由是_dispatch_barrier_sync_f_slow函數中使用了線程對應的信號量而且調用wait方法,從而致使線程死鎖。

  6. dispatch_barrier_async適用的場景隊列必須是用DISPATCH_QUEUE_CONCURRENT屬性建立的隊列,而使用全局併發隊列的時候,其表現就和dispatch_async同樣。緣由:dispatch_barrier_async若是傳入的是全局隊列,在喚醒隊列時會執行_dispatch_queue_wakeup_global函數,其執行效果同dispatch_async一致,而若是是自定義的隊列的時候,_dispatch_continuation_pop中會執行dispatch_queue_invoke。在while循環中依次取出任務並調用_dispatch_continuation_redirect函數,使得block併發執行。當遇到DISPATCH_OBJ_BARRIER_BIT標記時,會修改do_suspend_cnt標誌以保證後續while循環時直接goto out。barrier block的任務執行完以後_dispatch_queue_class_invoke會將do_suspend_cnt重置回去,因此barrier block以後的任務會繼續執行。

  7. 隊列操做和線程開啓的關係:dispatch_sync添加任務到隊列,不會建立新的線程。全部任務都是在當前線程中處理的。dispatch_async添加任務到隊列分爲三種狀況:主隊列不建立線程,在主線程中串行執行;全局隊列和自定義並行隊列根據任務系統決定開闢線程個數;自定義串行隊列建立一個線程,串行進行。

相關文章
相關標籤/搜索