sync.Mutex是一個不可重入的排他鎖。 這點和Java不一樣,golang裏面的排它鎖是不可重入的。golang
當一個 goroutine 得到了這個鎖的擁有權後, 其它請求鎖的 goroutine 就會阻塞在 Lock 方法的調用上,直到鎖被釋放。數據結構
sync.Mutex 由兩個字段 state 和 sema 組成。其中 state 表示當前互斥鎖的狀態,而 sema 是用於控制鎖狀態的信號量。ide
type Mutex struct { state int32 sema uint32 }
須要強調的是Mutex一旦使用以後,必定不要作copy操做。函數
Mutex的狀態機比較複雜,使用一個int32來表示:性能
const ( mutexLocked = 1 << iota // mutex is locked mutexWoken //2 mutexStarving //4 mutexWaiterShift = iota //3 ) 32 3 2 1 0 | | | | | | | | | | v-----------------------------------------------v-------------v-------------v-------------+ | | | | v | waitersCount |mutexStarving| mutexWoken | mutexLocked | | | | | | +-----------------------------------------------+-------------+-------------+-------------+
最低三位分別表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用來表示當前有多少個 Goroutine 等待互斥鎖的釋放:ui
在默認狀況下,互斥鎖的全部狀態位都是 0,int32 中的不一樣位分別表示了不一樣的狀態:this
爲了保證鎖的公平性,設計上互斥鎖有兩種狀態:正常狀態和飢餓狀態。atom
正常模式
下,全部等待鎖的goroutine按照FIFO順序等待。喚醒的goroutine不會直接擁有鎖,而是會和新請求鎖的goroutine競爭鎖的擁有。新請求鎖的goroutine具備優點:它正在CPU上執行,並且可能有好幾個,因此剛剛喚醒的goroutine有很大可能在鎖競爭中失敗。在這種狀況下,這個被喚醒的goroutine會加入到等待隊列的前面。 若是一個等待的goroutine超過1ms沒有獲取鎖,那麼它將會把鎖轉變爲飢餓模式
。設計
飢餓模式
下,鎖的全部權將從unlock的gorutine直接交給交給等待隊列中的第一個。新來的goroutine將不會嘗試去得到鎖,即便鎖看起來是unlock狀態, 也不會去嘗試自旋操做,而是放在等待隊列的尾部。code
若是一個等待的goroutine獲取了鎖,而且知足一如下其中的任何一個條件:(1)它是隊列中的最後一個;(2)它等待的時候小於1ms。它會將鎖的狀態轉換爲正常狀態。
正常狀態有很好的性能表現,飢餓模式也是很是重要的,由於它能阻止尾部延遲的現象。
func (m *Mutex) Lock() { // 若是mutex的state沒有被鎖,也沒有等待/喚醒的goroutine, 鎖處於正常狀態,那麼得到鎖,返回. // 好比鎖第一次被goroutine請求時,就是這種狀態。或者鎖處於空閒的時候,也是這種狀態。 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } // Slow path (outlined so that the fast path can be inlined) m.lockSlow() } func (m *Mutex) lockSlow() { // 標記本goroutine的等待時間 var waitStartTime int64 // 本goroutine是否已經處於飢餓狀態 starving := false // 本goroutine是否已喚醒 awoke := false // 自旋次數 iter := 0 old := m.state for { // 第一個條件:1.mutex已經被鎖了;2.不處於飢餓模式(若是時飢餓狀態,自旋時沒有用的,鎖的擁有權直接交給了等待隊列的第一個。) // 嘗試自旋的條件:參考runtime_canSpin函數 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 進入這裏確定是普通模式 // 自旋的過程當中若是發現state尚未設置woken標識,則設置它的woken標識, 並標記本身爲被喚醒。 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } // 到了這一步, state的狀態多是: // 1. 鎖尚未被釋放,鎖處於正常狀態 // 2. 鎖尚未被釋放, 鎖處於飢餓狀態 // 3. 鎖已經被釋放, 鎖處於正常狀態 // 4. 鎖已經被釋放, 鎖處於飢餓狀態 // 而且本gorutine的 awoke多是true, 也多是false (其它goutine已經設置了state的woken標識) // new 複製 state的當前狀態, 用來設置新的狀態 // old 是鎖當前的狀態 new := old // 若是old state狀態不是飢餓狀態, new state 設置鎖, 嘗試經過CAS獲取鎖, // 若是old state狀態是飢餓狀態, 則不設置new state的鎖,由於飢餓狀態下鎖直接轉給等待隊列的第一個. if old&mutexStarving == 0 { new |= mutexLocked } // 將等待隊列的等待者的數量加1 if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 若是當前goroutine已經處於飢餓狀態, 而且old state的已被加鎖, // 將new state的狀態標記爲飢餓狀態, 將鎖轉變爲飢餓狀態. if starving && old&mutexLocked != 0 { new |= mutexStarving } // 若是本goroutine已經設置爲喚醒狀態, 須要清除new state的喚醒標記, 由於本goroutine要麼得到了鎖,要麼進入休眠, // 總之state的新狀態再也不是woken狀態. if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } // 經過CAS設置new state值. // 注意new的鎖標記不必定是true, 也可能只是標記一下鎖的state是飢餓狀態. if atomic.CompareAndSwapInt32(&m.state, old, new) { // 若是old state的狀態是未被鎖狀態,而且鎖不處於飢餓狀態, // 那麼當前goroutine已經獲取了鎖的擁有權,返回 if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. // 設置並計算本goroutine的等待時間 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 既然未能獲取到鎖, 那麼就使用sleep原語阻塞本goroutine // 若是是新來的goroutine,queueLifo=false, 加入到等待隊列的尾部,耐心等待 // 若是是喚醒的goroutine, queueLifo=true, 加入到等待隊列的頭部 runtime_SemacquireMutex(&m.sema, queueLifo, 1) // sleep以後,此goroutine被喚醒 // 計算當前goroutine是否已經處於飢餓狀態. starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 獲得當前的鎖狀態 old = m.state // 若是當前的state已是飢餓狀態 // 那麼鎖應該處於Unlock狀態,那麼應該是鎖被直接交給了本goroutine if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 當前goroutine用來設置鎖,並將等待的goroutine數減1. delta := int32(mutexLocked - 1<<mutexWaiterShift) // 若是本goroutine是最後一個等待者,或者它並不處於飢餓狀態, // 那麼咱們須要把鎖的state狀態設置爲正常模式. if !starving || old>>mutexWaiterShift == 1 { // 退出飢餓模式 delta -= mutexStarving } // 設置新state, 由於已經得到了鎖,退出、返回 atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } }
整個過程比較複雜,這裏總結一下一些重點:
slowLock的獲取鎖流程有兩種模式: 飢餓模式 和 正常模式。
知足上面四個條件的goroutine才能夠作自旋。自旋就會調用sync.runtime_doSpin 和 runtime.procyield 並執行 30 次的 PAUSE 指令,該指令只會佔用 CPU 並消耗 CPU 時間。
處理了自旋相關的特殊邏輯以後,互斥鎖會根據上下文計算當前互斥鎖最新的狀態new。幾個不一樣的條件分別會更新 state 字段中存儲的不一樣信息 — mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:
計算最新的new以後,CAS更新,若是更新成功且old狀態是未被鎖狀態,而且鎖不處於飢餓狀態,就表明當前goroutine競爭成功並獲取到了鎖返回。(這也就是當前goroutine在正常模式下競爭時更容易得到鎖的緣由)
若是當前goroutine競爭失敗,會調用 sync.runtime_SemacquireMutex
使用信號量保證資源不會被兩個 Goroutine 獲取。sync.runtime_SemacquireMutex
會在方法中不斷調用嘗試獲取鎖並休眠當前 Goroutine 等待信號量的釋放,一旦當前 Goroutine 能夠獲取信號量,它就會馬上返回,sync.Mutex.Lock 方法的剩餘代碼也會繼續執行。
飢餓模式自己是爲了必定程度保證公平性而設計的模式。因此飢餓模式不會有自旋的操做,新的 Goroutine 在該狀態下不能獲取鎖、也不會進入自旋狀態,它們只會在隊列的末尾等待。
不論是正常模式仍是飢餓模式,獲取信號量,它就會從阻塞中馬上返回,並執行剩下代碼:
func (m *Mutex) Unlock() { // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. m.unlockSlow(new) } } func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { // If there are no waiters or a goroutine has already // been woken or grabbed the lock, no need to wake anyone. // In starvation mode ownership is directly handed off from unlocking // goroutine to the next waiter. We are not part of this chain, // since we did not observe mutexStarving when we unlocked the mutex above. // So get off the way. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // Grab the right to wake someone. new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { // Starving mode: handoff mutex ownership to the next waiter, and yield // our time slice so that the next waiter can start to run immediately. // Note: mutexLocked is not set, the waiter will set it after wakeup. // But mutex is still considered locked if mutexStarving is set, // so new coming goroutines won't acquire it. runtime_Semrelease(&m.sema, true, 1) } }
互斥鎖的解鎖過程 sync.Mutex.Unlock 與加鎖過程相比就很簡單,該過程會先使用 AddInt32 函數快速解鎖,這時會發生下面的兩種狀況:
sync.Mutex.unlockSlow
方法首先會校驗鎖狀態的合法性 — 若是當前互斥鎖已經被解鎖過了就會直接拋出異常 sync: unlock of unlocked mutex 停止當前程序。
在正常狀況下會根據當前互斥鎖的狀態,分別處理正常模式和飢餓模式下的互斥鎖: