深刻理解GCD之dispatch_semaphore

博客連接深刻理解GCD之dispatch_semaphore安全

再研究完dispatch_queue以後,原本是打算進入到dispath_group的源碼,可是dispath_group基本是圍繞着dispatch_semaphore即信號量實現的,因此咱們先進入到dispatch_semaphore的源碼學習。在GCD中使用dispatch_semaphore用來保證資源使用的安全性(隊列的同步執行就是依賴信號量實現)。可想而知,dispatch_semaphore的性能應該是不差的。bash

dispatch_semaphore_t

dispatch_semaphore_s是信號量的結構體。代碼以下:多線程

struct dispatch_semaphore_s {
	DISPATCH_STRUCT_HEADER(dispatch_semaphore_s, dispatch_semaphore_vtable_s);
	long dsema_value;	//當前信號量
	long dsema_orig;	//初始化信號量
	size_t dsema_sent_ksignals;
#if USE_MACH_SEM && USE_POSIX_SEM
#error "Too many supported semaphore types"
#elif USE_MACH_SEM
	semaphore_t dsema_port;
	semaphore_t dsema_waiter_port;
#elif USE_POSIX_SEM
	sem_t dsema_sem;
#else
#error "No supported semaphore type"
#endif
	size_t dsema_group_waiters;
	struct dispatch_sema_notify_s *dsema_notify_head; //notify鏈表頭部
	struct dispatch_sema_notify_s *dsema_notify_tail; //notify鏈表尾部
};

typedef mach_port_t		semaphore_t;

struct dispatch_sema_notify_s {
	struct dispatch_sema_notify_s *volatile dsn_next; //下一個信號節點
	dispatch_queue_t dsn_queue;	//操做的隊列
	void *dsn_ctxt;				//上下文
	void (*dsn_func)(void *);	//執行函數
};
複製代碼

雖然上面還有一些屬性不知道是作什麼做用的,但咱們繼續往下走。併發

dispatch_semaphore_create

dispatch_semaphore_create用於信號量的建立。app

dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
	dispatch_semaphore_t dsema;

	// If the internal value is negative, then the absolute of the value is
	// equal to the number of waiting threads. Therefore it is bogus to
	// initialize the semaphore with a negative value.
	if (value < 0) {//value必須大於等於0
		return NULL;
	}
	
	//申請dispatch_semaphore_s的內存
	dsema = calloc(1, sizeof(struct dispatch_semaphore_s));

	if (fastpath(dsema)) {
		//設置dispatch_semaphore_s 的操做函數
		dsema->do_vtable = &_dispatch_semaphore_vtable;
		//設置鏈表尾部
		dsema->do_next = DISPATCH_OBJECT_LISTLESS;
		//引用計數
		dsema->do_ref_cnt = 1;
		dsema->do_xref_cnt = 1;
		//目標隊列的設置
		dsema->do_targetq = dispatch_get_global_queue(
				DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
		//當前信號量和初始化信號的賦值
		dsema->dsema_value = value;
		dsema->dsema_orig = value;
#if USE_POSIX_SEM
		int ret = sem_init(&dsema->dsema_sem, 0, 0);
		DISPATCH_SEMAPHORE_VERIFY_RET(ret);
#endif
	}

	return dsema;
}
複製代碼

上面的源碼中dsema->do_vtable = &_dispatch_semaphore_vtable;異步

_dispatch_semaphore_vtable定義以下:async

const struct dispatch_semaphore_vtable_s _dispatch_semaphore_vtable = {
	.do_type = DISPATCH_SEMAPHORE_TYPE,
	.do_kind = "semaphore",
	.do_dispose = _dispatch_semaphore_dispose,
	.do_debug = _dispatch_semaphore_debug,
};
複製代碼

這裏有個_dispatch_semaphore_dispose函數就是信號量的銷燬函數。代碼以下:函數

static void
_dispatch_semaphore_dispose(dispatch_semaphore_t dsema)
{
	//信號量的當前值小於初始化,會發生閃退。由於信號量已經被釋放了
	if (dsema->dsema_value < dsema->dsema_orig) {
		DISPATCH_CLIENT_CRASH(
				"Semaphore/group object deallocated while in use");
	}

#if USE_MACH_SEM
	kern_return_t kr;
	//釋放信號,這個信號是dispatch_semaphore使用的信號
	if (dsema->dsema_port) {
		kr = semaphore_destroy(mach_task_self(), dsema->dsema_port);
		DISPATCH_SEMAPHORE_VERIFY_KR(kr);
	}
	//釋放信號,這個信號是dispatch_group使用的信號
	if (dsema->dsema_waiter_port) {
		kr = semaphore_destroy(mach_task_self(), dsema->dsema_waiter_port);
		DISPATCH_SEMAPHORE_VERIFY_KR(kr);
	}
#elif USE_POSIX_SEM
	int ret = sem_destroy(&dsema->dsema_sem);
	DISPATCH_SEMAPHORE_VERIFY_RET(ret);
#endif

	_dispatch_dispose(dsema);
}
複製代碼

dispatch_semaphore_wait

建立好一個信號量後就會開始進入等待信號發消息。post

long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
	//原子性減1,這裏說明dsema_value是當前信號值,並將新值賦給value
	long value = dispatch_atomic_dec2o(dsema, dsema_value);
	dispatch_atomic_acquire_barrier();
	if (fastpath(value >= 0)) {
		//說明有資源可用,直接返回0,表示等到信號量的信息了
		return 0;
	}
	//等待信號量喚醒或者timeout超時
	return _dispatch_semaphore_wait_slow(dsema, timeout);
}
複製代碼

_dispatch_semaphore_wait_slow

dispatch_semaphore_wait中,若是value小於0,就會執行_dispatch_semaphore_wait_slow等待信號量喚醒或者timeout超時。_dispatch_semaphore_wait_slow的代碼以下:性能

