iOS Deep Dive: An Analysis of GCD Internals

Welcome to the iOS Deep Dive series (it reads best in order).

Preface

The source is long, with many logic branches and macro definitions, which makes it obscure and intimidating. But if you approach it with specific questions in mind, the difficulty drops and irrelevant code can be skipped. Here are the questions this article sets out to answer:

  • How queues are created under the hood
  • How deadlocks arise
  • How dispatch_block tasks are executed
  • The synchronous function
  • The asynchronous function
  • How semaphores work
  • How dispatch groups work
  • How the singleton (dispatch_once) works

This article is fairly long and jumps between many functions, but it only examines the core code paths. Read it through and you should come away with something.

Choosing the Source Code

To analyze the source we first need to obtain it. We have already walked through the objc, malloc, and dyld sources, so which project contains GCD?

Here is a small trick. Since we know we want to study GCD, there are several ways to locate the right source package:

  • Baidu/Google it
  • Set a symbolic breakpoint on dispatch_queue_create
  • Simply use Debug -> Debug Workflow -> Always Show Disassembly and read the assembly

Either way, we end up at the libdispatch source we need.

Part 1: How Queues Are Created Under the Hood

The top-level API is dispatch_queue_create, so search for it globally. The search returns a flood of hits (66 results in 17 files); this is where a developer's experience reading source code pays off.

  • A newcomer checks every hit one by one, leaving no stone unturned
  • A veteran refines the search based on how the API is used at the call site
    • Since a queue is created with dispatch_queue_create("", NULL), searching for dispatch_queue_create( narrows the hits to (21 results in 6 files)
    • Since the first parameter is a C string declared const, searching for dispatch_queue_create(const narrows the hits to (2 results in 2 files)

1.dispatch_queue_create

The usual intermediate-layer wrapper: it lets the implementation iterate without changing the public API.

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
	return _dispatch_lane_create_with_target(label, attr,
			DISPATCH_TARGET_QUEUE_DEFAULT, true);
}

It is sometimes worth noting the arguments passed into these functions:

  • label is the caller's reverse-DNS name, used mainly for crash debugging
  • attr is NULL/DISPATCH_QUEUE_SERIAL or DISPATCH_QUEUE_CONCURRENT, and distinguishes a serial queue from a concurrent one

#define DISPATCH_QUEUE_SERIAL NULL // the serial-queue attribute macro is literally NULL

2._dispatch_lane_create_with_target

DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
		dispatch_queue_t tq, bool legacy)
{
	
	dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
	...
}

dqa is the second parameter of this function, i.e. the attr passed to dispatch_queue_create.

Since it uses the serial/concurrent distinction, let's step in and take a look.

3._dispatch_queue_attr_to_info

dispatch_queue_attr_info_t
_dispatch_queue_attr_to_info(dispatch_queue_attr_t dqa)
{
	dispatch_queue_attr_info_t dqai = { };

	if (!dqa) return dqai;

#if DISPATCH_VARIANT_STATIC
	if (dqa == &_dispatch_queue_attr_concurrent) {
		dqai.dqai_concurrent = true;
		return dqai;
	}
#endif

	if (dqa < _dispatch_queue_attrs ||
			dqa >= &_dispatch_queue_attrs[DISPATCH_QUEUE_ATTR_COUNT]) {
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	}

	size_t idx = (size_t)(dqa - _dispatch_queue_attrs);
	
	dqai.dqai_inactive = (idx % DISPATCH_QUEUE_ATTR_INACTIVE_COUNT);
	idx /= DISPATCH_QUEUE_ATTR_INACTIVE_COUNT;

	dqai.dqai_concurrent = !(idx % DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT);
	idx /= DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT;

	dqai.dqai_relpri = -(idx % DISPATCH_QUEUE_ATTR_PRIO_COUNT);
	idx /= DISPATCH_QUEUE_ATTR_PRIO_COUNT;

	dqai.dqai_qos = idx % DISPATCH_QUEUE_ATTR_QOS_COUNT;
	idx /= DISPATCH_QUEUE_ATTR_QOS_COUNT;

	dqai.dqai_autorelease_frequency =
			idx % DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT;
	idx /= DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT;

	dqai.dqai_overcommit = idx % DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT;
	idx /= DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT;

	return dqai;
}
  • dispatch_queue_attr_info_t dqai = { }; performs the initialization
    • dispatch_queue_attr_info_t, like isa, is a bit-field structure
typedef struct dispatch_queue_attr_info_s {
	dispatch_qos_t dqai_qos : 8;
	int      dqai_relpri : 8;
	uint16_t dqai_overcommit:2;
	uint16_t dqai_autorelease_frequency:2;
	uint16_t dqai_concurrent:1;
	uint16_t dqai_inactive:1;
} dispatch_queue_attr_info_t;
  • Next, look at if (!dqa) return dqai;
    • For a serial queue dqa is NULL, so the zero-initialized dqai is returned directly
    • A concurrent queue continues further down
  • size_t idx = (size_t)(dqa - _dispatch_queue_attrs);
    • The index arithmetic works off the global attribute object behind the DISPATCH_QUEUE_CONCURRENT macro
    • As a result a concurrent queue's dqai.dqai_concurrent is set, unlike a serial queue's

