go語言以併發做爲其特性之一,併發必然會帶來對於資源的競爭,這時候咱們就須要使用go提供的sync.Mutex
這把互斥鎖來保證臨界資源的訪問互斥。算法
既然常常會用這把鎖,那麼瞭解一下其內部實現,就能瞭解這把鎖適用什麼場景,特性如何了。編程
在我第一次看這段代碼的時候,感受真的是驚爲天人,特別是整個Mutex
只用到了兩個私有字段,以及一次CAS就加鎖的過程,這其中設計以及編程的理念真的讓我感受自愧不如。併發
在看sync.Mutex
的代碼的時候,必定要記住,同時會有多個goroutine會來要這把鎖,因此鎖的狀態state
是可能會一直更改的。app
先說結論:sync.Mutex
是把公平鎖。less
在源代碼中,有一段註釋:ide
// Mutex fairness. // // Mutex can be in 2 modes of operations: normal and starvation. // In normal mode waiters are queued in FIFO order, but a woken up waiter // does not own the mutex and competes with new arriving goroutines over // the ownership. New arriving goroutines have an advantage -- they are // already running on CPU and there can be lots of them, so a woken up // waiter has good chances of losing. In such case it is queued at front // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, // it switches mutex to the starvation mode. // // In starvation mode ownership of the mutex is directly handed off from // the unlocking goroutine to the waiter at the front of the queue. // New arriving goroutines don't try to acquire the mutex even if it appears // to be unlocked, and don't try to spin. Instead they queue themselves at // the tail of the wait queue. // // If a waiter receives ownership of the mutex and sees that either // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, // it switches mutex back to normal operation mode. // // Normal mode has considerably better performance as a goroutine can acquire // a mutex several times in a row even if there are blocked waiters. // Starvation mode is important to prevent pathological cases of tail latency.
看懂這段註釋對於咱們理解mutex這把鎖有很大的幫助,這裏面講了這把鎖的設計理念。大體意思以下:函數
// 公平鎖 // // 鎖有兩種模式:正常模式和飢餓模式。 // 在正常模式下,全部的等待鎖的goroutine都會存在一個先進先出的隊列中(輪流被喚醒) // 可是一個被喚醒的goroutine並非直接得到鎖,而是仍然須要和那些新請求鎖的(new arrivial) // 的goroutine競爭,而這實際上是不公平的,由於新請求鎖的goroutine有一個優點——它們正在CPU上 // 運行,而且數量可能會不少。因此一個被喚醒的goroutine拿到鎖的機率是很小的。在這種狀況下, // 這個被喚醒的goroutine會加入到隊列的頭部。若是一個等待的goroutine有超過1ms(寫死在代碼中) // 都沒獲取到鎖,那麼就會把鎖轉變爲飢餓模式。 // // 在飢餓模式中,鎖的全部權會直接從釋放鎖(unlock)的goroutine轉交給隊列頭的goroutine, // 新請求鎖的goroutine就算鎖是空閒狀態也不會去獲取鎖,而且也不會嘗試自旋。它們只是排到隊列的尾部。 // // 若是一個goroutine獲取到了鎖以後,它會判斷如下兩種狀況: // 1. 它是隊列中最後一個goroutine; // 2. 它拿到鎖所花的時間小於1ms; // 以上只要有一個成立,它就會把鎖轉變回正常模式。 // 正常模式會有比較好的性能,由於即便有不少阻塞的等待鎖的goroutine, // 一個goroutine也能夠嘗試請求屢次鎖。 // 飢餓模式對於防止尾部延遲來講很是的重要。
在下一步真正看源代碼以前,咱們必需要理解一點:當一個goroutine獲取到鎖的時候,有可能沒有競爭者,也有可能會有不少競爭者,那麼咱們就須要站在不一樣的goroutine的角度上去考慮goroutine看到的鎖的狀態和實際狀態、指望狀態之間的轉化。性能
sync.Mutex
只包含兩個字段:ui
// A Mutex is a mutual exclusion lock. // The zero value for a Mutex is an unlocked mutex. // // A Mutex must not be copied after first use. type Mutex struct { state int32 sema uint32 } const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving mutexWaiterShift = iota starvationThresholdNs = 1e6 )
其中state
是一個表示鎖的狀態的字段,這個字段會同時被多個goroutine所共用(使用atomic.CAS來保證原子性),第0個bit(1)表示鎖已被獲取,也就是已加鎖,被某個goroutine擁有;第1個bit(2)表示有goroutine被喚醒,嘗試獲取鎖;第2個bit(4)標記這把鎖是否爲飢餓狀態。atom
sema
字段就是用來喚醒goroutine所用的信號量。
在看代碼以前,咱們須要有一個概念:每一個goroutine也有本身的狀態,存在局部變量裏面(也就是函數棧裏面),goroutine有多是新到的、被喚醒的、正常的、飢餓的。
先瞻仰一下驚爲天人的一行代碼加鎖的CAS操做:
// Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } ... }
這是第一段代碼,這段代碼調用了atomic
包中的CompareAndSwapInt32
這個方法來嘗試快速獲取鎖,這個方法的簽名以下:
// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value. func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
意思是,若是addr指向的地址中存的值和old同樣,那麼就把addr中的值改成new並返回true;不然什麼都不作,返回false。因爲是atomic
中的函數,因此是保證了原子性的。
咱們來具體看看CAS的實現(src/runtime/internal/atomic/asm_amd64.s
):
// bool Cas(int32 *val, int32 old, int32 new) // Atomically: // if(*val == old){ // *val = new; // return 1; // } else // return 0; // 這裏參數及返回值大小加起來是17,是由於一個指針在amd64下是8字節, // 而後int32分別是佔用4字節,最後的返回值是bool佔用1字節,因此加起來是17 TEXT runtime∕internal∕atomic·Cas(SB),NOSPLIT,$0-17 // 爲何不把*val指針放到AX中呢?由於AX有特殊用處, // 在下面的CMPXCHGL裏面,會從AX中讀取要比較的其中一個數 MOVQ ptr+0(FP), BX // 因此AX要用來存參數old MOVL old+8(FP), AX // 把new中的數存到寄存器CX中 MOVL new+12(FP), CX // 注意這裏了,這裏使用了LOCK前綴,因此保證操做是原子的 LOCK // 0(BX) 能夠理解爲 *val // 把 AX中的數 和 第二個操做數 0(BX)——也就是BX寄存器所指向的地址中存的值 進行比較 // 若是相等,就把 第一個操做數 CX寄存器中存的值 賦給 第二個操做數 BX寄存器所指向的地址 // 並將標誌寄存器ZF設爲1 // 不然將標誌寄存器ZF清零 CMPXCHGL CX, 0(BX) // SETE的做用是: // 若是Zero Flag標誌寄存器爲1,那麼就把操做數設爲1 // 不然把操做數設爲0 // 也就是說,若是上面的比較相等了,就返回true,不然爲false // ret+16(FP)表明了返回值的地址 SETEQ ret+16(FP) RET
若是看不懂也沒太大關係,只要知道這個函數的做用,以及這個函數是原子性的便可。
那麼這段代碼的意思就是:先看看這把鎖是否是空閒狀態,若是是的話,直接原子性地修改一下state
爲已被獲取就好了。多麼簡潔(雖而後面的代碼並非……)!
接下來具體看主流程的代碼,代碼中有一些位運算看起來比較暈,我會試着用僞代碼在邊上註釋。
// Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // 用來存當前goroutine等待的時間 var waitStartTime int64 // 用來存當前goroutine是否飢餓 starving := false // 用來存當前goroutine是否已喚醒 awoke := false // 用來存當前goroutine的循環次數(想想一個goroutine若是循環了2147483648次咋辦……) iter := 0 // 複製一下當前鎖的狀態 old := m.state // 自旋 for { // 若是是飢餓狀況之下,就不要自旋了,由於鎖會直接交給隊列頭部的goroutine // 若是鎖是被獲取狀態,而且知足自旋條件(canSpin見後文分析),那麼就自旋等鎖 // 僞代碼:if isLocked() and isNotStarving() and canSpin() if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 將本身的狀態以及鎖的狀態設置爲喚醒,這樣當Unlock的時候就不會去喚醒其它被阻塞的goroutine了 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } // 進行自旋(分析見後文) runtime_doSpin() iter++ // 更新鎖的狀態(有可能在自旋的這段時間以內鎖的狀態已經被其它goroutine改變) old = m.state continue } // 當走到這一步的時候,可能會有如下的狀況: // 1. 鎖被獲取+飢餓 // 2. 鎖被獲取+正常 // 3. 鎖空閒+飢餓 // 4. 鎖空閒+正常 // goroutine的狀態多是喚醒以及非喚醒 // 複製一份當前的狀態,目的是根據當前狀態設置出指望的狀態,存在new裏面, // 而且經過CAS來比較以及更新鎖的狀態 // old用來存鎖的當前狀態 new := old // 若是說鎖不是飢餓狀態,就把指望狀態設置爲被獲取(獲取鎖) // 也就是說,若是是飢餓狀態,就不要把指望狀態設置爲被獲取 // 新到的goroutine乖乖排隊去 // 僞代碼:if isNotStarving() if old&mutexStarving == 0 { // 僞代碼:newState = locked new |= mutexLocked } // 若是鎖是被獲取狀態,或者飢餓狀態 // 就把指望狀態中的等待隊列的等待者數量+1(其實是new + 8) // (會不會可能有三億個goroutine等待拿鎖……) if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 若是說當前的goroutine是飢餓狀態,而且鎖被其它goroutine獲取 // 那麼將指望的鎖的狀態設置爲飢餓狀態 // 若是鎖是釋放狀態,那麼就不用切換了 // Unlock指望一個飢餓的鎖會有一些等待拿鎖的goroutine,而不僅是一個 // 這種狀況下不會成立 if starving && old&mutexLocked != 0 { // 指望狀態設置爲飢餓狀態 new |= mutexStarving } // 若是說當前goroutine是被喚醒狀態,咱們須要reset這個狀態 // 由於goroutine要麼是拿到鎖了,要麼是進入sleep了 if awoke { // 若是說指望狀態不是woken狀態,那麼確定出問題了 // 這裏看不懂不要緊,wake的邏輯在下面 if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } // 這句就是把new設置爲非喚醒狀態 // &^的意思是and not new &^= mutexWoken } // 經過CAS來嘗試設置鎖的狀態 // 這裏多是設置鎖,也有多是隻設置爲飢餓狀態和等待數量 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 若是說old狀態不是飢餓狀態也不是被獲取狀態 // 那麼表明當前goroutine已經經過CAS成功獲取了鎖 // (能進入這個代碼塊表示狀態已改變,也就是說狀態是從空閒到被獲取) if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // 若是以前已經等待過了,那麼就要放到隊列頭 queueLifo := waitStartTime != 0 // 若是說以前沒有等待過,就初始化設置如今的等待時間 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 既然獲取鎖失敗了,就使用sleep原語來阻塞當前goroutine // 經過信號量來排隊獲取鎖 // 若是是新來的goroutine,就放到隊列尾部 // 若是是被喚醒的等待鎖的goroutine,就放到隊列頭部 runtime_SemacquireMutex(&m.sema, queueLifo) // 這裏sleep完了,被喚醒 // 若是當前goroutine已是飢餓狀態了 // 或者當前goroutine已經等待了1ms(在上面定義常量)以上 // 就把當前goroutine的狀態設置爲飢餓 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 再次獲取一下鎖如今的狀態 old = m.state // 若是說鎖如今是飢餓狀態,就表明如今鎖是被釋放的狀態,當前goroutine是被信號量所喚醒的 // 也就是說,鎖被直接交給了當前goroutine if old&mutexStarving != 0 { // 若是說當前鎖的狀態是被喚醒狀態或者被獲取狀態,或者說等待的隊列爲空 // 那麼是不可能的,確定是出問題了,由於當前狀態確定應該有等待的隊列,鎖也必定是被釋放狀態且未喚醒 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 當前的goroutine得到了鎖,那麼就把等待隊列-1 delta := int32(mutexLocked - 1<<mutexWaiterShift) // 若是當前goroutine非飢餓狀態,或者說當前goroutine是隊列中最後一個goroutine // 那麼就退出飢餓模式,把狀態設置爲正常 if !starving || old>>mutexWaiterShift == 1 { // Exit starvation mode. // Critical to do it here and consider wait time. // Starvation mode is so inefficient, that two goroutines // can go lock-step infinitely once they switch mutex // to starvation mode. delta -= mutexStarving } // 原子性地加上改動的狀態 atomic.AddInt32(&m.state, delta) break } // 若是鎖不是飢餓模式,就把當前的goroutine設爲被喚醒 // 而且重置iter(重置spin) awoke = true iter = 0 } else { // 若是CAS不成功,也就是說沒能成功得到鎖,鎖被別的goroutine得到了或者鎖一直沒被釋放 // 那麼就更新狀態,從新開始循環嘗試拿鎖 old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
以上爲何CAS能拿到鎖呢?由於CAS會原子性地判斷old state
和當前鎖的狀態是否一致;而總有一個goroutine會知足以上條件成功拿鎖。
接下來咱們來看看上文提到的canSpin
條件如何:
// Active spinning for sync.Mutex. //go:linkname sync_runtime_canSpin sync.runtime_canSpin //go:nosplit func sync_runtime_canSpin(i int) bool { // 這裏的active_spin是個常量,值爲4 // 簡單來講,sync.Mutex是有可能被多個goroutine競爭的,因此不該該大量自旋(消耗CPU) // 自旋的條件以下: // 1. 自旋次數小於active_spin(這裏是4)次; // 2. 在多核機器上; // 3. GOMAXPROCS > 1而且至少有一個其它的處於運行狀態的P; // 4. 當前P沒有其它等待運行的G; // 知足以上四個條件才能夠進行自旋。 if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }
因此能夠看出來,並非一直無限自旋下去的,當自旋次數到達4次或者其它條件不符合的時候,就改成信號量拿鎖了。
而後咱們來看看doSpin
的實現(其實也沒啥好看的):
//go:linkname sync_runtime_doSpin sync.runtime_doSpin //go:nosplit func sync_runtime_doSpin() { procyield(active_spin_cnt) }
這是一個彙編實現的函數,簡單看兩眼amd64上的實現:
TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AX again: PAUSE SUBL $1, AX JNZ again RET
看起來沒啥好看的,直接跳過吧。
接下來咱們來看看Unlock的實現,對於Unlock來講,有兩個比較關鍵的特性:
func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // Fast path: drop lock bit. // 這裏獲取到鎖的狀態,而後將狀態減去被獲取的狀態(也就是解鎖),稱爲new(指望)狀態 // 注意以上兩個操做是原子的,因此不用擔憂多個goroutine併發的問題 new := atomic.AddInt32(&m.state, -mutexLocked) // 若是說,指望狀態加上被獲取的狀態,不是被獲取的話 // 那麼就panic // 在這裏給你們提一個問題:幹嗎要這麼大費周章先減去再加上,直接比較一下原來鎖的狀態是否被獲取不就完事了? if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 若是說new狀態(也就是鎖的狀態)不是飢餓狀態 if new&mutexStarving == 0 { // 複製一下原先狀態 old := new for { // 若是說鎖沒有等待拿鎖的goroutine // 或者鎖被獲取了(在循環的過程當中被其它goroutine獲取了) // 或者鎖是被喚醒狀態(表示有goroutine被喚醒,不須要再去嘗試喚醒其它goroutine) // 或者鎖是飢餓模式(會直接轉交給隊列頭的goroutine) // 那麼就直接返回,啥都不用作了 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 走到這一步的時候,說明鎖目前仍是空閒狀態,而且沒有goroutine被喚醒且隊列中有goroutine等待拿鎖 // 那麼咱們就要把鎖的狀態設置爲被喚醒,等待隊列-1 new = (old - 1<<mutexWaiterShift) | mutexWoken // 又是熟悉的CAS if atomic.CompareAndSwapInt32(&m.state, old, new) { // 若是狀態設置成功了,咱們就經過信號量去喚醒goroutine runtime_Semrelease(&m.sema, false) return } // 循環結束的時候,更新一下狀態,由於有可能在執行的過程當中,狀態被修改了(好比被Lock改成了飢餓狀態) old = m.state } } else { // 若是是飢餓狀態下,那麼咱們就直接把鎖的全部權經過信號量移交給隊列頭的goroutine就行了 // handoff = true表示直接把鎖交給隊列頭部的goroutine // 注意:在這個時候,鎖被獲取的狀態沒有被設置,會由被喚醒的goroutine在喚醒後設置 // 可是當鎖處於飢餓狀態的時候,咱們也認爲鎖是被獲取的(由於咱們手動指定了獲取的goroutine) // 因此說新來的goroutine不會嘗試去獲取鎖(在Lock中有體現) runtime_Semrelease(&m.sema, true) } }
根據以上代碼的分析,能夠看出,sync.Mutex
這把鎖在你的工做負載(所需時間)比較低,好比只是對某個關鍵變量賦值的時候,性能仍是比較好的,可是若是說對於臨界資源的操做耗時很長(特別是單個操做就大於1ms)的話,實際上性能上會有必定的問題,這也就是咱們常常看到「的鎖一直處於飢餓狀態」的問題,對於這種狀況,可能就須要另尋他法了。
好了,至此整個sync.Mutex
的分析就此結束了,雖然只有短短200行代碼(包括150行註釋,實際代碼估計就50行),可是其中的算法、設計的思想、編程的理念倒是值得感悟,所謂大道至簡、少便是多可能就是如此吧。