iOS GCD源碼淺析

1、GCD簡介

什麼是 GCDGCD(Grand Central Dispatch) 是異步執行任務的技術之一。咱們只須要將定義的任務追加到適當的 Dispatch Queue 中,GCD 就能幫咱們生成必要的線程並執行咱們的任務並且不須要編寫任何線程管理代碼。所以使用 GCDvery easy 的,首先建立隊列,而後將任務添加到隊列,最後指定執行任務函數,搞定收工。算法

// 建立同步隊列
dispatch_queue_t queue = dispatch_queue_create("cc", DISPATCH_QUEUE_SERIAL);
// 添加任務
dispatch_block_t block = ^{
    NSLog(@"hello word");
};
// 指定執行任務函數,依賴隊列執行
dispatch_async(queue, block);
複製代碼

可是 GCD 又是否真的如此簡單呢?有興趣的話就一塊兒去 GCD 的源碼看看唄,看看又不花錢。swift

2、開啓 GCD 的大門

首先,建立兩個常見的隊列,串行隊列和併發隊列,而後分別 po 一下里面都有什麼東東,再順便把主隊列和全局隊列分別一塊兒給看了,先對隊列裏面的東西有一丟丟的印象就能夠了。數組

貌似發現點了什麼,主隊列的 width 和串行隊列的 width 是同樣的,其餘的也有不少相同的,難怪說主隊列是特殊的串行隊列。可是併發隊列和全局隊列的 width 卻相差 1,有點奇怪。接下來咱們就到源碼中一探究竟吧。在此附上 libdispatch源碼連接,但願你們能一塊兒開開心心看源碼,本文使用的是 libdispatch-1008.220.2 版本。

1. dispatch_queue_create 源碼分析

來到源碼以後,固然是全局搜索快速定位 dispatch_queue_create 位置,看它到底在裏面作了什麼壞事,而後發現隊列是經過 _dispatch_lane_create_with_target 建立的,常規接口隔離操做。_dispatch_lane_create_with_target 第一句代碼就是將咱們的外界傳進來區分串行仍是併發的參數傳進去建立了一個 dispatch_queue_attr_info_t 類型的結構體,有鬼。bash

dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
複製代碼

一查 dispatch_queue_attr_info_t 是一個結構體位域,結構體位域能夠經過一些位運算取出咱們想要的內容,過濾掉咱們不想要的數據。併發

typedef struct dispatch_queue_attr_info_s {
	dispatch_qos_t dqai_qos : 8;
	int      dqai_relpri : 8;
	uint16_t dqai_overcommit:2;
	uint16_t dqai_autorelease_frequency:2;
	uint16_t dqai_concurrent:1;
	uint16_t dqai_inactive:1;
} dispatch_queue_attr_info_t;
複製代碼

接下來咱們來看一下是怎麼建立這個結構體的吧。app

dispatch_queue_attr_info_t
_dispatch_queue_attr_to_info(dispatch_queue_attr_t dqa)
{
    dispatch_queue_attr_info_t dqai = { };
    // 串行隊列直接返回空的 dqai 結構體
    if (!dqa) return dqai;
    
    #if DISPATCH_VARIANT_STATIC
    if (dqa == &_dispatch_queue_attr_concurrent) {
    	dqai.dqai_concurrent = true;
    	return dqai;
    }
    #endif
    if (dqa < _dispatch_queue_attrs ||
    		dqa >= &_dispatch_queue_attrs[DISPATCH_QUEUE_ATTR_COUNT]) {
    	DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
    }
    
    // 蘋果的算法
    size_t idx = (size_t)(dqa - _dispatch_queue_attrs);
    
    // 併發隊列結構體位域的默認配置和賦值
    dqai.dqai_inactive = (idx % DISPATCH_QUEUE_ATTR_INACTIVE_COUNT);
    idx /= DISPATCH_QUEUE_ATTR_INACTIVE_COUNT;
    dqai.dqai_concurrent = !(idx % DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT);
    idx /= DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT;
    dqai.dqai_relpri = -(idx % DISPATCH_QUEUE_ATTR_PRIO_COUNT);
    idx /= DISPATCH_QUEUE_ATTR_PRIO_COUNT;
    dqai.dqai_qos = idx % DISPATCH_QUEUE_ATTR_QOS_COUNT;
    idx /= DISPATCH_QUEUE_ATTR_QOS_COUNT;
    dqai.dqai_autorelease_frequency =
    		idx % DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT;
    idx /= DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT;
    dqai.dqai_overcommit = idx % DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT;
    idx /= DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT;
    return dqai;
}
複製代碼