#define DISPATCH_QUEUE_CONCURRENT \
		DISPATCH_GLOBAL_OBJECT(dispatch_queue_attr_t, \
		_dispatch_queue_attr_concurrent)

4. Back to _dispatch_lane_create_with_target

We are studying queue creation, so we can ignore the surrounding detail and focus on the alloc-style code

DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
		dispatch_queue_t tq, bool legacy)
{
	
	dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
	
	dispatch_qos_t qos = dqai.dqai_qos;
	...
	
	// one of the logic branches
	if (tq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
		overcommit = _dispatch_queue_attr_overcommit_enabled;
	} else {
		overcommit = _dispatch_queue_attr_overcommit_disabled;
	}
	
	if (!tq) {
		tq = _dispatch_get_root_queue(
				qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos, // 4
				overcommit == _dispatch_queue_attr_overcommit_enabled)->_as_dq; // 0 1
		if (unlikely(!tq)) {
			DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
		}
	}
	
	...
    
        // allocate memory - create the corresponding queue object
	dispatch_lane_t dq = _dispatch_object_alloc(vtable,
			sizeof(struct dispatch_lane_s));
	
	// initializer
	_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
			DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
			(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
	// label
	dq->dq_label = label;
	// priority
	dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos,
			dqai.dqai_relpri);
	if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
		dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
	}
	if (!dqai.dqai_inactive) {
		_dispatch_queue_priority_inherit_from_target(dq, tq);
		_dispatch_lane_inherit_wlh_from_target(dq, tq);
	}
	_dispatch_retain(tq);
	dq->do_targetq = tq;
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_trace_queue_create(dq)._dq;
}
4.1 Creation via _dispatch_get_root_queue
  • tq is a parameter of _dispatch_lane_create_with_target; the call site passed DISPATCH_TARGET_QUEUE_DEFAULT, so if (!tq) is always true
#define DISPATCH_TARGET_QUEUE_DEFAULT NULL
  • _dispatch_get_root_queue is called with 4 and 0/1
    • qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos: qos was never assigned, so it stays 0 and DISPATCH_QOS_DEFAULT (4) is used
    • overcommit == _dispatch_queue_attr_overcommit_enabled)->_as_dq;: per the "logic branch" comment in the code, a serial queue gets _dispatch_queue_attr_overcommit_enabled (serial queues default to overcommit) while a concurrent queue gets _dispatch_queue_attr_overcommit_disabled
#define DISPATCH_QOS_UNSPECIFIED ((dispatch_qos_t)0)
#define DISPATCH_QOS_DEFAULT ((dispatch_qos_t)4)

DISPATCH_ALWAYS_INLINE DISPATCH_CONST
static inline dispatch_queue_global_t
_dispatch_get_root_queue(dispatch_qos_t qos, bool overcommit)
{
	if (unlikely(qos < DISPATCH_QOS_MIN || qos > DISPATCH_QOS_MAX)) {
		DISPATCH_CLIENT_CRASH(qos, "Corrupted priority");
	}
	return &_dispatch_root_queues[2 * (qos - 1) + overcommit];
}
  • A serial queue thus returns &_dispatch_root_queues[7] and a concurrent queue returns &_dispatch_root_queues[6]

A global search for _dispatch_root_queues[] (it is an array) shows that entries 6 and 7 are the default-QoS root queue and its overcommit variant, backing custom concurrent and serial queues respectively

Conjecture: custom queues are created from templates pulled out of _dispatch_root_queues

4.2 Allocating Memory with _dispatch_object_alloc

Most GCD objects are in fact created as dispatch_object_t. Apple's source comment describes it as the abstract base type for all dispatch objects. Since dispatch_object_t is a union, once created it can be viewed as whichever member type you need (a union's members share storage: when one is in use, the others are not)

This union approach is a form of polymorphism, slightly different from the inheritance-based design of objc_object

/*!
 * @typedef dispatch_object_t
 *
 * @abstract
 * Abstract base type for all dispatch objects.
 * The details of the type definition are language-specific.
 *
 * @discussion
 * Dispatch objects are reference counted via calls to dispatch_retain() and
 * dispatch_release().
 */
typedef union {
	struct _os_object_s *_os_obj;
	struct dispatch_object_s *_do;
	struct dispatch_queue_s *_dq;
	struct dispatch_queue_attr_s *_dqa;
	struct dispatch_group_s *_dg;
	struct dispatch_source_s *_ds;
	struct dispatch_mach_s *_dm;
	struct dispatch_mach_msg_s *_dmsg;
	struct dispatch_semaphore_s *_dsema;
	struct dispatch_data_s *_ddata;
	struct dispatch_io_s *_dchannel;
} dispatch_object_t DISPATCH_TRANSPARENT_UNION;
4.3 Construction in _dispatch_queue_init
  • As shown earlier, dqai.dqai_concurrent is set for concurrent queues, so it is what distinguishes concurrent from serial here
  • A serial queue gets a width of 1; a concurrent queue gets a width of DISPATCH_QUEUE_WIDTH_MAX
  • After all that churning it returns a dispatch_queue_class_t; in effect the dqu argument has been populated and adjusted
