互斥鎖是併發程序中對共享資源進行訪問控制的主要手段,Mutex是go語言提供的簡單易用的互斥鎖。Mutex的結構很簡單,暴露的方法也只有2個,一個加鎖 一個解鎖。那麼咱們天天用的Mutex互斥鎖是如何實現的呢?
其實使用的是go語言automic包中的院子操做,具體如何使用能夠參考以前寫的文章。
在Mutex中的state是狀態碼,在mutex中把state分紅4段。以下圖:segmentfault
互斥鎖的實現其實就是爭奪Locked,當goroutineA 搶到了鎖以後,第二個GoroutineB獲取鎖則會被阻塞等到GoroutineA釋放鎖以後GoroutineB將會被喚醒。固然具體實現則不會有這麼簡單,其中還有飢餓模式,自旋函數等一些概念。併發
加鎖時,若是當前Locked位爲1,說明該鎖當前由其餘協程持有,嘗試加鎖的協程並非立刻轉入阻塞,而是會持續的探測Locked位是否變爲0,這個過程即爲自旋過程。自旋時間很短,但若是在自旋過程當中發現鎖已被釋放,那麼協程能夠當即獲取鎖。此時即使有協程被喚醒也沒法獲取鎖,只能再次阻塞。
自旋的好處是,當加鎖失敗時沒必要當即轉入阻塞,有必定機會獲取到鎖,這樣能夠避免協程的切換。函數
若是自旋過程當中得到鎖,則立刻執行該goroutine。若是永遠在自旋模式中那麼以前阻塞的goroutine則很難得到鎖,這樣一來一些goroutine則會被阻塞時間過長。如何解決這個問題,go mutex中引入了兩種模式,具體請看下文。源碼分析
在普通模式下等待者以 FIFO 的順序排隊來獲取鎖,但被喚醒的等待者發現並無獲取到 mutex,而且還要與新到達的 goroutine 們競爭 mutex 的全部權。ui
在飢餓模式下,mutex 的全部權直接從對 mutex 執行解鎖的 goroutine 傳遞給等待隊列前面的等待者。新到達的 goroutine 們不要嘗試去獲取 mutex,即便它看起來是在解鎖狀態,也不要試圖自旋。this
type Mutex struct { // 狀態碼 state int32 // 信號量,用於向處於 Gwaitting 的 G 發送信號 sema uint32 } const( // 值=1 表示是否鎖住 1=鎖 0=未鎖 mutexLocked = 1 << iota // mutex is locked // 值=2 表示是否被喚醒 1=喚醒 0=未喚醒 mutexWoken // 是否爲飢渴模式(等待超過1秒則爲飢渴模式) mutexStarving // 右移3位,爲等待的數量 mutexWaiterShift = iota // 飢餓模式的時間 starvationThresholdNs = 1e6 )
func (m *Mutex) Lock() { // 利用atomic包中的cas操做判斷是否上鎖 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { // 判斷是否啓用了race檢測 if race.Enabled { race.Acquire(unsafe.Pointer(m)) } // m.state = 1 直接返回 其餘goroutine調用lock會發現已經被上鎖 return } // 等待時間 var waitStartTime int64 // 飢餓模式標誌位 starving := false // 喚醒標誌位 awoke := false // 自旋迭代的次數 iter := 0 // 保存 mutex 當前狀態 old := m.state // 循環 for { // 判斷 若是不是飢餓模式而且是否可以執行自旋函數(判斷自旋次數) // old&(0001|0100) == 0001 ==> old&0101 // 當old爲0001爲非飢餓模式 0001 == 0001 true 當old爲0101飢餓模式 0101 == 0001 false // runtime_canSpin 判斷自旋少於4次,而且是多核機器上而且GOMAXPROCS>1 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 判斷條件: // 未被喚醒 && 等待數量不爲0 && 使用CAS設置狀態爲已喚醒 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { // 設置激活爲true awoke = true } // 自旋函數 自旋次數+1 runtime_doSpin() iter++ old = m.state continue } // 若是不能執行自旋函數 記錄一個new狀態 而後判斷改變new 最終使用CAS替換嘗試設置state屬性 new := old // 當前的mutex.state處於正常模式,則將new的鎖位設置爲1 if old&mutexStarving == 0 { new |= mutexLocked } // 若是當前鎖鎖狀態爲鎖定狀態或者處於飢餓模式,則將等待的線程數量+1 if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 若是starving變量爲true而且處於鎖定狀態,則new的飢餓狀態位打開 if starving && old&mutexLocked != 0 { new |= mutexStarving } // 對於狀態的驗證 if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } // new已經判斷設置完,若是mutex的state沒有變更過的話 則替換成new if atomic.CompareAndSwapInt32(&m.state, old, new) { // 若是未被鎖定而且並非出於飢餓狀態 退出循環 goroutine獲取到鎖 if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // 若是當前的 goroutine 以前已經在排隊了,就排到隊列的前面。 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 進入休眠狀態,等待信號喚醒後從新開始循環 若是queueLifo爲true,則將等待goroutine插入到隊列的前面 runtime_SemacquireMutex(&m.sema, queueLifo) // 計算等待時間 肯定 mutex 當前所處模式 // 此時這個goroutine已經被喚醒 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // 判斷被喚醒的goroutine是否爲飢餓狀態 if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) // 當不是飢餓狀態或者等待數只有一個,則退出飢餓模式 if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } // 若是不是飢餓模式 讓新到來的 goroutine 先獲取鎖,繼續循環 awoke = true iter = 0 } else { // 若是CAS替換未能成功 則繼續循環 old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // 利用原子操做 設置state鎖位置爲0 new := atomic.AddInt32(&m.state, -mutexLocked) // 判斷狀態,給未加鎖的mutex解鎖,拋出錯誤 if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 判斷是否爲飢餓模式 if new&mutexStarving == 0 { // 正常狀態 old := new for { // 若是等待的goroutine爲零 || 已經被鎖定、喚醒、或者已經變成飢餓狀態 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 更新new的值,減去等待數量 new = (old - 1<<mutexWaiterShift) | mutexWoken // 使用CAS 替換舊值 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 若是替換成功 則恢復掛起的goroutine.r若是爲 true代表將喚醒第一個阻塞的goroutine runtime_Semrelease(&m.sema, false) return } old = m.state } } else { // 恢復掛起的goroutine.r若是爲 true代表將喚醒第一個阻塞的goroutine runtime_Semrelease(&m.sema, true) } }