從上面能夠發現串行隊列就是一個空的結構體,併發隊列會對結構體進行一系列的默認配置和賦值,重點關注 dqai.dqai_concurrent 等一些 屬性都是有值的,這也會是咱們下面的源碼的一個分水嶺。接着繼續往下分析,在忽略了一些代碼以後,終於發現了重點所在 vtable,這個應該一個表,而後咱們來看看它都作了些什麼。異步

const void *vtable;
if (dqai.dqai_concurrent) {
    // 經過 dqai.dqai_concurrent 來區分併發和串行
    vtable = DISPATCH_VTABLE(queue_concurrent);
} else {
    vtable = DISPATCH_VTABLE(queue_serial);
}
複製代碼

建立了 vtable 以後再經過 vtable 開闢內存,生成響應的對象 queueasync

dispatch_lane_t dq = _dispatch_object_alloc(vtable,
		sizeof(struct dispatch_lane_s));

_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
		DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
		(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
複製代碼

這個 init 構造函數的第三個參數是個三目運算符,若是是併發隊列是一個宏定義,串行隊列是 1,而後再搜搜這個宏定義發現竟然是 #define DISPATCH_QUEUE_WIDTH_MAX (0x1000ull - 2),媽耶,這不就是上面咱們打印串行隊列和併發隊列的 width 的值嘛!原來就是在這初始化的。那爲啥是減 2 而不是減 1 呢?減 1 就是咱們的全局併發隊列!函數

// 驗證
#define DISPATCH_QUEUE_WIDTH_POOL (0x1000ull - 1)
struct dispatch_queue_global_s _dispatch_mgr_root_queue = {
	DISPATCH_GLOBAL_OBJECT_HEADER(queue_global),
	.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
	.do_ctxt = &_dispatch_mgr_root_queue_pthread_context,
	.dq_label = "com.apple.root.libdispatch-manager",
	.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
	.dq_priority = DISPATCH_PRIORITY_FLAG_MANAGER |
			DISPATCH_PRIORITY_SATURATED_OVERRIDE,
	.dq_serialnum = 3,
	.dgq_thread_pool_size = 1,
};
複製代碼

好,接着往下看,又找到了 dq->do_targetq = tq;,這個 tq 又是在哪賦值的呢?我在上面找到了賦值代碼。oop

#define DISPATCH_QOS_UNSPECIFIED ((dispatch_qos_t)0)
#define DISPATCH_QOS_DEFAULT ((dispatch_qos_t)4)
// Serial queues default to overcommit!
// 若是是併發 overcommit = _dispatch_queue_attr_overcommit_disabled
// 若是是串行 overcommit = _dispatch_queue_attr_overcommit_enabled
overcommit = dqai.dqai_concurrent ?
		_dispatch_queue_attr_overcommit_disabled :
		_dispatch_queue_attr_overcommit_enabled;

if (!tq) {
    tq = _dispatch_get_root_queue(
	qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos, // 4
	overcommit == _dispatch_queue_attr_overcommit_enabled)->_as_dq; // 0 1
}

static inline dispatch_queue_global_t
_dispatch_get_root_queue(dispatch_qos_t qos, bool overcommit)
{
	if (unlikely(qos < DISPATCH_QOS_MIN || qos > DISPATCH_QOS_MAX)) {
		DISPATCH_CLIENT_CRASH(qos, "Corrupted priority");
	}
	// qos 爲 4,4-1= 3
	// 2*3 + 0或者1 = 6/7
	// 而後再去數組 _dispatch_root_queues 裏取數組的 6 或者 7 的下標指針地址
	return &_dispatch_root_queues[2 * (qos - 1) + overcommit];
}
複製代碼

因爲 tq 的值是經過 _dispatch_root_queues 數組取出來的,直接到數組裏面看就一目瞭然了。我就只列舉了串行隊列、併發隊列(全局和併發是同樣的)和主隊列的。由此能夠發現 tq 就是 dq_label 的值,也就是外面隊列 target 的值。

_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
	.dq_label = "com.apple.root.utility-qos.overcommit",
	.dq_serialnum = 9,
),
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_FALLBACK,
	.dq_label = "com.apple.root.default-qos",
	.dq_serialnum = 10,
),
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
		DISPATCH_PRIORITY_FLAG_FALLBACK | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
	.dq_label = "com.apple.root.default-qos.overcommit",
	.dq_serialnum = 11,
),
複製代碼

既然串行隊列和併發隊列的 target 信息是從 _dispatch_root_queues 結構體數組取出來的,那麼 _dispatch_root_queues 又是在哪建立的呢?咱們來到最早初始化的 libdispatcdispatch_queue_createh_init 裏的查找,最終在 _dispatch_introspection_init 找到了一些代碼。

隊列是經過 for 循環,調用 _dispatch_trace_queue_create,再取出 _dispatch_root_queues 裏的地址指針一個一個建立出來的。

  • 小擴展:在 allocinit 裏面的源碼我都發現了 dispatch_object_t 這個類型,而後一搜竟然是一個聯合體,這個就有意思了,經過聯合體同一時間只能一個有值的特性給函數帶來了多態性。
typedef union {
	struct _os_object_s *_os_obj;
	struct dispatch_object_s *_do;
	struct dispatch_queue_s *_dq;
	struct dispatch_queue_attr_s *_dqa;
	struct dispatch_group_s *_dg;
	struct dispatch_source_s *_ds;
	struct dispatch_mach_s *_dm;
	struct dispatch_mach_msg_s *_dmsg;
	struct dispatch_semaphore_s *_dsema;
	struct dispatch_data_s *_ddata;
	struct dispatch_io_s *_dchannel;
} dispatch_object_t DISPATCH_TRANSPARENT_UNION;
複製代碼

到此,GCD 的大門就已經打開,接下來咱們來點深刻一點的東西,也讓你們不枉此行。

3、GCD 深刻了解

當咱們瞭解隊列的建立以後,添加任務就是一個 blockblock 參數的類型是 dispatch_block_t,它是一個沒有參數,沒有返回值的 block:typedef void (^dispatch_block_t)(void); 沒什麼太多的好研究的,接下來就來看看執行任務的函數都在底層作了些什麼吧。接下來的內容會比較的乾巴巴,可是看的越深,對本身幫助會更大。

1.執行任務的函數

執行任務的函數分爲兩種,同步和異步函數(即同步線程和異步線程)。

  • 同步函數:即 dispatch_sync,必須等待當前語句執行完畢,纔會執行下一條語 句,不會開啓線程,在當前執行任務。
  • 異步函數,即 dispatch_async。不用等待當前語句執行完畢,就能夠執行下一條語句,會開啓線程執行任務。

2. dispatch_sync 源碼分析

在分析同步函數源碼以前,先提出兩個疑問,同步是怎麼執行的呢?死鎖是在同步的狀況下形成的,那底層源碼又是怎麼樣的呢?帶着這兩個疑問點咱們接着分析dispatch_sync 源碼。 咱們來到 dispatch_sync 函數實現:

void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    uintptr_t dc_flags = DC_FLAG_BLOCK;
    if (unlikely(_dispatch_block_has_private_data(work))) {
    	return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
    }
    _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
複製代碼

dispatch_sync 裏的 unlikely 的意思是基本上不會走,而後就來到 _dispatch_sync_f 函數,_dispatch_sync_f 的第三個參數是將 block 包裝了一下,具體包裝以下:

#define _dispatch_Block_invoke(bb) \
		((dispatch_function_t)((struct Block_layout *)bb)->invoke)