4.4 Returning the dispatch_queue_t
_dispatch_retain(tq);
dq->do_targetq = tq;
_dispatch_object_debug(dq, "%s", __func__);
return _dispatch_trace_queue_create(dq)._dq;
  • dq is a dispatch_lane_t
  • tq is a dispatch_queue_t
  • _dispatch_trace_queue_create(dq) returns a dispatch_queue_class_t, which is a union
    • its _dq member is the dispatch_queue_t that is ultimately returned
typedef struct dispatch_queue_s *dispatch_queue_t;
typedef union {
	struct dispatch_queue_s *_dq;
	struct dispatch_workloop_s *_dwl;
	struct dispatch_lane_s *_dl;
	struct dispatch_queue_static_s *_dsq;
	struct dispatch_queue_global_s *_dgq;
	struct dispatch_queue_pthread_root_s *_dpq;
	struct dispatch_source_s *_ds;
	struct dispatch_mach_s *_dm;
	dispatch_lane_class_t _dlu;
#ifdef __OBJC__
	id<OS_dispatch_queue> _objc_dq;
#endif
} dispatch_queue_class_t DISPATCH_TRANSPARENT_UNION;

5. Verifying the Conjecture

NSLog calls the object's description method, while LLDB can print the underlying pointers.

  • The target of a custom serial or concurrent queue matches the corresponding template entry exactly
  • Likewise, the serial, concurrent, main, and global queues all have different widths (width is the maximum number of tasks the queue can schedule at once)
    • The width of 1 for the serial queue and the main queue needs no explanation
    • A concurrent queue's width, DISPATCH_QUEUE_WIDTH_MAX, is full minus 2
    • A global queue's width, DISPATCH_QUEUE_WIDTH_POOL, is full minus 1
#define DISPATCH_QUEUE_WIDTH_FULL_BIT 0x0020000000000000ull
#define DISPATCH_QUEUE_WIDTH_FULL 0x1000ull
#define DISPATCH_QUEUE_WIDTH_POOL (DISPATCH_QUEUE_WIDTH_FULL - 1)
#define DISPATCH_QUEUE_WIDTH_MAX (DISPATCH_QUEUE_WIDTH_FULL - 2)

struct dispatch_queue_static_s _dispatch_main_q = {
	DISPATCH_GLOBAL_OBJECT_HEADER(queue_main),
#if !DISPATCH_USE_RESOLVERS
	.do_targetq = _dispatch_get_default_queue(true),
#endif
	.dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) |
			DISPATCH_QUEUE_ROLE_BASE_ANON,
	.dq_label = "com.apple.main-thread",
	.dq_atomic_flags = DQF_THREAD_BOUND | DQF_WIDTH(1),
	.dq_serialnum = 1,
};

struct dispatch_queue_global_s _dispatch_mgr_root_queue = {
	DISPATCH_GLOBAL_OBJECT_HEADER(queue_global),
	.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
	.do_ctxt = &_dispatch_mgr_root_queue_pthread_context,
	.dq_label = "com.apple.root.libdispatch-manager",
	.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
	.dq_priority = DISPATCH_PRIORITY_FLAG_MANAGER |
			DISPATCH_PRIORITY_SATURATED_OVERRIDE,
	.dq_serialnum = 3,
	.dgq_thread_pool_size = 1,
};

That settles how custom queues are created. The next question: how is _dispatch_root_queues itself created?

6. How _dispatch_root_queues Is Created

Apart from dispatch_get_main_queue, all other queues are created via _dispatch_root_queues

After libdispatch_init, _dispatch_introspection_init runs; a for loop calls _dispatch_trace_queue_create on each address taken from _dispatch_root_queues, creating them one by one

7. The Custom-Queue Creation Flow at a Glance

Part 2: How Deadlocks Arise

A deadlock arises when tasks wait on each other. So how is this actually detected under the hood?

1.dispatch_sync

Search globally for dispatch_sync(dispatch, ignoring the unlikely cold paths

DISPATCH_NOINLINE
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
	uintptr_t dc_flags = DC_FLAG_BLOCK;
	if (unlikely(_dispatch_block_has_private_data(work))) {
		return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
	}
	_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
#endif // __BLOCKS__

2._dispatch_sync_f

Still the routine intermediate-layer wrapper

DISPATCH_NOINLINE
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
		uintptr_t dc_flags)
{
	_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}

3._dispatch_sync_f_inline

  • We know a serial queue's width is 1, so a serial queue satisfies dq->dq_width == 1
    • return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
  • A concurrent queue continues further down
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
	if (likely(dq->dq_width == 1)) {
		return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
	}

	if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
		DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
	}

	dispatch_lane_t dl = upcast(dq)._dl;
	// Global concurrent queues and queues bound to non-dispatch threads
	// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
	if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
		return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
	}

	if (unlikely(dq->do_targetq->do_targetq)) {
		return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
	}
	_dispatch_introspection_sync_begin(dl);
	_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
			_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}

4._dispatch_barrier_sync_f

Serial queues and barrier sync behave very similarly, which is why we land here; it is yet another intermediate-layer wrapper

DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
	_dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}

