筆記-源碼解析之dispatch_once、信號量、調度組

單例dispatch_once

static dispatch_once_t onceToken;
    dispatch_once(&onceToken, ^{
        
    });
複製代碼

這串代碼不用解釋,相信你們都熟悉。如今前往源碼解析bash

typedef long dispatch_once_t;app

這裏的once就是一個long類型,拿到它的指針類型傳入到函數裏。async

跟着源碼走,會進入到這裏:ide

dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
	dispatch_once_gate_t l = (dispatch_once_gate_t)val;

#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
	if (likely(v == DLOCK_ONCE_DONE)) {
		return;
	}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	if (likely(DISPATCH_ONCE_IS_GEN(v))) {
		return _dispatch_once_mark_done_if_quiesced(l, v);
	}
#endif
#endif
	if (_dispatch_once_gate_tryenter(l)) {
		return _dispatch_once_callout(l, ctxt, func);
	}
	return _dispatch_once_wait(l);
}
複製代碼

第一句代碼把咱們傳進來的指針強轉成dispatch_once_gate_t,進入這個會發現它是一個結構體,裏面只有一個聯合體。 再往下看的時候,能夠先思考一個問題,單例是如何實現只建立一次的?帶着這個問題,咱們往下走,能夠直接跳轉到最後函數

if (_dispatch_once_gate_tryenter(l)) {
		return _dispatch_once_callout(l, ctxt, func);
	}
複製代碼

從這句代碼能夠看出,這裏是建立它。一步一步來,先看條件oop

_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
	// os 對象是否存儲過
	// unlock
	return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
			(uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}
複製代碼

說實話,懵逼,代碼具體幹嗎的不清楚,查資料後,能夠理解成判斷對象是否在os存儲過。而後接着往下走源碼分析

_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_client_callout(ctxt, func);
	_dispatch_once_gate_broadcast(l);
}
複製代碼

看過我以前文章的小夥伴,看到_dispatch_client_callout()這個應該很熟悉了,這裏就是執行block裏面的內容。建立完以後,就進行下面這個廣播的函數學習

_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
	dispatch_lock value_self = _dispatch_lock_value_for_self();
	uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	v = _dispatch_once_mark_quiescing(l);
#else
	v = _dispatch_once_mark_done(l);
#endif
	if (likely((dispatch_lock)v == value_self)) return;
	_dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}
複製代碼

仔細看這段代碼的意思,第一句拿到當前self對象,而後拿到一個v,仔細去看這個v究竟是什麼!!!ui

第一個_dispatch_once_mark_quiescing表示正在建立,這裏標記了一個_dispatch_once_generation()this

_dispatch_once_mark_quiescing(dispatch_once_gate_t dgo)
{
	return os_atomic_xchg(&dgo->dgo_once, _dispatch_once_generation(), release);
}
複製代碼

而後看第二個_dispatch_once_mark_done(),這裏表示標記一個DLOCK_ONCE_DONE

_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
	return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
}
複製代碼

此時此刻,是否還記得咱們一開始跳過的方法裏的代碼,看下面代碼:

uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
	if (likely(v == DLOCK_ONCE_DONE)) {
		return;
	}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	if (likely(DISPATCH_ONCE_IS_GEN(v))) {
		return _dispatch_once_mark_done_if_quiesced(l, v);
	}
複製代碼

若是標記了獲取到的v等於DLOCK_ONCE_DONE,就直接返回;若是是下面的,咱們在進入查看一下:

_dispatch_once_mark_done_if_quiesced(dispatch_once_gate_t dgo, uintptr_t gen)
{
	if (_dispatch_once_generation() - gen >= DISPATCH_ONCE_GEN_SAFE_DELTA) {
		/*
		 * See explanation above, when the quiescing counter approach is taken
		 * then this store needs only to be relaxed as it is used as a witness
		 * that the required barriers have happened.
		 */
		os_atomic_store(&dgo->dgo_once, DLOCK_ONCE_DONE, relaxed);
	}
}
複製代碼

不知道走到這裏你們是否明白,單例是如何實現只建立一次的。

信號量dispatch_semaphore_t

// 建立信號量對象 信號量 >= 0
    dispatch_semaphore_t sem = dispatch_semaphore_create(1);
    // -1操做
    dispatch_wait(sem, DISPATCH_TIME_FOREVER);
    // +1 操做
    dispatch_semaphore_signal(sem);
複製代碼

上面三句代碼,就是建立信號量的代碼。 wait-1操做至關於阻塞操做,signal則是+1操做。

進入源碼分析:

dispatch_semaphore_create(long value)
{
	dispatch_semaphore_t dsema;
	if (value < 0) {
		return DISPATCH_BAD_INPUT;
	}

	dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
			sizeof(struct dispatch_semaphore_s));
	dsema->do_next = DISPATCH_OBJECT_LISTLESS;
	dsema->do_targetq = _dispatch_get_default_queue(false);
	dsema->dsema_value = value;
	_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
	dsema->dsema_orig = value;
	return dsema;
}
複製代碼

第一步就聲明瞭一個信號量對象,而後判斷value,小於0就是不正確操做。後面就是建立對象,開闢空間。最主要的兩步其實就是value的賦值。

接下來看一下dispatch_wait():

#define dispatch_wait(object, timeout) \
		_Generic((object), \
			dispatch_block_t:dispatch_block_wait, \
			dispatch_group_t:dispatch_group_wait, \
			dispatch_semaphore_t:dispatch_semaphore_wait \
		)((object),(timeout))
複製代碼

這裏咱們看的是信號量,因此走到dispatch_semaphore_wait方法裏:

dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
	long value = os_atomic_dec2o(dsema, dsema_value, acquire);
	if (likely(value >= 0)) {
		return 0;
	}
	return _dispatch_semaphore_wait_slow(dsema, timeout);
}
複製代碼

中間一個if條件判斷,若是value >= 0,則直接返回一個0,返回0表示堵塞,容許進來的線程爲0; 因此咱們看下os_atomic_dec2o()方法具體作了什麼:

#define os_atomic_dec2o(p, f, m) \ os_atomic_sub2o(p, f, 1, m) =======> 宏定義

#define os_atomic_sub2o(p, f, v, m) \ os_atomic_sub(&(p)->f, (v), m) ========> 宏定義

#define os_atomic_sub(p, v, m) \ _os_atomic_c11_op((p), (v), m, sub, -) ========> 宏定義
複製代碼

連續的幾個宏定義,其實後面還有,不過看到這裏已經夠了,sub1-,不知道你們有沒有看到這些敏感的字樣,實際上這一步就是進行-1操做。

再看_dispatch_semaphore_wait_slow()方法,走到這步,說明value值是小於0的

_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
		dispatch_time_t timeout)
{
	long orig;

	_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
	switch (timeout) {
	default:
		if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
			break;
		}
		// Fall through and try to undo what the fast path did to
		// dsema->dsema_value
	case DISPATCH_TIME_NOW:
		orig = dsema->dsema_value;
		while (orig < 0) {
			if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
					&orig, relaxed)) {
				return _DSEMA4_TIMEOUT();
			}
		}
		// Another thread called semaphore_signal().
		// Fall through and drain the wakeup.
	case DISPATCH_TIME_FOREVER:
		_dispatch_sema4_wait(&dsema->dsema_sema);
		break;
	}
	return 0;
}
複製代碼

咔咔咔。。。其實這麼多代碼,就是告訴咱們會一直的進行等待,它是對咱們第二個參數設置的遍歷。 咱們這裏設置的是forever,永久等待。註釋也幫咱們解答了,若是想要喚醒,那麼就調用semaphore_signal()方法。

dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
	long value = os_atomic_inc2o(dsema, dsema_value, release);
	if (likely(value > 0)) {
		return 0;
	}
	if (unlikely(value == LONG_MIN)) {
		DISPATCH_CLIENT_CRASH(value,
				"Unbalanced call to dispatch_semaphore_signal()");
	}
	return _dispatch_semaphore_signal_slow(dsema);
}
複製代碼

這裏代碼和前面的wait很類似,先看看os_atomic_inc2o()方法:

#define os_atomic_inc2o(p, f, m) \ os_atomic_add2o(p, f, 1, m)

#define os_atomic_add2o(p, f, v, m) \ os_atomic_add(&(p)->f, (v), m)

#define os_atomic_add(p, v, m) \ _os_atomic_c11_op((p), (v), m, add, +)
複製代碼

經過上面的代碼,我相信聰明的你已經猜到了os_atomic_inc2o()的意義了,沒錯,就是+1操做。

後面的方法_dispatch_semaphore_signal_slow表示持續加一的狀態,最後返回1,表示爲非阻塞狀態。

調度組dispatch_group_t

dispatch_group_create()

    dispatch_group_enter()
    dispatch_group_leave() 

    dispatch_group_async(<#dispatch_group_t _Nonnull group#>, <#dispatch_queue_t _Nonnull queue#>, ^{
    
    });

    dispatch_group_notify(, , );
複製代碼

調度組咱們圍繞這幾個方法來講,先看建立dispatch_group_create()