複製代碼

省略各類分支以後就會來到 _dispatch_sync_f_inline 函數,實現以下:

static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
    // 串行就會走這下面
    if (likely(dq->dq_width == 1)) {
    	return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
    }
        // ... 下面代碼暫時省略
}
複製代碼

常常層層難關,發現串行隊列好像就是調用了 _dispatch_barrier_sync_f 函數,看到這個函數是否是想起了一些什麼,這是否是就是柵欄函數,那分析完同步以後,柵欄也就迎刃而解了,爽歪歪。繼續往裏分析,最終來到 _dispatch_barrier_sync_f_inline:

static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
    // 獲取線程ID
    dispatch_tid tid = _dispatch_tid_self();
    
    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
    	DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }
    dispatch_lane_t dl = upcast(dq)._dl;
    
    // 死鎖
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
    	return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
    			DC_FLAG_BARRIER | dc_flags);
    }
    
    if (unlikely(dl->do_targetq->do_targetq)) {
    	return _dispatch_sync_recurse(dl, ctxt, func,
    			DC_FLAG_BARRIER | dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
    		DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
    				dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
複製代碼

在這個函數裏會先獲取線程 id,由於隊列須要綁定到線程而後依賴執行,而死鎖的緣由在於同步線程裏的任務出現你等我,我等你的現象,因此只有 _dispatch_queue_try_acquire_barrier_sync 用到了線程 id,在裏面果不其然發現了祕密:

static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
		uint32_t tid, uint64_t suspend_count)
{
    uint64_t init  = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
    uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
    		_dispatch_lock_value_from_tid(tid) |
    		(suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
    uint64_t old_state, new_state;
    // 從 os 底層獲取信息,也就是經過線程和當前隊列獲取 new_state 返回出去
    return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
    	uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
    	if (old_state != (init | role)) {
    		os_atomic_rmw_loop_give_up(break);
    	}
    	new_state = value | role;
    });
}
複製代碼

os 底層獲取到了一個 new_state 以後,就會執行 _dispatch_sync_f_slow,若是你碰見過死鎖,對這個函數確定不陌生吧,我們就來看看裏面到底作了什麼,是否是有點火燒眉毛!

static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
		dispatch_function_t func, uintptr_t top_dc_flags,
		dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
	dispatch_queue_t top_dq = top_dqu._dq;
	dispatch_queue_t dq = dqu._dq;
	if (unlikely(!dq->do_targetq)) {
		return _dispatch_sync_function_invoke(dq, ctxt, func);
	}

	pthread_priority_t pp = _dispatch_get_priority();
	// 初始化保存 block 以及其餘信息的結構體
	struct dispatch_sync_context_s dsc = {
		.dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
		.dc_func     = _dispatch_async_and_wait_invoke,
		.dc_ctxt     = &dsc,
		.dc_other    = top_dq,
		.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
		.dc_voucher  = _voucher_get(),
		.dsc_func    = func,
		.dsc_ctxt    = ctxt,
		.dsc_waiter  = _dispatch_tid_self(),
	};
	// 將 block push 到 queue 裏面去
	_dispatch_trace_item_push(top_dq, &dsc);
	// 死鎖的最終函數
	__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

	// ... 忽略了一些暫時不須要分析的代碼
}
複製代碼

經過 _dispatch_trace_item_push 函數能夠發現隊列其實就是一個用來提交 block 的對象,當 block push 到隊列中後,將按照 先入先出(FIFO) 的順序進行處理,系統在 GCD 的底層會維護一個線程池,用來執行這些 block。那這些 block 又是在哪調用的呢? 因爲步驟有點多,我就不一一分析了,把大體的流程圖寫在下面,你們能夠本身去跟一下,體驗一下源碼的樂趣。

dispatch_sync  
└──_dispatch_sync_f
    └──_dispatch_sync_invoke_and_complete
        └──_dispatch_sync_function_invoke_inline
           └──_dispatch_client_callout
              └──f(ctxt);
複製代碼