5._dispatch_barrier_sync_f_inline

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
	dispatch_tid tid = _dispatch_tid_self();

	if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
		DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
	}

	dispatch_lane_t dl = upcast(dq)._dl;
	// The more correct thing to do would be to merge the qos of the thread
	// that just acquired the barrier lock into the queue state.
	//
	// However this is too expensive for the fast path, so skip doing it.
	// The chosen tradeoff is that if an enqueue on a lower priority thread
	// contends with this fast path, this thread may receive a useless override.
	//
	// Global concurrent queues and queues bound to non-dispatch threads
	// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
	
	// deadlock detection
	if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
		return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
				DC_FLAG_BARRIER | dc_flags);
	}

	if (unlikely(dl->do_targetq->do_targetq)) {
		return _dispatch_sync_recurse(dl, ctxt, func,
				DC_FLAG_BARRIER | dc_flags);
	}
	_dispatch_introspection_sync_begin(dl);
	_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
			DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
					dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
5.1 _dispatch_tid_self

_dispatch_tid_self is a macro that ultimately calls _dispatch_thread_getspecific to fetch the current thread id (thread-local data is stored as key-value pairs)

#define _dispatch_tid_self() ((dispatch_tid)_dispatch_thread_port())

#if TARGET_OS_WIN32
#define _dispatch_thread_port() ((mach_port_t)0)
#elif !DISPATCH_USE_THREAD_LOCAL_STORAGE
#if DISPATCH_USE_DIRECT_TSD
#define _dispatch_thread_port() ((mach_port_t)(uintptr_t)\
		_dispatch_thread_getspecific(_PTHREAD_TSD_SLOT_MACH_THREAD_SELF))
#else
#define _dispatch_thread_port() pthread_mach_thread_np(_dispatch_thread_self())
#endif
#endif

Time for the real show: here comes the core of the deadlock analysis!

5.2 _dispatch_queue_try_acquire_barrier_sync

_dispatch_queue_try_acquire_barrier_sync obtains a state value from the OS layer

DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_queue_try_acquire_barrier_sync(dispatch_queue_class_t dq, uint32_t tid)
{
	return _dispatch_queue_try_acquire_barrier_sync_and_suspend(dq._dl, tid, 0);
}

DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
		uint32_t tid, uint64_t suspend_count)
{
	uint64_t init  = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
	uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
			_dispatch_lock_value_from_tid(tid) |
			(suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
	uint64_t old_state, new_state;
	// read state from the OS layer: queue state and owning thread
	return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
		uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
		if (old_state != (init | role)) {
			os_atomic_rmw_loop_give_up(break);
		}
		new_state = value | role;
	});
}
5.3 _dispatch_sync_f_slow

Once 5.2 yields a new_state we arrive here (this function shows up in the crash stack when a deadlock occurs)

DISPATCH_NOINLINE
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
		dispatch_function_t func, uintptr_t top_dc_flags,
		dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
	dispatch_queue_t top_dq = top_dqu._dq;
	dispatch_queue_t dq = dqu._dq;
	if (unlikely(!dq->do_targetq)) {
		return _dispatch_sync_function_invoke(dq, ctxt, func);
	}

	pthread_priority_t pp = _dispatch_get_priority();
	struct dispatch_sync_context_s dsc = {
		.dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
		.dc_func     = _dispatch_async_and_wait_invoke,
		.dc_ctxt     = &dsc,
		.dc_other    = top_dq,
		.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
		.dc_voucher  = _voucher_get(),
		.dsc_func    = func,
		.dsc_ctxt    = ctxt,
		.dsc_waiter  = _dispatch_tid_self(),
	};
	

	_dispatch_trace_item_push(top_dq, &dsc);
	__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

	if (dsc.dsc_func == NULL) {
		dispatch_queue_t stop_dq = dsc.dc_other;
		return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
	}

	_dispatch_introspection_sync_begin(top_dq);
	_dispatch_trace_item_pop(top_dq, &dsc);
	_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
			DISPATCH_TRACE_ARG(&dsc));
}
  • _dispatch_trace_item_push pushes the task onto the queue, to be executed FIFO
  • __DISPATCH_WAIT_FOR_QUEUE__ is the last function in the crash stack
DISPATCH_NOINLINE
static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
	// fetch the queue state to see whether it is in a waiting state
	uint64_t dq_state = _dispatch_wait_prepare(dq);
    if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
    	DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
    			"dispatch_sync called on queue "
    			"already owned by current thread");
    }
	...
}
5.4 _dq_state_drain_locked_by

Compares the value derived from the waiting state against the thread tid; if they match, YES is returned and the crash is raised

DISPATCH_ALWAYS_INLINE
static inline bool
_dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid)
{
	return _dispatch_lock_is_locked_by((dispatch_lock)dq_state, tid);
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
	// equivalent to _dispatch_lock_owner(lock_value) == tid
	// ^ (XOR): identical operands give 0, otherwise 1
	return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}

6. The Deadlock Flow at a Glance

  • A deadlock is detected by comparing the thread tid against the value derived from the current waiting state
  • A synchronous dispatch_sync pushes the task onto the queue, to be executed FIFO
  • Synchronous barrier dispatch works much the same way

Part 3: Executing dispatch_block Tasks

Set a breakpoint inside a dispatch_block and print the call stack with LLDB

1._dispatch_lane_barrier_sync_invoke_and_complete

Here too an OS-level callback like the one above is used. Why a callback? Because task execution depends on the thread's state: if the thread is not in a fit state, the task does not run

