通過前幾章的學習,咱們對
GCD
的使用和隊列的原理有了基本的瞭解,可是GCD
底層究竟是如何開闢線程,如何執行函數等還不是很清楚,本章就來一探究竟。git
系列文章傳送門:github
☞ iOS底層學習 - 多線程之GCD初探swift
☞ iOS底層學習 - 多線程之GCD隊列原理篇bash
☞ iOS底層學習 - 多線程之GCD應用篇markdown
對於GCD
的底層來講,主要有隊列建立,函數執行,同步異步原理和其餘應用函數的原理。關於隊列原理的,咱們以前的篇章已經講過,相信對於GCD
是如何建立隊列的,已經有了認識,今天就來繼續看其餘的底層原理,仍是經過源碼來深刻研究多線程
dispatch_sync
咱們都知道,當使用dispatch_sync
在串行隊列上執行時,會造成dispatch_sync
塊任務和內部執行任務的相互等待,從而形成死鎖崩潰。那麼我就從這個問題來觸發,看一下爲何會形成死鎖,從而瞭解同步執行的原理併發
仍是老規矩,talk is cheap,show me the codeapp
咱們經過源碼找到了dispatch_sync
的調用以下,因爲unlikely
通常運行的較少,多爲容錯處理,因此咱們先跟主流程,最終來到了函數_dispatch_sync_f_inline
中less
void dispatch_sync(dispatch_queue_t dq, dispatch_block_t work) { uintptr_t dc_flags = DC_FLAG_BLOCK; if (unlikely(_dispatch_block_has_private_data(work))) { return _dispatch_sync_block_with_privdata(dq, work, dc_flags); } _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags); } ----------------------------------------------------------------------------------------- static void _dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, uintptr_t dc_flags) { _dispatch_sync_f_inline(dq, ctxt, func, dc_flags); } ----------------------------------------------------------------------------------------- 複製代碼
在_dispatch_sync_f_inline
中發現了一個判斷likely(dq->dq_width == 1
,經過以前隊列的原理咱們能夠知道,串行隊列的width
是爲1
的,因此串行的執行方法,是在_dispatch_barrier_sync_f
中的。
並且根據函數名,咱們能夠知道_dispatch_barrier
是以前講的柵欄函數的調用,因此說柵欄函數也會走到此方法中。
因爲咱們先找死鎖的緣由,因此在這裏就先不看下面併發的邏輯了。
DISPATCH_ALWAYS_INLINE static inline void _dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, uintptr_t dc_flags) { ✅// 串行 來到這裏 if (likely(dq->dq_width == 1)) { return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags); } if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) { DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync"); } dispatch_lane_t dl = upcast(dq)._dl; // Global concurrent queues and queues bound to non-dispatch threads // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) { return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags); } if (unlikely(dq->do_targetq->do_targetq)) { return _dispatch_sync_recurse(dl, ctxt, func, dc_flags); } _dispatch_introspection_sync_begin(dl); _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG( _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags))); } ----------------------------------------------------------------------------------------- static void _dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, uintptr_t dc_flags) { _dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags); } 複製代碼
最終,咱們來到了_dispatch_barrier_sync_f_inline
函數中。
首先執行了_dispatch_tid_self
方法。經過源碼跟蹤,咱們能夠發現其爲宏定義的方法,底層主要執行了_dispatch_thread_getspecific
。這個函數書主要是經過KeyValue
的方式來獲取線程的一些信息。在這裏就是獲取當前線程的tid
,即惟一ID。
咱們知道,形成死鎖的緣由就是串行隊列上任務的相互等待。那麼必然會經過tid
來判斷是否知足條件,從而找到了_dispatch_queue_try_acquire_barrier_sync
函數
#define _dispatch_tid_self() ((dispatch_tid)_dispatch_thread_port()) #define _dispatch_thread_port() ((mach_port_t)(uintptr_t)\ _dispatch_thread_getspecific(_PTHREAD_TSD_SLOT_MACH_THREAD_SELF)) 複製代碼
DISPATCH_ALWAYS_INLINE static inline void _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, uintptr_t dc_flags) { ✅// 獲取線程ID -- mach pthread -- dispatch_tid tid = _dispatch_tid_self(); if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) { DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync"); } dispatch_lane_t dl = upcast(dq)._dl; // The more correct thing to do would be to merge the qos of the thread // that just acquired the barrier lock into the queue state. // // However this is too expensive for the fast path, so skip doing it. // The chosen tradeoff is that if an enqueue on a lower priority thread // contends with this fast path, this thread may receive a useless override. // // Global concurrent queues and queues bound to non-dispatch threads // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE ✅// 死鎖 if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) { return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl, DC_FLAG_BARRIER | dc_flags); } if (unlikely(dl->do_targetq->do_targetq)) { return _dispatch_sync_recurse(dl, ctxt, func, DC_FLAG_BARRIER | dc_flags); } _dispatch_introspection_sync_begin(dl); _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop( dq, ctxt, func, dc_flags | DC_FLAG_BARRIER))); } 複製代碼
在函數_dispatch_queue_try_acquire_barrier_sync_and_suspend
中,從該函數咱們能夠知道,經過os_atomic_rmw_loop2o
函數回調,從OS底層獲取到了狀態信息,並返回。
DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT static inline bool _dispatch_queue_try_acquire_barrier_sync(dispatch_queue_class_t dq, uint32_t tid) { return _dispatch_queue_try_acquire_barrier_sync_and_suspend(dq._dl, tid, 0); } ----------------------------------------------------------------------------------------- DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT static inline bool _dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq, uint32_t tid, uint64_t suspend_count) { uint64_t init = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width); uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER | _dispatch_lock_value_from_tid(tid) | (suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL); uint64_t old_state, new_state; ✅// 從底層獲取信息 -- 狀態信息 - 當前隊列 - 線程 return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, { uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK; if (old_state != (init | role)) { os_atomic_rmw_loop_give_up(break); } new_state = value | role; }); } 複製代碼
那麼返回以後,就執行了_dispatch_sync_f_slow
函數。經過下圖崩潰堆棧咱們也能夠從側方面驗證。
其中經過源碼能夠發現,首先是生成了一些任務的信息,而後經過_dispatch_trace_item_push
來進行壓棧操做,從而存放在咱們的同步隊列中(FIFO),從而實現函數的執行。
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt, dispatch_function_t func, uintptr_t top_dc_flags, dispatch_queue_class_t dqu, uintptr_t dc_flags) { ... pthread_priority_t pp = _dispatch_get_priority(); struct dispatch_sync_context_s dsc = { .dc_flags = DC_FLAG_SYNC_WAITER | dc_flags, .dc_func = _dispatch_async_and_wait_invoke, .dc_ctxt = &dsc, .dc_other = top_dq, .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG, .dc_voucher = _voucher_get(), .dsc_func = func, .dsc_ctxt = ctxt, .dsc_waiter = _dispatch_tid_self(), }; _dispatch_trace_item_push(top_dq, &dsc); __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq); ... } 複製代碼
那麼產生死鎖的主要檢測就再__DISPATCH_WAIT_FOR_QUEUE__
這個函數中了,經過查看函數,發現它會獲取到隊列的狀態,看其是否爲等待狀態,而後調用_dq_state_drain_locked_by
中的異或運算,判斷隊列和線程的等待狀態,若是二者都在等待,那麼就會返回YES
,從而形成死鎖的崩潰。
static void __DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq) { // 獲取隊列的狀態,看是不是處於等待狀態 uint64_t dq_state = _dispatch_wait_prepare(dq); if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) { DISPATCH_CLIENT_CRASH((uintptr_t)dq_state, "dispatch_sync called on queue " "already owned by current thread"); } ... } ----------------------------------------------------------------------------------------- static inline bool _dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid) { // lock_value 爲隊列狀態,tid 爲線程 id // ^ (異或運算法) 兩個相同就會出現 0 不然爲1 return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0; } 複製代碼
小結一下
_dispatch_sync
首先獲取當前線程的tid
status
block
任務的執行對於同步任務的block
執行,咱們在繼續跟進以前的源碼_dispatch_sync
源碼中_dispatch_barrier_sync_f_inline
函數,觀看其函數實現,函數的執行主要是在_dispatch_client_callout
方法中。
DISPATCH_NOINLINE static void _dispatch_lane_barrier_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt, dispatch_function_t func DISPATCH_TRACE_ARG(void *dc)) { _dispatch_sync_function_invoke_inline(dq, ctxt, func); ... } ----------------------------------------------------------------------------------------- DISPATCH_ALWAYS_INLINE static inline void _dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt, dispatch_function_t func) { dispatch_thread_frame_s dtf; _dispatch_thread_frame_push(&dtf, dq); // f(ctxt) -- func(ctxt) _dispatch_client_callout(ctxt, func); _dispatch_perfmon_workitem_inc(); _dispatch_thread_frame_pop(&dtf); } 複製代碼
查看_dispatch_client_callout
方法,裏面果真有函數的調用f(ctxt);
至此,同步函數的block
調用完成
_dispatch_client_callout(void *ctxt, dispatch_function_t f) { _dispatch_get_tsd_base(); void *u = _dispatch_get_unwind_tsd(); if (likely(!u)) return f(ctxt); _dispatch_set_unwind_tsd(NULL); f(ctxt); _dispatch_free_unwind_tsd(); _dispatch_set_unwind_tsd(u); } 複製代碼
小結一下
同步函數的block調用步驟
dispatch_sync
└──_dispatch_barrier_sync_f_inline
└──_dispatch_sync_invoke_and_complete
└──_dispatch_sync_function_invoke_inline
└──_dispatch_client_callout
└──f(ctxt);
複製代碼
dispatch_async
看完了同步執行的相關源碼,下面咱們來看異步的執行就簡單多了。
查看其源碼,主要執行了兩個函數_dispatch_continuation_init
和_dispatch_continuation_async
,下面咱們一個個來看一下
void dispatch_async(dispatch_queue_t dq, dispatch_block_t work) { dispatch_continuation_t dc = _dispatch_continuation_alloc(); uintptr_t dc_flags = DC_FLAG_CONSUME; dispatch_qos_t qos; qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags); _dispatch_continuation_async(dq, dc, qos, dc->dc_flags); } 複製代碼
_dispatch_continuation_init
經過源碼咱們可知,這個函數dispatch_qos_t
這個對象,裏面的實現必然是對其進行初始化賦值的操做。
經過_dispatch_Block_invoke
的宏定義,咱們能夠發現其對傳入的dispatch_block_t
回調參數進行了封裝。
DISPATCH_ALWAYS_INLINE static inline dispatch_qos_t _dispatch_continuation_init(dispatch_continuation_t dc, dispatch_queue_class_t dqu, dispatch_block_t work, dispatch_block_flags_t flags, uintptr_t dc_flags) { void *ctxt = _dispatch_Block_copy(work); dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED; if (unlikely(_dispatch_block_has_private_data(work))) { dc->dc_flags = dc_flags; dc->dc_ctxt = ctxt; // will initialize all fields but requires dc_flags & dc_ctxt to be set return _dispatch_continuation_init_slow(dc, dqu, flags); } dispatch_function_t func = _dispatch_Block_invoke(work); if (dc_flags & DC_FLAG_CONSUME) { func = _dispatch_call_block_and_release; } return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags); } ----------------------------------------------------------------------------------------- #define _dispatch_Block_invoke(bb) \ ((dispatch_function_t)((struct Block_layout *)bb)->invoke) 複製代碼
最終的封裝會在_dispatch_continuation_init_f
中,其代碼也很是的簡單,仍舊是函數式保存的賦值的相關操做,對回調等也進行了封裝保存。
而進行封裝保存的意義也很簡單:由於異步須要在合適的時機進行線程回調block
DISPATCH_ALWAYS_INLINE static inline dispatch_qos_t _dispatch_continuation_init_f(dispatch_continuation_t dc, dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f, dispatch_block_flags_t flags, uintptr_t dc_flags) { pthread_priority_t pp = 0; dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED; dc->dc_func = f; dc->dc_ctxt = ctxt; // in this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority // should not be propagated, only taken from the handler if it has one if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) { pp = _dispatch_priority_propagate(); } _dispatch_continuation_voucher_set(dc, flags); return _dispatch_continuation_priority_set(dc, dqu, pp, flags); } 複製代碼
_dispatch_continuation_async
咱們知道了上一步對信息進行函數式封裝,那麼對於一個異步執行來講,最重要的就是什麼時候建立線程和函數執行呢,那麼就再這個方法裏面了。
查看方法,發現實現很是的簡單,可是越簡單的東西,其內裏就越複雜。這個方法主要就是執行了dx_push
方法,查看其代碼,發現爲宏定義,主要執行了dq_push
方法.
DISPATCH_ALWAYS_INLINE static inline void _dispatch_continuation_async(dispatch_queue_class_t dqu, dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags) { #if DISPATCH_INTROSPECTION if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) { _dispatch_trace_item_push(dqu, dc); } #else (void)dc_flags; #endif return dx_push(dqu._dq, dc, qos); } ----------------------------------------------------------------------------------------- #define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z) 複製代碼
那麼dq_push
又是怎麼賦值的呢,因爲其是一個屬性,因此咱們能夠搜索.dq_pus
來查看其賦值。咱們發現其賦值的地方很是多,可是大致的意思咱們能夠理解,就是主要在根隊列,自定義隊列,主隊列等等進行push
操做的時候調用。
咱們知道線程的建立通常都是在根隊列上進行建立的,因此咱們直接找根隊列的dq_push
賦值,這樣比較快速,固然其餘的也能夠,最終都會走到這裏。
咱們發現_dispatch_root_queue_push
方法最終會調用_dispatch_root_queue_push_inline
方法,而_dispatch_root_queue_push_inline
方法最終又會調用_dispatch_root_queue_poke
。
_dispatch_root_queue_poke
這個函數主要進行了一些容錯的判斷,最終走到了_dispatch_root_queue_poke_slow
相關的方法裏
void _dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou, dispatch_qos_t qos) { #if DISPATCH_USE_KEVENT_WORKQUEUE dispatch_deferred_items_t ddi = _dispatch_deferred_items_get(); if (unlikely(ddi && ddi->ddi_can_stash)) ...一些不重要的操做 ... #endif #if HAVE_PTHREAD_WORKQUEUE_QOS if (_dispatch_root_queue_push_needs_override(rq, qos)) { return _dispatch_root_queue_push_override(rq, dou, qos); } #else (void)qos; #endif _dispatch_root_queue_push_inline(rq, dou, dou, 1); } ----------------------------------------------------------------------------------------- DISPATCH_ALWAYS_INLINE static inline void _dispatch_root_queue_push_inline(dispatch_queue_global_t dq, dispatch_object_t _head, dispatch_object_t _tail, int n) { struct dispatch_object_s *hd = _head._do, *tl = _tail._do; if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) { return _dispatch_root_queue_poke(dq, n, 0); } } ----------------------------------------------------------------------------------------- void _dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor) { if (!_dispatch_queue_class_probe(dq)) { return; } #if !DISPATCH_USE_INTERNAL_WORKQUEUE #if DISPATCH_USE_PTHREAD_POOL if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)) #endif { if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, relaxed))) { _dispatch_root_queue_debug("worker thread request still pending " "for global queue: %p", dq); return; } } #endif // !DISPATCH_USE_INTERNAL_WORKQUEUE return _dispatch_root_queue_poke_slow(dq, n, floor); } 複製代碼
_dispatch_root_queue_poke_slow
這個方法就是異步執行的主要方法,建立線程也是在此,因爲代碼比較長,咱們仍是尋找代碼中的關鍵節點來說。
DISPATCH_NOINLINE static void _dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor) { ... ✅//隊列初始化,runtime強轉等操做,防止類型沒法匹配等狀況 _dispatch_root_queues_init(); _dispatch_debug_root_queue(dq, __func__); _dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n); ... int can_request, t_count; // seq_cst with atomic store to tail <rdar://problem/16932833> ✅// 獲取線程池的大小 t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered); do { ✅// 計算能夠請求的數量 can_request = t_count < floor ? 0 : t_count - floor; if (remaining > can_request) { _dispatch_root_queue_debug("pthread pool reducing request from %d to %d", remaining, can_request); os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed); remaining = can_request; } if (remaining == 0) { // 線程池無可用將會報錯 _dispatch_root_queue_debug("pthread pool is full for root queue: " "%p", dq); return; } } while (!os_atomic_cmpxchgvw2o(dq, dgq_thread_pool_size, t_count, t_count - remaining, &t_count, acquire)); pthread_attr_t *attr = &pqc->dpq_thread_attr; pthread_t tid, *pthr = &tid; #if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES if (unlikely(dq == &_dispatch_mgr_root_queue)) { pthr = _dispatch_mgr_root_queue_init(); } #endif do { _dispatch_retain(dq); // released in _dispatch_worker_thread ✅✅✅//開闢線程✅✅✅ while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) { if (r != EAGAIN) { (void)dispatch_assume_zero(r); } _dispatch_temporary_resource_shortage(); } } while (--remaining); #else (void)floor; #endif // DISPATCH_USE_PTHREAD_POOL } ----------------------------------------------------------------------------------------- #define _dispatch_trace_runtime_event(evt, ptr, value) \ _dispatch_introspection_runtime_event(\ dispatch_introspection_runtime_event_##evt, ptr, value) 複製代碼
根據代碼能夠知道,系統會獲取線程池總數量和能夠建立的數量,而後經過兩個do while
來進行動態的開闢線程。
dispatch_once
經過dispatch_once
函數查看其底層調用,能夠發現其最終調用到dispatch_once_f
方法中。相關的代碼以下。
val
一開始爲NULL
,並將其轉換爲dispatch_once_gate_t
_dispatch_once_gate_tryenter
源碼,咱們知道其在OS底層經過判斷l->dgo_once
是否爲DLOCK_ONCE_UNLOCKED
狀態_dispatch_once_callout
函數。執行對應的block,而後將l->dgo_once
置爲DLOCK_ONCE_DONE
,從而保證了只執行一次DISPATCH_NOINLINE void dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func) { // 若是你來過一次 -- 下次就不來 dispatch_once_gate_t l = (dispatch_once_gate_t)val; //DLOCK_ONCE_DONE #if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER uintptr_t v = os_atomic_load(&l->dgo_once, acquire); if (likely(v == DLOCK_ONCE_DONE)) { return; } #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER if (likely(DISPATCH_ONCE_IS_GEN(v))) { return _dispatch_once_mark_done_if_quiesced(l, v); } #endif #endif // 知足條件 -- 試圖進去 if (_dispatch_once_gate_tryenter(l)) { // 單利調用 -- v->DLOCK_ONCE_DONE return _dispatch_once_callout(l, ctxt, func); } return _dispatch_once_wait(l); } ----------------------------------------------------------------------------------------- DISPATCH_ALWAYS_INLINE static inline bool _dispatch_once_gate_tryenter(dispatch_once_gate_t l) { // os 對象是否存儲過 // unlock return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED, (uintptr_t)_dispatch_lock_value_for_self(), relaxed); } ----------------------------------------------------------------------------------------- _dispatch_once_callout(dispatch_once_gate_t l, void *ctxt, dispatch_function_t func) { // block() _dispatch_client_callout(ctxt, func); _dispatch_once_gate_broadcast(l); } 複製代碼
dispatch_semaphore
dispatch_semaphore_create
這個方法比較明確,就是函數式保存,轉換成了dispatch_semaphore_t
對象。信號量的處理都是基於此對象來進行的。
dispatch_semaphore_t dispatch_semaphore_create(long value) { dispatch_semaphore_t dsema; // 若是 value 小於 0 直接返回 0 if (value < 0) { return DISPATCH_BAD_INPUT; } dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore), sizeof(struct dispatch_semaphore_s)); dsema->do_next = DISPATCH_OBJECT_LISTLESS; dsema->do_targetq = _dispatch_get_default_queue(false); dsema->dsema_value = value; _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); dsema->dsema_orig = value; return dsema; } 複製代碼
dispatch_semaphore_wait
wait
函數主要進行了3步操做:
os_atomic_dec2o
宏。經過對這個宏的查看,咱們發現其就是一個對dsema
進行原子性的-1
操做value
是否>= 0
,若是知足條件,則不阻塞,直接執行_dispatch_semaphore_wait_slow
。經過源碼,咱們能夠發現其對timeout
的參數進行了分別的處理long dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout) { long value = os_atomic_dec2o(dsema, dsema_value, acquire); if (likely(value >= 0)) { return 0; } return _dispatch_semaphore_wait_slow(dsema, timeout); } 複製代碼
#define os_atomic_dec2o(p, f, m) \ os_atomic_sub2o(p, f, 1, m) #define os_atomic_sub2o(p, f, v, m) \ os_atomic_sub(&(p)->f, (v), m) #define os_atomic_sub(p, v, m) \ _os_atomic_c11_op((p), (v), m, sub, -) 複製代碼
_dispatch_semaphore_wait_slow
函數的處理以下:
default:
主要調用了_dispatch_sema4_timedwait
方法,這個方法主要是判斷當前的操做是否超過指定的超時時間。DISPATCH_TIME_NOW
中的while
是必定會執行的,若是不知足條件,已經在以前的操做跳出了,不會執行到此。if
操做調用os_atomic_cmpxchgvw2o
,會將value
進行+1,跳出阻塞,並返回_DSEMA4_TIMEOUT
超時DISPATCH_TIME_FOREVER
中即調用_dispatch_sema4_wait
,表示會一直阻塞,知道等到single
加1變爲0爲止,跳出阻塞DISPATCH_NOINLINE static long _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout) { long orig; _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); switch (timeout) { default: if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) { break; } // Fall through and try to undo what the fast path did to // dsema->dsema_value case DISPATCH_TIME_NOW: orig = dsema->dsema_value; while (orig < 0) { if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1, &orig, relaxed)) { return _DSEMA4_TIMEOUT(); } } // Another thread called semaphore_signal(). // Fall through and drain the wakeup. case DISPATCH_TIME_FOREVER: _dispatch_sema4_wait(&dsema->dsema_sema); break; } return 0; } 複製代碼
dispatch_semaphore_signal
瞭解了wait
以後,對signal
的理解也很簡單。os_atomic_inc2o
宏定義就是對dsema
進行原子性+1
的操做,若是大於0,則繼續執行。
long dispatch_semaphore_signal(dispatch_semaphore_t dsema) { // 取值 + 1 == 0 + 1 = 1 long value = os_atomic_inc2o(dsema, dsema_value, release); if (likely(value > 0)) { return 0; } if (unlikely(value == LONG_MIN)) { DISPATCH_CLIENT_CRASH(value, "Unbalanced call to dispatch_semaphore_signal()"); } return _dispatch_semaphore_signal_slow(dsema); } 複製代碼
總結一下信號的底層原理:
信號量在初始化時要指定 value,隨後內部將這個 value 進行函數式保存。實際操做時會存兩個 value,一個是當前的value,一個是記錄初始 value。信號的 wait 和 signal 是互逆的兩個操做,wait進行減1的操做,single進行加1的操做。初始 value 必須大於等於 0,若是爲0或者小於0 並隨後調用 wait 方法,線程將被阻塞直到別的線程調用了 signal 方法
dispatch_group
其實dispatch_group
的相關函數的底層原理和信號量的底層原理的思想是同樣的。都是在底層維護了一個value
的值,進組和出組操做時,對value
的值進行操做,達到0這個臨界值的時候,進行後續的操做。
dispatch_group_create
和信號量相似,建立組後,對其進行了函數式保存dispatch_group_t
,並經過os_atomic_store2o
宏定義,內部維護了一個value
的值
dispatch_group_t dispatch_group_create(void) { return _dispatch_group_create_with_count(0); } ----------------------------------------------------------------------------------------- DISPATCH_ALWAYS_INLINE static inline dispatch_group_t _dispatch_group_create_with_count(uint32_t n) { dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group), sizeof(struct dispatch_group_s)); dg->do_next = DISPATCH_OBJECT_LISTLESS; dg->do_targetq = _dispatch_get_default_queue(false); if (n) { os_atomic_store2o(dg, dg_bits, -n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed); os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411> } return dg; } 複製代碼
dispatch_group_enter
經過源碼,咱們能夠知道進組操做,主要是先經過os_atomic_sub_orig2o
宏定義,對bit
進行了原子性減1的操做,而後又經過位運算& DISPATCH_GROUP_VALUE_MASK
得到真正的value
void dispatch_group_enter(dispatch_group_t dg) { // The value is decremented on a 32bits wide atomic so that the carry // for the 0 -> -1 transition is not propagated to the upper 32bits. uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits, DISPATCH_GROUP_VALUE_INTERVAL, acquire); uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK; if (unlikely(old_value == 0)) { _dispatch_retain(dg); // <rdar://problem/22318411> } if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) { DISPATCH_CLIENT_CRASH(old_bits, "Too many nested calls to dispatch_group_enter()"); } } 複製代碼
dispatch_group_leave
出組的操做即經過os_atomic_add_orig2o
的對值進行原子性的加操做,並經過& DISPATCH_GROUP_VALUE_MASK
獲取到真實的value
值。若是新舊兩個值相等,則執行_dispatch_group_wake
操做,進行後續的操做。
void dispatch_group_leave(dispatch_group_t dg) { // The value is incremented on a 64bits wide atomic so that the carry for // the -1 -> 0 transition increments the generation atomically. uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state, DISPATCH_GROUP_VALUE_INTERVAL, release); uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK); if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) { old_state += DISPATCH_GROUP_VALUE_INTERVAL; do { new_state = old_state; if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) { new_state &= ~DISPATCH_GROUP_HAS_WAITERS; new_state &= ~DISPATCH_GROUP_HAS_NOTIFS; } else { // If the group was entered again since the atomic_add above, // we can't clear the waiters bit anymore as we don't know for // which generation the waiters are for new_state &= ~DISPATCH_GROUP_HAS_NOTIFS; } if (old_state == new_state) break; } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state, old_state, new_state, &old_state, relaxed))); return _dispatch_group_wake(dg, old_state, true); } if (unlikely(old_value == 0)) { DISPATCH_CLIENT_CRASH((uintptr_t)old_value, "Unbalanced call to dispatch_group_leave()"); } } 複製代碼
dispatch_group_async
dispatch_group_async
函數就是對enter
和leave
的封裝。經過代碼能夠看出其和異步調用函數相似,都對block進行的封裝保存。而後再內部執行的時候,手工調用了dispatch_group_enter
和dispatch_group_leave
方法。
void dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq, dispatch_block_t db) { dispatch_continuation_t dc = _dispatch_continuation_alloc(); uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC; dispatch_qos_t qos; // 保存任務 qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags); _dispatch_continuation_group_async(dg, dq, dc, qos); } static inline void _dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq, dispatch_continuation_t dc, dispatch_qos_t qos) { // 進組 dispatch_group_enter(dg); dc->dc_data = dg; _dispatch_continuation_async(dq, dc, qos, dc->dc_flags); } static inline void _dispatch_continuation_with_group_invoke(dispatch_continuation_t dc) { struct dispatch_object_s *dou = dc->dc_data; unsigned long type = dx_type(dou); if (type == DISPATCH_GROUP_TYPE) { _dispatch_client_callout(dc->dc_ctxt, dc->dc_func); _dispatch_trace_item_complete(dc); // 出組 dispatch_group_leave((dispatch_group_t)dou); } else { DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type"); } } 複製代碼
dispatch_group_notify
經過源碼,咱們能夠發現,經過調用os_atomic_rmw_loop2o
在系統內核中獲取到對應的狀態,最終仍是調用到了_dispatch_group_wake
DISPATCH_ALWAYS_INLINE static inline void _dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq, dispatch_continuation_t dsn) { uint64_t old_state, new_state; dispatch_continuation_t prev; dsn->dc_data = dq; _dispatch_retain(dq); prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next); if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg); os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next); if (os_mpsc_push_was_empty(prev)) { os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, { new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS; if ((uint32_t)old_state == 0) { os_atomic_rmw_loop_give_up({ return _dispatch_group_wake(dg, new_state, false); }); } }); } } 複製代碼
_dispatch_group_wake
這個函數主要分爲兩部分,首先循環調用 semaphore_signal
告知喚醒當初等待 group 的信號量,所以 dispatch_group_wait
函數得以返回。
而後獲取鏈表,依次調用 dispatch_async_f
異步執行在 notify
函數中註冊的回調。
DISPATCH_NOINLINE static void _dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release) { uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411> if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) { dispatch_continuation_t dc, next_dc, tail; // Snapshot before anything is notified/woken <rdar://problem/8554546> dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail); do { dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data; next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next); _dispatch_continuation_async(dsn_queue, dc, _dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags); _dispatch_release(dsn_queue); } while ((dc = next_dc)); refs++; } if (dg_state & DISPATCH_GROUP_HAS_WAITERS) { _dispatch_wake_by_address(&dg->dg_gen); } if (refs) _dispatch_release_n(dg, refs); } 複製代碼
dispatch_sync
將任務 block
經過 push
到隊列中,而後按照 FIFO
去執行。dispatch_sync
形成死鎖的主要緣由是堵塞的tid
和如今運行的tid
爲同一個dispatch_async
會把任務包裝並保存,以後就會開闢相應線程去執行已保存的任務。semaphore
主要在底層維護一個value
的值,使用 signal
進行 + +1
,wait
進行-1
。若是value
的值大於或者等於0,則取消阻塞,不然根據timeout
參數進行超時判斷dispatch_group
底層也是維護了一個 value
的值,等待 group
完成實際上就是等待value
恢復初始值。而notify
的做用是將全部註冊的回調組裝成一個鏈表,在 dispatch_async
完成時判斷 value
是否是恢復初始值,若是是則調用dispatch_async
異步執行全部註冊的回調。dispatch_once
經過一個靜態變量來標記 block
是否已被執行,同時使用加鎖確保只有一個線程能執行,執行完 block
後會喚醒其餘全部等待的線程。