第一個同步是怎麼執行的問題解決了,接下來解決第二個死鎖問題,要想解決這個問題,就須要來到 __DISPATCH_WAIT_FOR_QUEUE__ 裏一探究竟。

static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
    // 獲取隊列的狀態,看是不是處於等待狀態
    uint64_t dq_state = _dispatch_wait_prepare(dq);
    if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
    	DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
    			"dispatch_sync called on queue "
    			"already owned by current thread");
    }
    // ... 忽略了一些暫時不須要分析的代碼
}
複製代碼

獲取到隊列的狀態以後,又調用 _dq_state_drain_locked_by 函數最後來到下面這個函數,這個函數就是死鎖的關鍵所在。

static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{   // lock_value 爲隊列狀態,tid 爲線程 id
    // ^ (異或運算法) 兩個相同就會出現 0 不然爲1
    return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}
複製代碼

從上面能夠看出,若是隊列和線程都是處於等待狀態,最終的返回結果爲 YES,就會來到 DISPATCH_CLIENT_CRASH 形成 crash

3. dispatch_async 源碼分析

分析完同步函數以後,接下來就該來分析分析異步函數了,異步函數會須要開啓線程去執行任務,因此這應該會是一個重點,找到重點以後咱們就能夠繼續研究了,同樣先來到異步函數的實現:

void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DC_FLAG_CONSUME;
	dispatch_qos_t qos;

	qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
複製代碼

異步函數會經過 _dispatch_continuation_init 先對 block 進行包裝即函數式保存,由於異步須要在合適的時機進行線程回調 block ,以後就調用 _dispatch_continuation_async 進行異步處理,因爲異步的分支比較多,我下面列出我所分析的步驟:

dispatch_async  
└──_dispatch_continuation_async
    └──dx_push
        └──dq_push
           └──_dispatch_root_queue_push
              └──_dispatch_root_queue_push_inline
                 └──_dispatch_root_queue_poke
                    └──_dispatch_root_queue_poke_slow
複製代碼

_dispatch_root_queue_poke_slow 裏就能找到開闢線程相關的代碼了,那咱們就來分析一波吧:

static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
    int remaining = n;
    int r = ENOSYS;
    
    _dispatch_root_queues_init();
    _dispatch_debug_root_queue(dq, __func__);
    _dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
    #if !DISPATCH_USE_INTERNAL_WORKQUEUE
    #if DISPATCH_USE_PTHREAD_ROOT_QUEUES
    if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
    #endif
    {
    	_dispatch_root_queue_debug("requesting new worker thread for global "
    			"queue: %p", dq);
    	r = _pthread_workqueue_addthreads(remaining,
    			_dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
    	(void)dispatch_assume_zero(r);
    	return;
    }
    #endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
    #if DISPATCH_USE_PTHREAD_POOL
    dispatch_pthread_root_queue_context_t pqc = dq->do_ctxt;
    if (likely(pqc->dpq_thread_mediator.do_vtable)) {
    	while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
    		_dispatch_root_queue_debug("signaled sleeping worker for "
    				"global queue: %p", dq);
    		if (!--remaining) {
    			return;
    		}
    	}
    }
    
    bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    if (overcommit) {
        // 串行隊列
    	os_atomic_add2o(dq, dgq_pending, remaining, relaxed);
    } else {
    	if (!os_atomic_cmpxchg2o(dq, dgq_pending, 0, remaining, relaxed)) {
    		_dispatch_root_queue_debug("worker thread request still pending for "
    				"global queue: %p", dq);
    		return;
    	}
    }
    
    // floor 爲 0,remaining 是根據隊列任務的狀況處理的
    int can_request, t_count;
    // 獲取線程池的大小
    t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
    do {
        // 計算能夠請求的數量
        can_request = t_count < floor ? 0 : t_count - floor;
        if (remaining > can_request) {
    	    _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
    	    		remaining, can_request);
    	    os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
    	    remaining = can_request;
        }
        if (remaining == 0) {
            // 線程池滿了,就會報出異常的狀況
            _dispatch_root_queue_debug("pthread pool is full for root queue: "
    				"%p", dq);
                return;
    	}
    } while (!os_atomic_cmpxchgvw2o(dq, dgq_thread_pool_size, t_count,
    		t_count - remaining, &t_count, acquire));
    
    pthread_attr_t *attr = &pqc->dpq_thread_attr;
    pthread_t tid, *pthr = &tid;
    #if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
    if (unlikely(dq == &_dispatch_mgr_root_queue)) {
    	pthr = _dispatch_mgr_root_queue_init();
    }
    #endif
    do {
    	_dispatch_retain(dq); 
    	// 開闢線程
    	while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
    		if (r != EAGAIN) {
    			(void)dispatch_assume_zero(r);
    		}
    		_dispatch_temporary_resource_shortage();
    	}
    } while (--remaining);
    #else
    (void)floor;
    #endif // DISPATCH_USE_PTHREAD_POOL
}
複製代碼