DISPATCH_NOINLINE
static void
_dispatch_lane_barrier_sync_invoke_and_complete(dispatch_lane_t dq,
		void *ctxt, dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
        ...
	// similar to _dispatch_queue_drain_try_unlock
	os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
		new_state  = old_state - DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
		new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
		new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
		if (unlikely(old_state & fail_unlock_mask)) {
			os_atomic_rmw_loop_give_up({
				return _dispatch_lane_barrier_complete(dq, 0, 0);
			});
		}
	});
	if (_dq_state_is_base_wlh(old_state)) {
		_dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
	}
}
  • _dispatch_lane_barrier_complete

Follow it straight down to _dispatch_lane_class_barrier_complete

DISPATCH_NOINLINE
static void
_dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
        ...
	uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
			dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
	return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
}
  • _dispatch_lane_class_barrier_complete

Step into _dispatch_queue_push_queue

DISPATCH_NOINLINE
static void
_dispatch_lane_class_barrier_complete(dispatch_lane_t dq, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target,
		uint64_t owned)
{
    ...
    if (tq) {
		if (likely((old_state ^ new_state) & enqueue)) {
			dispatch_assert(_dq_state_is_enqueued(new_state));
			dispatch_assert(flags & DISPATCH_WAKEUP_CONSUME_2);
			return _dispatch_queue_push_queue(tq, dq, new_state);
		}
		...
	}
}
  • _dispatch_queue_push_queue

The dx_push inside it is a macro

#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_queue_push_queue(dispatch_queue_t tq, dispatch_queue_class_t dq,
		uint64_t dq_state)
{
	_dispatch_trace_item_push(tq, dq);
	return dx_push(tq, dq, _dq_state_max_qos(dq_state));
}
  • dq_push: search globally for dq_push and continue with _dispatch_root_queue_push

  • _dispatch_root_queue_push

Most likely it falls through to _dispatch_root_queue_push_inline

DISPATCH_NOINLINE
void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
		dispatch_qos_t qos)
{   
        ...
	#if HAVE_PTHREAD_WORKQUEUE_QOS
	if (_dispatch_root_queue_push_needs_override(rq, qos)) {
		return _dispatch_root_queue_push_override(rq, dou, qos);
	}
#else
	(void)qos;
#endif
	_dispatch_root_queue_push_inline(rq, dou, dou, 1);
}
  • _dispatch_root_queue_push_inline
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
		dispatch_object_t _head, dispatch_object_t _tail, int n)
{
	struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
	if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
		return _dispatch_root_queue_poke(dq, n, 0);
	}
}
  • _dispatch_root_queue_poke
DISPATCH_NOINLINE
void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
	...
	return _dispatch_root_queue_poke_slow(dq, n, floor);
}
  • _dispatch_root_queue_poke_slow
DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
	int remaining = n;
	int r = ENOSYS;

	_dispatch_root_queues_init();
	_dispatch_debug_root_queue(dq, __func__);
	_dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
	...
}
  • _dispatch_root_queues_init

Follow it to the core method dispatch_once_f

static inline void
_dispatch_root_queues_init(void)
{
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init_once);
}
  • dispatch_once_f: when you see the _dispatch_once_callout function you are nearly there

DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
	// if you have been here once, you do not come back next time
	dispatch_once_gate_t l = (dispatch_once_gate_t)val;
	//DLOCK_ONCE_DONE
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
	if (likely(v == DLOCK_ONCE_DONE)) {
		return;
	}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	if (likely(DISPATCH_ONCE_IS_GEN(v))) {
		return _dispatch_once_mark_done_if_quiesced(l, v);
	}
#endif
#endif

	// condition met -- try to enter
	if (_dispatch_once_gate_tryenter(l)) {
		// the singleton call -- v becomes DLOCK_ONCE_DONE
		return _dispatch_once_callout(l, ctxt, func);
	}
	return _dispatch_once_wait(l);
}
  • _dispatch_once_callout
DISPATCH_NOINLINE
static void
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_client_callout(ctxt, func);
	_dispatch_once_gate_broadcast(l);
}

2._dispatch_client_callout

f(ctxt) invokes the dispatch_function_t: this is where the dispatch_block is actually executed

DISPATCH_NOINLINE
void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
	_dispatch_get_tsd_base();
	void *u = _dispatch_get_unwind_tsd();
	if (likely(!u)) return f(ctxt);
	_dispatch_set_unwind_tsd(NULL);
	f(ctxt);
	_dispatch_free_unwind_tsd();
	_dispatch_set_unwind_tsd(u);
}

3. The Task-Saving Flow at a Glance

At long last we have found where tasks are executed, but not where they are saved. For that, we turn to the synchronous and asynchronous functions.

4、同步函數

前文中已經跟過dispatch_sync的實現了,這裏上一張圖再捋一捋(特別注意work和func的調用)

  • 串行隊列dq->dq_width == 1分支
    • _dispatch_barrier_sync_f -> _dispatch_barrier_sync_f_inline -> _dispatch_lane_barrier_sync_invoke_and_complete
    • 而後就是3、dispatch_block任務的執行中的流程
  • 其餘狀況大機率走_dispatch_sync_invoke_and_complete

1._dispatch_sync_invoke_and_complete

Saves func and calls _dispatch_sync_function_invoke_inline

