【golang】sync.Mutex互斥鎖的實現原理

sync.Mutex是一個不可重入的排他鎖。 這點和Java不一樣,golang裏面的排它鎖是不可重入的。golang

當一個 goroutine 得到了這個鎖的擁有權後, 其它請求鎖的 goroutine 就會阻塞在 Lock 方法的調用上,直到鎖被釋放。數據結構

數據結構與狀態機

sync.Mutex 由兩個字段 state 和 sema 組成。其中 state 表示當前互斥鎖的狀態,而 sema 是用於控制鎖狀態的信號量。ide

type Mutex struct {
    state int32
    sema  uint32
}

須要強調的是Mutex一旦使用以後,必定不要作copy操做。函數

Mutex的狀態機比較複雜,使用一個int32來表示:性能

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken  //2
    mutexStarving //4
    mutexWaiterShift = iota //3
)
                                                                                             
32                                               3             2             1             0 
 |                                               |             |             |             | 
 |                                               |             |             |             | 
 v-----------------------------------------------v-------------v-------------v-------------+ 
 |                                               |             |             |             v 
 |                 waitersCount                  |mutexStarving| mutexWoken  | mutexLocked | 
 |                                               |             |             |             | 
 +-----------------------------------------------+-------------+-------------+-------------+

最低三位分別表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用來表示當前有多少個 Goroutine 等待互斥鎖的釋放:ui

在默認狀況下,互斥鎖的全部狀態位都是 0,int32 中的不一樣位分別表示了不一樣的狀態:this

  • mutexLocked — 表示互斥鎖的鎖定狀態;
  • mutexWoken — 表示從正常模式被從喚醒;
  • mutexStarving — 當前的互斥鎖進入飢餓狀態;
  • waitersCount — 當前互斥鎖上等待的 goroutine 個數;

爲了保證鎖的公平性,設計上互斥鎖有兩種狀態:正常狀態和飢餓狀態。atom

正常模式下,全部等待鎖的goroutine按照FIFO順序等待。喚醒的goroutine不會直接擁有鎖,而是會和新請求鎖的goroutine競爭鎖的擁有。新請求鎖的goroutine具備優點:它正在CPU上執行,並且可能有好幾個,因此剛剛喚醒的goroutine有很大可能在鎖競爭中失敗。在這種狀況下,這個被喚醒的goroutine會加入到等待隊列的前面。 若是一個等待的goroutine超過1ms沒有獲取鎖,那麼它將會把鎖轉變爲飢餓模式設計

飢餓模式下,鎖的全部權將從unlock的gorutine直接交給交給等待隊列中的第一個。新來的goroutine將不會嘗試去得到鎖,即便鎖看起來是unlock狀態, 也不會去嘗試自旋操做,而是放在等待隊列的尾部。code

若是一個等待的goroutine獲取了鎖,而且知足一如下其中的任何一個條件:(1)它是隊列中的最後一個;(2)它等待的時候小於1ms。它會將鎖的狀態轉換爲正常狀態。

正常狀態有很好的性能表現,飢餓模式也是很是重要的,由於它能阻止尾部延遲的現象。

Lock

func (m *Mutex) Lock() {
    // 若是mutex的state沒有被鎖,也沒有等待/喚醒的goroutine, 鎖處於正常狀態,那麼得到鎖,返回.
    // 好比鎖第一次被goroutine請求時,就是這種狀態。或者鎖處於空閒的時候,也是這種狀態。
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}