好了,同步和異步暫時分析到這,接下來分析的內容相對比較簡單,semaphoregroup

4. semaphore 源碼分析

關於信號量的 API 很少,主要是三個,createwaitsignal。 信號量在初始化時要指定 value,隨後內部將這個 value 存儲起來。實際操做時會存兩個 value,一個是當前的value,一個是記錄初始 value。信號的 waitsignal 是互逆的兩個操做,若是 value 大於 0,前者將 value 減一,此時若是 value 小於零就一直等待。初始 value 必須大於等於 0,若是爲 0 並隨後調用 wait 方法,線程將被阻塞直到別的線程調用了 signal 方法。簡單的介紹了一下使用方法,接下來直接來看源碼吧。

(1) dispatch_semaphore_create

這個函數就是去初始化 dsema,並設置 value 的值。

dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
    dispatch_semaphore_t dsema;
    // 若是 value 小於 0 直接返回 0
    if (value < 0) {
    	return DISPATCH_BAD_INPUT;
    }
    dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
    		sizeof(struct dispatch_semaphore_s));
    dsema->do_next = DISPATCH_OBJECT_LISTLESS;
    dsema->do_targetq = _dispatch_get_default_queue(false);
    dsema->dsema_value = value;
    _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    dsema->dsema_orig = value;
    return dsema;
}
複製代碼
(2) dispatch_semaphore_wait
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
    long value = os_atomic_dec2o(dsema, dsema_value, acquire);
    if (likely(value >= 0)) {
	    return 0;
    }
    return _dispatch_semaphore_wait_slow(dsema, timeout);
}
複製代碼

dispatch_atomic_dec2o 是一個宏,會調用 GCC 內置的函數 __sync_sub_and_fetch,實現減法的原子性操做。所以這一行的意思是將 dsema 的值減一,並把新的值賦給 value。若是減一後的 value 大於等於 0 就馬上返回,沒有任何操做,不然進入等待狀態。

_dispatch_semaphore_wait_slow 函數針對不一樣的 timeout 參數,分了三種狀況考慮:

case DISPATCH_TIME_NOW:
    orig = dsema->dsema_value;
    while (orig < 0) {
    	if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
    			&orig, relaxed)) {
    		return _DSEMA4_TIMEOUT();
    	}
    }
複製代碼

這種狀況 while 判斷必定會成立,由於若是 value 大於等於 0,在上一個函數 dispatch_semaphore_wait 中就已經返回了,判斷成立,內部的 if 判斷必定也成立,此時會將 value 加一(也就是變爲 0) 並返回。加一的緣由是爲了抵消 wait 函數一開始的減一操做。此時函數調用方會獲得返回值 KERN_OPERATION_TIMED_OUT,表示因爲等待時間超時而返回。

第二種狀況是 DISPATCH_TIME_FOREVER 這個 case:

case DISPATCH_TIME_FOREVER:
    _dispatch_sema4_wait(&dsema->dsema_sema);
    break;
}
複製代碼

這種狀況會調用系統的 semaphore_wait 方法繼續等待,直到收到 signal 調用。

第三種就是一個默認的狀況:

default:
    if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
        break;
    }
複製代碼