DISPATCH_NOINLINE
static void
_dispatch_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt,
		dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
	_dispatch_sync_function_invoke_inline(dq, ctxt, func);
	_dispatch_trace_item_complete(dc);
	_dispatch_lane_non_barrier_complete(dq, 0);
}

2._dispatch_sync_function_invoke_inline

Calls _dispatch_client_callout directly, echoing the dispatch_block execution flow above

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_thread_frame_s dtf;
	_dispatch_thread_frame_push(&dtf, dq);
	// f(ctxt) -- func(ctxt)
	_dispatch_client_callout(ctxt, func);
	_dispatch_perfmon_workitem_inc();
	_dispatch_thread_frame_pop(&dtf);
}

3. The Synchronous Flow at a Glance (Partial)

Part 5: The Asynchronous Function

1. Saving the Task

Same approach as before: step into the dispatch_async source and keep an eye on the dispatch_block_t

1.1 dispatch_async
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DC_FLAG_CONSUME;
	dispatch_qos_t qos;

	qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
1.2 _dispatch_continuation_init

_dispatch_Block_invoke normalizes the task into a uniform format

DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
		dispatch_queue_class_t dqu, dispatch_block_t work,
		dispatch_block_flags_t flags, uintptr_t dc_flags)
{
	void *ctxt = _dispatch_Block_copy(work);

	dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
	if (unlikely(_dispatch_block_has_private_data(work))) {
		dc->dc_flags = dc_flags;
		dc->dc_ctxt = ctxt;
		// will initialize all fields but requires dc_flags & dc_ctxt to be set
		return _dispatch_continuation_init_slow(dc, dqu, flags);
	}

	dispatch_function_t func = _dispatch_Block_invoke(work);
	if (dc_flags & DC_FLAG_CONSUME) {
		func = _dispatch_call_block_and_release;
	}
	return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}
1.3 _dispatch_continuation_init_f

dc->dc_func = f is where the block task gets saved

DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init_f(dispatch_continuation_t dc,
		dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
		dispatch_block_flags_t flags, uintptr_t dc_flags)
{
	pthread_priority_t pp = 0;
	dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED;
	dc->dc_func = f;
	dc->dc_ctxt = ctxt;
	// in this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority
	// should not be propagated, only taken from the handler if it has one
	if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
		pp = _dispatch_priority_propagate();
	}
	_dispatch_continuation_voucher_set(dc, flags);
	return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
}

The asynchronous task is now saved. So when does it execute, and when is the thread created?

2. Creating the Thread

2.1 _dispatch_continuation_async
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
		dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
	if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
		_dispatch_trace_item_push(dqu, dc);
	}
#else
	(void)dc_flags;
#endif
	return dx_push(dqu._dq, dc, qos);
}
2.2 dx_push...

This was analyzed earlier. Why study it via _dispatch_root_queue_push? Because it is the most basic path, stripped of side branches

dx_push->dq_push->_dispatch_root_queue_push->_dispatch_root_queue_push_inline->_dispatch_root_queue_poke->_dispatch_root_queue_poke_slow

2.3 _dispatch_root_queue_poke_slow
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
    ...
    // floor is 0; remaining depends on the queue's pending tasks
    int can_request, t_count;
    // read the thread pool size
    t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
    do {
        // compute how many threads can be requested
        can_request = t_count < floor ? 0 : t_count - floor;
        if (remaining > can_request) {
    	    _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
    	    		remaining, can_request);
    	    os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
    	    remaining = can_request;
        }
        if (remaining == 0) {
            // the thread pool is full; report the exceptional case
            _dispatch_root_queue_debug("pthread pool is full for root queue: "
    				"%p", dq);
                return;
    	}
    } while (!os_atomic_cmpxchgvw2o(dq, dgq_thread_pool_size, t_count,
    		t_count - remaining, &t_count, acquire));
    
    pthread_attr_t *attr = &pqc->dpq_thread_attr;
    pthread_t tid, *pthr = &tid;
    #if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
    if (unlikely(dq == &_dispatch_mgr_root_queue)) {
    	pthr = _dispatch_mgr_root_queue_init();
    }
    #endif
    do {
    	_dispatch_retain(dq); 
    	// spawn a thread
    	while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
    		if (r != EAGAIN) {
    			(void)dispatch_assume_zero(r);
    		}
    		_dispatch_temporary_resource_shortage();
    	}
    } while (--remaining);
    #else
    (void)floor;
    #endif // DISPATCH_USE_PTHREAD_POOL
}
  • The first do-while checks and adjusts against the core thread count

  • The second do-while calls pthread_create to spawn the threads (underneath it is still pthread)

3. Executing the Task

Task execution was in fact covered just above

_dispatch_root_queues_init->dispatch_once_f->_dispatch_once_callout->_dispatch_client_callout

The task simply waits on the thread's state; exactly how the thread picks up and runs the task is not visible from here

4. The Asynchronous Flow at a Glance

Part 6: How Semaphores Work

Basic semaphore usage looks like this. What is the underlying principle?

dispatch_semaphore_t sem = dispatch_semaphore_create(0);
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
dispatch_semaphore_signal(sem);

1.dispatch_semaphore_create

This merely initializes the dispatch_semaphore_t and stores the value internally (value must not be negative)

dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
	dispatch_semaphore_t dsema;

	// If the internal value is negative, then the absolute of the value is
	// equal to the number of waiting threads. Therefore it is bogus to
	// initialize the semaphore with a negative value.
	if (value < 0) {
		return DISPATCH_BAD_INPUT;
	}

	dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
			sizeof(struct dispatch_semaphore_s));
	dsema->do_next = DISPATCH_OBJECT_LISTLESS;
	dsema->do_targetq = _dispatch_get_default_queue(false);
	dsema->dsema_value = value;
	_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
	dsema->dsema_orig = value;
	return dsema;
}

2.dispatch_semaphore_signal

Reads the semaphore's current value in a KVC-like fashion; note that this function returns a value

long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
	long value = os_atomic_inc2o(dsema, dsema_value, release);
	if (likely(value > 0)) {
		return 0;
	}
	if (unlikely(value == LONG_MIN)) {
		DISPATCH_CLIENT_CRASH(value,
				"Unbalanced call to dispatch_semaphore_signal()");
	}
	return _dispatch_semaphore_signal_slow(dsema);
}

DISPATCH_NOINLINE
long
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
	_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
	_dispatch_sema4_signal(&dsema->dsema_sema, 1);
	return 1;
}

The crux is that os_atomic_inc2o performs the ++ operation

#define os_atomic_inc2o(p, f, m) \
		os_atomic_add2o(p, f, 1, m)
#define os_atomic_add2o(p, f, v, m) \
		os_atomic_add(&(p)->f, (v), m)
#define os_atomic_add(p, v, m) \
		_os_atomic_c11_op((p), (v), m, add, +)

3.dispatch_semaphore_wait

Symmetrically, dispatch_semaphore_wait reads the value and returns the corresponding result

  • value >= 0: return immediately
  • value < 0: behave according to the timeout
    • DISPATCH_TIME_NOW: add 1 back to value (bringing it to 0) to undo the decrement done at the start of wait, and return KERN_OPERATION_TIMED_OUT to indicate the wait timed out
    • DISPATCH_TIME_FOREVER: call the system's semaphore_wait and keep waiting until signal is called
    • default: like DISPATCH_TIME_FOREVER, but with a specified wait deadline

long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
	long value = os_atomic_dec2o(dsema, dsema_value, acquire);
	if (likely(value >= 0)) {
		return 0;
	}
	return _dispatch_semaphore_wait_slow(dsema, timeout);
}

DISPATCH_NOINLINE
static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
		dispatch_time_t timeout)
{
	long orig;

	_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
	switch (timeout) {
	default:
		if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
			break;
		}
		// Fall through and try to undo what the fast path did to
		// dsema->dsema_value
	case DISPATCH_TIME_NOW:
		orig = dsema->dsema_value;
		while (orig < 0) {
			if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
					&orig, relaxed)) {
				return _DSEMA4_TIMEOUT();
			}
		}
		// Another thread called semaphore_signal().
		// Fall through and drain the wakeup.
	case DISPATCH_TIME_FOREVER:
		_dispatch_sema4_wait(&dsema->dsema_sema);
		break;
	}
	return 0;
}

os_atomic_dec2o performs the -- operation

#define os_atomic_dec2o(p, f, m) \
		os_atomic_sub2o(p, f, 1, m)
#define os_atomic_sub2o(p, f, v, m) \
		os_atomic_sub(&(p)->f, (v), m)
#define os_atomic_sub(p, v, m) \
		_os_atomic_c11_op((p), (v), m, sub, -)

Part 7: How Dispatch Groups Work

Basic dispatch-group usage looks like this

dispatch_group_t group = dispatch_group_create();
dispatch_group_enter(group);
dispatch_group_leave(group);
dispatch_group_async(group, queue, ^{});
dispatch_group_notify(group, queue, ^{});

1.dispatch_group_create

Like other GCD objects, the dispatch_group_t is produced with _dispatch_object_alloc

The os_atomic_store2o call shows that a group also maintains a value under the hood


dispatch_group_t
dispatch_group_create(void)
{
	return _dispatch_group_create_with_count(0);
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
	dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
			sizeof(struct dispatch_group_s));
	dg->do_next = DISPATCH_OBJECT_LISTLESS;
	dg->do_targetq = _dispatch_get_default_queue(false);
	if (n) {
		os_atomic_store2o(dg, dg_bits,
				-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
		os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
	}
	return dg;
}

2.dispatch_group_enter & dispatch_group_leave

These two APIs work much like a semaphore: os_atomic_sub_orig2o and os_atomic_add_orig2o perform the -- and ++ operations, and unbalanced calls will crash

  • dispatch_group_leave updates the state when leaving the group
  • Once everything has left the group, _dispatch_group_wake is called
void dispatch_group_enter(dispatch_group_t dg) {
	// The value is decremented on a 32bits wide atomic so that the carry
	// for the 0 -> -1 transition is not propagated to the upper 32bits.
	uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
			DISPATCH_GROUP_VALUE_INTERVAL, acquire);
	uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
	if (unlikely(old_value == 0)) {
		_dispatch_retain(dg); // <rdar://problem/22318411>
	}
	if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
		DISPATCH_CLIENT_CRASH(old_bits,
				"Too many nested calls to dispatch_group_enter()");
	}
}