func (m *Mutex) lockSlow() {
    // 標記本goroutine的等待時間
    var waitStartTime int64
    // 本goroutine是否已經處於飢餓狀態
    starving := false
    // 本goroutine是否已喚醒
    awoke := false
    // 自旋次數
    iter := 0
    old := m.state
    for {
        // 第一個條件:1.mutex已經被鎖了;2.不處於飢餓模式(若是時飢餓狀態,自旋時沒有用的,鎖的擁有權直接交給了等待隊列的第一個。)
        // 嘗試自旋的條件:參考runtime_canSpin函數
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // 進入這裏確定是普通模式
            // 自旋的過程當中若是發現state尚未設置woken標識,則設置它的woken標識, 並標記本身爲被喚醒。
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        
        // 到了這一步, state的狀態多是:
        // 1. 鎖尚未被釋放,鎖處於正常狀態
        // 2. 鎖尚未被釋放, 鎖處於飢餓狀態
        // 3. 鎖已經被釋放, 鎖處於正常狀態
        // 4. 鎖已經被釋放, 鎖處於飢餓狀態
        // 而且本gorutine的 awoke多是true, 也多是false (其它goutine已經設置了state的woken標識)
        
        // new 複製 state的當前狀態, 用來設置新的狀態
        // old 是鎖當前的狀態
        new := old
        
        // 若是old state狀態不是飢餓狀態, new state 設置鎖, 嘗試經過CAS獲取鎖,
        // 若是old state狀態是飢餓狀態, 則不設置new state的鎖,由於飢餓狀態下鎖直接轉給等待隊列的第一個.
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        // 將等待隊列的等待者的數量加1
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        
        // 若是當前goroutine已經處於飢餓狀態, 而且old state的已被加鎖,
        // 將new state的狀態標記爲飢餓狀態, 將鎖轉變爲飢餓狀態.
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        
         // 若是本goroutine已經設置爲喚醒狀態, 須要清除new state的喚醒標記, 由於本goroutine要麼得到了鎖,要麼進入休眠,
        // 總之state的新狀態再也不是woken狀態.
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }

        // 經過CAS設置new state值.
        // 注意new的鎖標記不必定是true, 也可能只是標記一下鎖的state是飢餓狀態.
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            
            // 若是old state的狀態是未被鎖狀態,而且鎖不處於飢餓狀態,
            // 那麼當前goroutine已經獲取了鎖的擁有權,返回
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            // 設置並計算本goroutine的等待時間
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // 既然未能獲取到鎖, 那麼就使用sleep原語阻塞本goroutine
            // 若是是新來的goroutine,queueLifo=false, 加入到等待隊列的尾部,耐心等待
            // 若是是喚醒的goroutine, queueLifo=true, 加入到等待隊列的頭部
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)

            // sleep以後,此goroutine被喚醒
            // 計算當前goroutine是否已經處於飢餓狀態.
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            // 獲得當前的鎖狀態
            old = m.state

            // 若是當前的state已是飢餓狀態
            // 那麼鎖應該處於Unlock狀態,那麼應該是鎖被直接交給了本goroutine
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // 當前goroutine用來設置鎖,並將等待的goroutine數減1.
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                // 若是本goroutine是最後一個等待者,或者它並不處於飢餓狀態,
                // 那麼咱們須要把鎖的state狀態設置爲正常模式.
                if !starving || old>>mutexWaiterShift == 1 {
                     // 退出飢餓模式
                    delta -= mutexStarving
                }
                // 設置新state, 由於已經得到了鎖,退出、返回
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
}

整個過程比較複雜,這裏總結一下一些重點:

  1. 若是鎖處於初始狀態(unlock, 正常模式),則經過CAS(0 -> Locked)獲取鎖;若是獲取失敗,那麼就進入slowLock的流程:

slowLock的獲取鎖流程有兩種模式: 飢餓模式 和 正常模式。

(1)正常模式

  1. mutex已經被locked了,處於正常模式下;
  2. 前 Goroutine 爲了獲取該鎖進入自旋的次數小於四次;
  3. 當前機器CPU核數大於1;
  4. 當前機器上至少存在一個正在運行的處理器 P 而且處理的運行隊列爲空;

知足上面四個條件的goroutine才能夠作自旋。自旋就會調用sync.runtime_doSpin 和 runtime.procyield 並執行 30 次的 PAUSE 指令,該指令只會佔用 CPU 並消耗 CPU 時間。