static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
		dispatch_time_t timeout)
{
	long orig;

again:
	// Mach semaphores appear to sometimes spuriously wake up. Therefore,
	// we keep a parallel count of the number of times a Mach semaphore is
	// signaled (6880961).
	//第一部分:
	//只要dsema->dsema_sent_ksignals不爲零就會進入循環
	//dispatch_atomic_cmpxchg2o(dsema, dsema_sent_ksignals, orig,orig - 1)的意思是
	//dsema->dsema_sent_ksignals若是等於orig,則將orig - 1賦值給dsema_sent_ksignals,
	//而且返回true,不然返回false。
	//若是返回true,說明又獲取了資源
	while ((orig = dsema->dsema_sent_ksignals)) {
		if (dispatch_atomic_cmpxchg2o(dsema, dsema_sent_ksignals, orig,
				orig - 1)) {
			return 0;
		}
	}

#if USE_MACH_SEM
	mach_timespec_t _timeout;
	kern_return_t kr;

	//第二部分:dispatch_semaphore_s中的dsema_port賦值,以懶加載的形式
	_dispatch_semaphore_create_port(&dsema->dsema_port);

	// From xnu/osfmk/kern/sync_sema.c:
	// wait_semaphore->count = -1; /* we don't keep an actual count */ // // The code above does not match the documentation, and that fact is // not surprising. The documented semantics are clumsy to use in any // practical way. The above hack effectively tricks the rest of the // Mach semaphore logic to behave like the libdispatch algorithm. //第三部分: switch (timeout) { default: //計算剩餘時間,調用mach內核的等待函數semaphore_timedwait()進行等待。 //若是在指定時間內沒有獲得通知,則會一直阻塞住,監聽dsema_port等待其通知; //當超時的時候,會執行下面的case代碼(這個default沒有break)。 do { uint64_t nsec = _dispatch_timeout(timeout); _timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC); _timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC); kr = slowpath(semaphore_timedwait(dsema->dsema_port, _timeout)); } while (kr == KERN_ABORTED); if (kr != KERN_OPERATION_TIMED_OUT) { DISPATCH_SEMAPHORE_VERIFY_KR(kr); break; } // Fall through and try to undo what the fast path did to // dsema->dsema_value case DISPATCH_TIME_NOW: //若當前信號量desma_value小於0,對其加一併返回超時信號KERN_OPERATION_TIMED_OUT。 //KERN_OPERATION_TIMED_OUT表明等待超時而返回 //因爲一開始在第一部分代碼中進行了減1操做,因此須要加1以撤銷以前的操做。 while ((orig = dsema->dsema_value) < 0) { if (dispatch_atomic_cmpxchg2o(dsema, dsema_value, orig, orig + 1)) { return KERN_OPERATION_TIMED_OUT; } } // Another thread called semaphore_signal(). // Fall through and drain the wakeup. case DISPATCH_TIME_FOREVER: //一直等待直到有信號。當有信號的時候說明dsema_value大於0,會跳轉到again,從新執行本函數的流程 do { kr = semaphore_wait(dsema->dsema_port); } while (kr == KERN_ABORTED); DISPATCH_SEMAPHORE_VERIFY_KR(kr); break; } #elif USE_POSIX_SEM //此處的代碼省略,跟上面USE_MACH_SEM代碼相似 #endif goto again; } 複製代碼

在上面的源碼還有幾個地方須要注意:

  1. 第一部分的那個while循環和if條件。在dsema_sent_ksignals非0的狀況下便會進入while循環,if的條件是dsema->dsema_sent_ksignals若是等於orig,則將orig - 1賦值給dsema_sent_ksignals,而且返回true,不然返回false。很明顯,只要能進入循環,這個條件是必定成立的,函數直接返回0,表示等到信號。而在初始化信號量的時候沒有對dsema_sent_ksignals賦值,因此就會進入以後的代碼。也就是說沒有信號量的實際通知或者遭受了系統異常通知,並不會解除等待

  2. 在上面中出現了semaphore_timedwaitsemaphore_wait。這些方法是在semaphore.h中的。因此說dispatch_semaphore是基於mach內核的信號量接口實現的。另外這兩個方法傳入的參數是dsema_portdsema_port被mach內核semaphore監聽,因此咱們理解dsema_port是dispatch_semaphore的信號。

  3. 咱們回過頭再看一下dispatch_semaphore_s結構體中的dsema_waiter_port。全局搜索一下能夠發現,這個屬性是用在dispatch_group中。以前也說了dispatch_group的實現是基於dispatch_semaphore,在dispatch_groupsemaphore_wait監聽的並非dsema_port而是dsema_waiter_port

dispatch_semaphore_wait流程以下圖所示:

dispatch_semaphore_wait

dispatch_semaphore_signal

發送信號的代碼相對等待信號來講簡單不少,它不須要阻塞,只發送喚醒。

long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
	dispatch_atomic_release_barrier();
	//原子性加1,value大於0 說明有資源當即返回
	long value = dispatch_atomic_inc2o(dsema, dsema_value);
	if (fastpath(value > 0)) {
		return 0;
	}
	if (slowpath(value == LONG_MIN)) {
		DISPATCH_CLIENT_CRASH("Unbalanced call to dispatch_semaphore_signal()");
	}
	return _dispatch_semaphore_signal_slow(dsema);
}
複製代碼

_dispatch_semaphore_signal_slow

long
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
	// Before dsema_sent_ksignals is incremented we can rely on the reference
	// held by the waiter. However, once this value is incremented the waiter
	// may return between the atomic increment and the semaphore_signal(),
	// therefore an explicit reference must be held in order to safely access
	// dsema after the atomic increment.
	_dispatch_retain(dsema);

	(void)dispatch_atomic_inc2o(dsema, dsema_sent_ksignals);

#if USE_MACH_SEM
	_dispatch_semaphore_create_port(&dsema->dsema_port);
	kern_return_t kr = semaphore_signal(dsema->dsema_port);
	DISPATCH_SEMAPHORE_VERIFY_KR(kr);