void dispatch_group_leave(dispatch_group_t dg) {
	// The value is incremented on a 64bits wide atomic so that the carry for
	// the -1 -> 0 transition increments the generation atomically.
	uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
			DISPATCH_GROUP_VALUE_INTERVAL, release);
	uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);

	if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
		old_state += DISPATCH_GROUP_VALUE_INTERVAL;
		
		do {
			new_state = old_state;
			if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
				new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
				new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
			} else {
				// If the group was entered again since the atomic_add above,
				// we can't clear the waiters bit anymore as we don't know for
				// which generation the waiters are for
				new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
			}
			if (old_state == new_state) break;
		} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
				old_state, new_state, &old_state, relaxed)));
		
		return _dispatch_group_wake(dg, old_state, true);
	}

	if (unlikely(old_value == 0)) {
		DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
				"Unbalanced call to dispatch_group_leave()");
	}
}

3.dispatch_group_async

  • _dispatch_continuation_init_f saves the task (much like the async function does)
  • _dispatch_continuation_group_async is then called
void dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
	dispatch_qos_t qos;

	qos = _dispatch_continuation_init_f(dc, dq, ctxt, func, 0, dc_flags);
	_dispatch_continuation_group_async(dg, dq, dc, qos);
}

dispatch_group_enter is called here to enter the group

static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dc, dispatch_qos_t qos)
{
	dispatch_group_enter(dg);
	dc->dc_data = dg;
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

After entering the group, a matching leave is required — the group is left once the task finishes executing

In _dispatch_continuation_invoke_inline, a group-style continuation calls _dispatch_continuation_with_group_invoke to perform the leave

4.dispatch_group_wait

dispatch_group_wait follows the same pattern as the semaphore

dispatch_group_create stored a value when the group was created

  • If the current value equals the original value, every task has completed and 0 is returned straight away
  • If timeout is 0 it also returns immediately; otherwise _dispatch_group_wait_slow is called
    • _dispatch_group_wait_slow keeps waiting until the tasks complete, then returns 0
    • If they never complete, a timeout result is returned
long dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout) {
    uint64_t old_state, new_state;
    
    os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, relaxed, {
    	if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
    		os_atomic_rmw_loop_give_up_with_fence(acquire, return 0);
    	}
    	if (unlikely(timeout == 0)) {
    		os_atomic_rmw_loop_give_up(return _DSEMA4_TIMEOUT());
    	}
    	new_state = old_state | DISPATCH_GROUP_HAS_WAITERS;
    	if (unlikely(old_state & DISPATCH_GROUP_HAS_WAITERS)) {
    		os_atomic_rmw_loop_give_up(break);
    	}
    });
    
    return _dispatch_group_wait_slow(dg, _dg_state_gen(new_state), timeout);
}


5.dispatch_group_notify

It waits for the _dispatch_group_wake callback (invoked once everything has left the group)

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dsn)
{
	uint64_t old_state, new_state;
	dispatch_continuation_t prev;

	dsn->dc_data = dq;
	_dispatch_retain(dq);

	prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
	if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
	os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
	if (os_mpsc_push_was_empty(prev)) {
		os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
			new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
			if ((uint32_t)old_state == 0) {
				os_atomic_rmw_loop_give_up({
					return _dispatch_group_wake(dg, new_state, false);
				});
			}
		});
	}
}

static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
    uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
    
    if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
    	dispatch_continuation_t dc, next_dc, tail;
    
    	// Snapshot before anything is notified/woken
    	dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
    	do {
    		dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
    		next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
    		_dispatch_continuation_async(dsn_queue, dc,
    				_dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
    		_dispatch_release(dsn_queue);
    	} while ((dc = next_dc));
    
    	refs++;
    }
    if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
    	_dispatch_wake_by_address(&dg->dg_gen);
    }
    if (refs) _dispatch_release_n(dg, refs);
}

8. How the Singleton (dispatch_once) Works

#define DLOCK_ONCE_UNLOCKED ((uintptr_t)0)
void dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func) {
    dispatch_once_gate_t l = (dispatch_once_gate_t)val;
    #if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
    if (likely(v == DLOCK_ONCE_DONE)) {
    	return;
    }
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    if (likely(DISPATCH_ONCE_IS_GEN(v))) {
    	return _dispatch_once_mark_done_if_quiesced(l, v);
    }
    #endif
    #endif
    if (_dispatch_once_gate_tryenter(l)) {
    	return _dispatch_once_callout(l, ctxt, func);
    }
    return _dispatch_once_wait(l);
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
	return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
			(uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}
  • On the first call, the onceToken passed in from outside is empty, so val is NULL
    • _dispatch_once_gate_tryenter(l) checks whether l->dgo_once is still marked DLOCK_ONCE_UNLOCKED (i.e. nothing has been stored yet)
    • DLOCK_ONCE_UNLOCKED = 0, so the if succeeds and the block callback runs
    • _dispatch_once_gate_broadcast then marks l->dgo_once as DLOCK_ONCE_DONE
  • Subsequent calls return immediately, guaranteeing the code executes only once

Afterword

As for why the dispatch_barrier_async from the previous article has no effect on a global queue, see 深入淺出 GCD 之 dispatch_queue

The GCD source really is unusually obscure; my knowledge is limited, so please point out any mistakes