default 分支下,咱們指定一個超時時間,在 _dispatch_sema4_timedwait 裏面會去判斷當前操做是否超時,這和 DISPATCH_TIME_FOREVER 的處理比較相似,不一樣的是咱們調用了內核提供的 semaphore_timedwait 方法能夠指定超時時間。

(3) dispatch_semaphore_signal

這個函數的實現相對來講比較簡單,由於它不須要阻塞,只用喚醒。

long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
    long value = os_atomic_inc2o(dsema, dsema_value, release);
    if (likely(value > 0)) {
    	return 0;
    }
    if (unlikely(value == LONG_MIN)) {
    	DISPATCH_CLIENT_CRASH(value,
    			"Unbalanced call to dispatch_semaphore_signal()");
    }
    return _dispatch_semaphore_signal_slow(dsema);
}
複製代碼

5. group 源碼分析

dispatch_group_create

當咱們瞭解了信號量以後,group 是一個很是容易理解的概念,咱們先看看如何建立 group:

dispatch_group_t
dispatch_group_create(void)
{
    return _dispatch_group_create_with_count(0);
}

static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
    dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
    		sizeof(struct dispatch_group_s));
    dg->do_next = DISPATCH_OBJECT_LISTLESS;
    dg->do_targetq = _dispatch_get_default_queue(false);
    if (n) {
    	os_atomic_store2o(dg, dg_bits,
    			-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
    	os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
    }
    return dg;
}
複製代碼

這個方法就是開闢了內存空間,可是從 os_atomic_store2o 能夠看出 group 底層也維護了一個 value 值。

dispatch_group_enter

enter 能夠看出,當進組的時候會經過 os_atomic_sub_orig2ovalue 減 4。

void
dispatch_group_enter(dispatch_group_t dg)
{
    uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
    		DISPATCH_GROUP_VALUE_INTERVAL, acquire);
    uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
    if (unlikely(old_value == 0)) {
    	_dispatch_retain(dg); // <rdar://problem/22318411>
    }
    if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
    	DISPATCH_CLIENT_CRASH(old_bits,
    			"Too many nested calls to dispatch_group_enter()");
    }
}
複製代碼
dispatch_group_leave

出組的時候會對 value 進行加值,若是 new_stateold_state 相等,就會調用 _dispatch_group_wake 繼續後面代碼的執行,這個函數後面會說到。

void
dispatch_group_leave(dispatch_group_t dg)
{   
    uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
    		DISPATCH_GROUP_VALUE_INTERVAL, release);
    uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
    
    if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
    	old_state += DISPATCH_GROUP_VALUE_INTERVAL;
    	do {
    		new_state = old_state;
    		if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
    			new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
    			new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
    		} else {
    			new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
    		}
    		if (old_state == new_state) break;
    	} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
    			old_state, new_state, &old_state, relaxed)));
    	return _dispatch_group_wake(dg, old_state, true);
    }
    
    if (unlikely(old_value == 0)) {
    	DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
    			"Unbalanced call to dispatch_group_leave()");
    }
}
複製代碼
dispatch_group_async

dispatch_group_async 就是對 enterleave 的封裝,當 block 調用完成以後進行 callout 以後就出組了。

void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
    dispatch_qos_t qos;
    // 保存任務 
    qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
    _dispatch_continuation_group_async(dg, dq, dc, qos);
}

static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dc, dispatch_qos_t qos)
{   // 進組
    dispatch_group_enter(dg);
    dc->dc_data = dg;
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
    struct dispatch_object_s *dou = dc->dc_data;
    unsigned long type = dx_type(dou);
    if (type == DISPATCH_GROUP_TYPE) {
    	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
    	_dispatch_trace_item_complete(dc);
    	// 出組
    	dispatch_group_leave((dispatch_group_t)dou);
    } else {
    	DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
    }
}
複製代碼
dispatch_group_wait

這個方法用於等待 group 中全部任務執行完成,能夠理解爲信號量 wait 的封裝:

long
dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout)
{
    uint64_t old_state, new_state;
    
    os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, relaxed, {
    	if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
    		os_atomic_rmw_loop_give_up_with_fence(acquire, return 0);
    	}
    	if (unlikely(timeout == 0)) {
    		os_atomic_rmw_loop_give_up(return _DSEMA4_TIMEOUT());
    	}
    	new_state = old_state | DISPATCH_GROUP_HAS_WAITERS;
    	if (unlikely(old_state & DISPATCH_GROUP_HAS_WAITERS)) {
    		os_atomic_rmw_loop_give_up(break);
    	}
    });
    
    return _dispatch_group_wait_slow(dg, _dg_state_gen(new_state), timeout);
}
複製代碼

若是當前 value 和原始 value 相同,代表任務已經所有完成,直接返回 0,若是 timeout 爲 0 也會馬上返回,不然調用 _dispatch_group_wait_slow。 在 _dispatch_group_wait_slow 會一直等到任務完成返回 0 ,固然若是一直沒有完成就會返回 timeout

_dispatch_group_wake

這個函數主要作的就是循環調用 dispatch_async_f 異步執行在 notify 函數中註冊的回調。

static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
    uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
    
    if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
    	dispatch_continuation_t dc, next_dc, tail;
    
    	// Snapshot before anything is notified/woken
    	dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
    	do {
    		dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
    		next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
    		_dispatch_continuation_async(dsn_queue, dc,
    				_dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
    		_dispatch_release(dsn_queue);
    	} while ((dc = next_dc));
    
    	refs++;
    }
    if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
    	_dispatch_wake_by_address(&dg->dg_gen);
    }
    if (refs) _dispatch_release_n(dg, refs);
}
複製代碼

6. 小擴展: dispatch_once 的源碼分析

dispatch_once 僅僅是一個包裝,內部直接調用了 dispatch_once_f:

void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
    dispatch_once_gate_t l = (dispatch_once_gate_t)val;
    #if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
    if (likely(v == DLOCK_ONCE_DONE)) {
    	return;
    }
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    if (likely(DISPATCH_ONCE_IS_GEN(v))) {
    	return _dispatch_once_mark_done_if_quiesced(l, v);
    }
    #endif
    #endif
    if (_dispatch_once_gate_tryenter(l)) {
    	return _dispatch_once_callout(l, ctxt, func);
    }
    return _dispatch_once_wait(l);
}
複製代碼

第一次調用時外部傳進來的 onceToken 仍是空指針,因此 valNULL_dispatch_once_gate_tryenter(l) 判斷 l->dgo_once 是否標記爲 DLOCK_ONCE_UNLOCKED,可是 #define DLOCK_ONCE_UNLOCKED ((uintptr_t)0),因此 if 判斷是成立的,就會進行 block 回調,而後在經過 _dispatch_once_gate_broadcastl->dgo_once 標記爲 DLOCK_ONCE_DONE,下次再進來的時候就會直接返回保證代碼只執行一次。

4、總結

本文主要整理了 GCD 中常見的 API 以及底層的實現原理。

  • dispatch_sync 將任務 block 經過 push 到隊列中,而後按照 FIFO 去執行。
  • dispatch_async 會把任務包裝並保存,以後就會開闢相應線程去執行已保存的任務。
  • semaphore 主要使用 signalwait 這兩個接口,底層分別調用了內核提供的方法。在調用 signal 方法後,先將 value 減一,若是大於零馬上返回,不然陷入等待。signal 方法將信號量加一,若是 value 大於零馬上返回,不然說明喚醒了某一個等待線程,此時由系統決定哪一個線程的等待方法能夠返回。
  • dispatch_group 底層也是維護了一個 value 的值,等待 group 完成實際上就是等待 value 恢復初始值。而 notify 的做用是將全部註冊的回調組裝成一個鏈表,在 dispatch_async 完成時判斷 value 是否是恢復初始值,若是是則調用 dispatch_async 異步執行全部註冊的回調。
  • dispatch_once 經過一個靜態變量來標記 block 是否已被執行,同時使用加鎖確保只有一個線程能執行,執行完 block 後會喚醒其餘全部等待的線程。

最後在此感謝 深刻了解GCD 對個人啓發。

相關文章
相關標籤/搜索