處理了自旋相關的特殊邏輯以後,互斥鎖會根據上下文計算當前互斥鎖最新的狀態new。幾個不一樣的條件分別會更新 state 字段中存儲的不一樣信息 — mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:

計算最新的new以後,CAS更新,若是更新成功且old狀態是未被鎖狀態,而且鎖不處於飢餓狀態,就表明當前goroutine競爭成功並獲取到了鎖返回。(這也就是當前goroutine在正常模式下競爭時更容易得到鎖的緣由)

若是當前goroutine競爭失敗,會調用 sync.runtime_SemacquireMutex 使用信號量保證資源不會被兩個 Goroutine 獲取。sync.runtime_SemacquireMutex 會在方法中不斷調用嘗試獲取鎖並休眠當前 Goroutine 等待信號量的釋放,一旦當前 Goroutine 能夠獲取信號量,它就會馬上返回,sync.Mutex.Lock 方法的剩餘代碼也會繼續執行。

(2) 飢餓模式

飢餓模式自己是爲了必定程度保證公平性而設計的模式。因此飢餓模式不會有自旋的操做,新的 Goroutine 在該狀態下不能獲取鎖、也不會進入自旋狀態,它們只會在隊列的末尾等待。

不論是正常模式仍是飢餓模式,獲取信號量,它就會從阻塞中馬上返回,並執行剩下代碼:

  1. 在正常模式下,這段代碼會設置喚醒和飢餓標記、重置迭代次數並從新執行獲取鎖的循環;
  2. 在飢餓模式下,當前 Goroutine 會得到互斥鎖,若是等待隊列中只存在當前 Goroutine,互斥鎖還會從飢餓模式中退出;

Unlock

func (m *Mutex) Unlock() {
    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)
    }
}

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            // If there are no waiters or a goroutine has already
            // been woken or grabbed the lock, no need to wake anyone.
            // In starvation mode ownership is directly handed off from unlocking
            // goroutine to the next waiter. We are not part of this chain,
            // since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // Grab the right to wake someone.
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        // Starving mode: handoff mutex ownership to the next waiter, and yield
        // our time slice so that the next waiter can start to run immediately.
        // Note: mutexLocked is not set, the waiter will set it after wakeup.
        // But mutex is still considered locked if mutexStarving is set,
        // so new coming goroutines won't acquire it.
        runtime_Semrelease(&m.sema, true, 1)
    }
}

互斥鎖的解鎖過程 sync.Mutex.Unlock 與加鎖過程相比就很簡單,該過程會先使用 AddInt32 函數快速解鎖,這時會發生下面的兩種狀況:

  1. 若是該函數返回的新狀態等於 0,當前 Goroutine 就成功解鎖了互斥鎖;
  2. 若是該函數返回的新狀態不等於 0,這段代碼會調用 sync.Mutex.unlockSlow 方法開始慢速解鎖:

sync.Mutex.unlockSlow 方法首先會校驗鎖狀態的合法性 — 若是當前互斥鎖已經被解鎖過了就會直接拋出異常 sync: unlock of unlocked mutex 停止當前程序。

在正常狀況下會根據當前互斥鎖的狀態,分別處理正常模式和飢餓模式下的互斥鎖:

  • 在正常模式下,這段代碼會分別處理如下兩種狀況處理;
  1. 若是互斥鎖不存在等待者或者互斥鎖的 mutexLocked、mutexStarving、mutexWoken 狀態不都爲 0,那麼當前方法就能夠直接返回,不須要喚醒其餘等待者;
  2. 若是互斥鎖存在等待者,會經過 sync.runtime_Semrelease 喚醒等待者並移交鎖的全部權;
  • 在飢餓模式下,上述代碼會直接調用 sync.runtime_Semrelease 方法將當前鎖交給下一個正在嘗試獲取鎖的等待者,等待者被喚醒後會獲得鎖,在這時互斥鎖還不會退出飢餓狀態;
相關文章
相關標籤/搜索