#elif USE_POSIX_SEM
	int ret = sem_post(&dsema->dsema_sem);
	DISPATCH_SEMAPHORE_VERIFY_RET(ret);
#endif

	_dispatch_release(dsema);
	return 1;
}
複製代碼

_dispatch_semaphore_signal_slow的做用就是內核的semaphore_signal函數喚醒在dispatch_semaphore_wait中等待的線程量,而後返回1。

dispatch_semaphore_signal流程以下圖所示:

dispatch_semaphore_signal

總結

  1. dispatch_semaphore是基於mach內核的信號量接口實現的

  2. 調用dispatch_semaphore_wait信號量減1,調用dispatch_semaphore_signal信號量加1

  3. wait中,信號量大於等於0表明有資源當即返回,不然等待信號量或者返回超時;在signal中,信號量大於0表明有資源當即返回,不然喚醒某個正在等待的線程

  4. dispatch_semaphore利用了兩個變量desma_valuedsema_sent_ksignals來處理waitsignal,在singnal中若是有資源,則不須要喚醒線程,那麼此時只須要使用desma_value。當須要喚醒線程的時候,發送的信號是dsema_sent_ksignals的值,此時會從新執行wait的流程,因此在wait中一開始是用dsema_sent_ksignals作判斷。

  5. 再看一下dispatch_semaphore_s結構體的變量。

struct dispatch_semaphore_s {
	DISPATCH_STRUCT_HEADER(dispatch_semaphore_s, dispatch_semaphore_vtable_s);
	long dsema_value;	//當前信號量
	long dsema_orig;	//初始化信號量
	size_t dsema_sent_ksignals; //喚醒時候的信號量
#if USE_MACH_SEM && USE_POSIX_SEM
#error "Too many supported semaphore types"
#elif USE_MACH_SEM
	semaphore_t dsema_port; //結構體使用的semaphore信號
	semaphore_t dsema_waiter_port;//dispatch_group使用的使用的semaphore信號
#elif USE_POSIX_SEM
	sem_t dsema_sem;
#else
#error "No supported semaphore type"
#endif
	size_t dsema_group_waiters;
	struct dispatch_sema_notify_s *dsema_notify_head; //notify鏈表頭部
	struct dispatch_sema_notify_s *dsema_notify_tail; //notify鏈表尾部
};
複製代碼

補充

如何控制線程併發數

方法1:使用信號量進行併發控制

dispatch_queue_t concurrentQueue = dispatch_queue_create("concurrentQueue", DISPATCH_QUEUE_CONCURRENT);
    dispatch_queue_t serialQueue = dispatch_queue_create("serialQueue",DISPATCH_QUEUE_SERIAL);
    dispatch_semaphore_t semaphore = dispatch_semaphore_create(4);
    for (NSInteger i = 0; i < 15; i++) {
        dispatch_async(serialQueue, ^{
            dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
            dispatch_async(concurrentQueue, ^{
                NSLog(@"thread:%@開始執行任務%d",[NSThread currentThread],(int)i);
                sleep(1);
                NSLog(@"thread:%@結束執行任務%d",[NSThread currentThread],(int)i);
                dispatch_semaphore_signal(semaphore);});
        });
    }
    NSLog(@"主線程...!");
複製代碼

結果

控制最大併發

方法2:YYDispatchQueuePool的實現思路

YYKit組件中的YYDispatchQueuePool也能控制併發隊列的併發數

在iOS保持界面流暢的技巧原文中提到:

其思路是爲不一樣優先級建立和 CPU 數量相同的 serial queue,每次從 pool 中獲取 queue 時,會輪詢返回其中一個 queue。我把 App 內全部異步操做,包括圖像解碼、對象釋放、異步繪製等,都按優先級不一樣放入了全局的 serial queue 中執行,這樣儘可能避免了過多線程致使的性能問題。

相關文章
相關標籤/搜索