dispatch_group_create(void)
{
	return _dispatch_group_create_with_count(0);
}

_dispatch_group_create_with_count(uint32_t n)
{
	dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
			sizeof(struct dispatch_group_s));
	dg->do_next = DISPATCH_OBJECT_LISTLESS;
	dg->do_targetq = _dispatch_get_default_queue(false);
	if (n) {
		os_atomic_store2o(dg, dg_bits,
				-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
		os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
	}
	return dg;
}
複製代碼

能夠看出就是建立了一個dispatch_group_t類型的對象,比較簡單。

下面看dispatch_group_enter():

dispatch_group_enter(dispatch_group_t dg)
{
	// The value is decremented on a 32bits wide atomic so that the carry
	// for the 0 -> -1 transition is not propagated to the upper 32bits.
	uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
			DISPATCH_GROUP_VALUE_INTERVAL, acquire);
	uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
	if (unlikely(old_value == 0)) {
		_dispatch_retain(dg); // <rdar://problem/22318411>
	}
	if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
		DISPATCH_CLIENT_CRASH(old_bits,
				"Too many nested calls to dispatch_group_enter()");
	}
}
複製代碼

其實能夠經過註釋看到下面的操做是0 -> 1的操做,很明顯是這個方法os_atomic_sub_orig2o

#define os_atomic_sub_orig2o(p, f, v, m) \ os_atomic_sub_orig(&(p)->f, (v), m)
#define os_atomic_sub_orig(p, v, m) \ _os_atomic_c11_op_orig((p), (v), m, sub, -)
複製代碼

而後判斷,等於0直接返回,形成堵塞。

接着看dispatch_group_leave()

dispatch_group_leave(dispatch_group_t dg)
{
	// The value is incremented on a 64bits wide atomic so that the carry for
	// the -1 -> 0 transition increments the generation atomically.
	uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
			DISPATCH_GROUP_VALUE_INTERVAL, release);
	uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);

	if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
		// 省略一些無關代碼
		return _dispatch_group_wake(dg, old_state, true);
	}

	if (unlikely(old_value == 0)) {
		DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
				"Unbalanced call to dispatch_group_leave()");
	}
}
複製代碼

一樣的道理,第一步就是執行+1的操做,這裏就不重複了。最後面有個判斷,表示+1操做以後,還等於0,就會報錯,這就要求dispatch_group_enter()dispatch_group_leave()要成對出現,否則會有問題。

主要看後面的方法_dispatch_group_wake

_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
	uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>

	if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
		dispatch_continuation_t dc, next_dc, tail;

		// Snapshot before anything is notified/woken <rdar://problem/8554546>
		dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
		do {
			dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
			next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
			_dispatch_continuation_async(dsn_queue, dc,
					_dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
			_dispatch_release(dsn_queue);
		} while ((dc = next_dc));

		refs++;
	}

	if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
		_dispatch_wake_by_address(&dg->dg_gen);
	}

	if (refs) _dispatch_release_n(dg, refs);
}
複製代碼

走到這步就代表出裏調度組,執行隊列裏的全部任務,裏面有個do...while循環。

下面看另外一個方法dispatch_group_async()

dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
	dispatch_qos_t qos;

	qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
	_dispatch_continuation_group_async(dg, dq, dc, qos);
}
複製代碼

第一步_dispatch_continuation_init()操做是保存任務塊
接着看_dispatch_continuation_group_async()

_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dc, dispatch_qos_t qos)
{
	dispatch_group_enter(dg);
	dc->dc_data = dg;
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
複製代碼

走到這裏咱們看到一個熟悉的方法dispatch_group_enter()上面剛剛說過的,而方法_dispatch_continuation_async()在上一篇文章裏描述函數的部分也介紹過。不瞭解的小夥伴能夠去看看

接着看方法_dispatch_group_notify()

_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dsn)
{
	prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
	if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
	os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
	if (os_mpsc_push_was_empty(prev)) {
		os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
			new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
			if ((uint32_t)old_state == 0) {
				os_atomic_rmw_loop_give_up({
					return _dispatch_group_wake(dg, new_state, false);
				});
			}
		});
	}
}
複製代碼

其實前面的一系列判斷說什麼都不重要,最重要的是咱們看到了方法_dispatch_group_wake(),是否是又回到了dispatch_group_leave()方法
總結: 其實dispatch_group_async()_dispatch_group_notify()就是對dispatch_group_enter()dispatch_group_leave()的封裝,不過我的感受後面二者更加靈活。

上面描述若是有什麼錯誤,還但願小夥伴們指出,一塊兒學習一塊兒交流!!!

相關文章
相關標籤/搜索