源碼剖析golang中sync.Mutex

go語言以併發做爲其特性之一,併發必然會帶來對於資源的競爭,這時候咱們就須要使用go提供的sync.Mutex這把互斥鎖來保證臨界資源的訪問互斥。算法

既然常常會用這把鎖,那麼瞭解一下其內部實現,就能瞭解這把鎖適用什麼場景,特性如何了。編程

引子

在我第一次看這段代碼的時候,感受真的是驚爲天人,特別是整個Mutex只用到了兩個私有字段,以及一次CAS就加鎖的過程,這其中設計以及編程的理念真的讓我感受自愧不如。併發

在看sync.Mutex的代碼的時候,必定要記住,同時會有多個goroutine會來要這把鎖,因此鎖的狀態state是可能會一直更改的。app

鎖的性質

先說結論:sync.Mutex是把公平鎖。less

在源代碼中,有一段註釋:ide

// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.

看懂這段註釋對於咱們理解mutex這把鎖有很大的幫助,這裏面講了這把鎖的設計理念。大體意思以下:函數

// 公平鎖
//
// 鎖有兩種模式:正常模式和飢餓模式。
// 在正常模式下,全部的等待鎖的goroutine都會存在一個先進先出的隊列中(輪流被喚醒)
// 可是一個被喚醒的goroutine並非直接得到鎖,而是仍然須要和那些新請求鎖的(new arrivial)
// 的goroutine競爭,而這實際上是不公平的,由於新請求鎖的goroutine有一個優點——它們正在CPU上
// 運行,而且數量可能會不少。因此一個被喚醒的goroutine拿到鎖的機率是很小的。在這種狀況下,
// 這個被喚醒的goroutine會加入到隊列的頭部。若是一個等待的goroutine有超過1ms(寫死在代碼中)
// 都沒獲取到鎖,那麼就會把鎖轉變爲飢餓模式。
//
// 在飢餓模式中,鎖的全部權會直接從釋放鎖(unlock)的goroutine轉交給隊列頭的goroutine,
// 新請求鎖的goroutine就算鎖是空閒狀態也不會去獲取鎖,而且也不會嘗試自旋。它們只是排到隊列的尾部。
//
// 若是一個goroutine獲取到了鎖以後,它會判斷如下兩種狀況:
// 1. 它是隊列中最後一個goroutine;
// 2. 它拿到鎖所花的時間小於1ms;
// 以上只要有一個成立,它就會把鎖轉變回正常模式。

// 正常模式會有比較好的性能,由於即便有不少阻塞的等待鎖的goroutine,
// 一個goroutine也能夠嘗試請求屢次鎖。
// 飢餓模式對於防止尾部延遲來講很是的重要。

在下一步真正看源代碼以前,咱們必需要理解一點:當一個goroutine獲取到鎖的時候,有可能沒有競爭者,也有可能會有不少競爭者,那麼咱們就須要站在不一樣的goroutine的角度上去考慮goroutine看到的鎖的狀態和實際狀態、指望狀態之間的轉化。性能

字段定義

sync.Mutex只包含兩個字段:ui

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
    state int32
    sema    uint32
}

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota

    starvationThresholdNs = 1e6
)

其中state是一個表示鎖的狀態的字段,這個字段會同時被多個goroutine所共用(使用atomic.CAS來保證原子性),第0個bit(1)表示鎖已被獲取,也就是已加鎖,被某個goroutine擁有;第1個bit(2)表示有goroutine被喚醒,嘗試獲取鎖;第2個bit(4)標記這把鎖是否爲飢餓狀態。atom

sema字段就是用來喚醒goroutine所用的信號量。

Lock

在看代碼以前,咱們須要有一個概念:每一個goroutine也有本身的狀態,存在局部變量裏面(也就是函數棧裏面),goroutine有多是新到的、被喚醒的、正常的、飢餓的。

atomic.CAS

先瞻仰一下驚爲天人的一行代碼加鎖的CAS操做:

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    ...
}

這是第一段代碼,這段代碼調用了atomic包中的CompareAndSwapInt32這個方法來嘗試快速獲取鎖,這個方法的簽名以下:

// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value.
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

意思是,若是addr指向的地址中存的值和old同樣,那麼就把addr中的值改成new並返回true;不然什麼都不作,返回false。因爲是atomic中的函數,因此是保證了原子性的。

咱們來具體看看CAS的實現(src/runtime/internal/atomic/asm_amd64.s):

// bool Cas(int32 *val, int32 old, int32 new)
// Atomically:
//    if(*val == old){
//        *val = new;
//        return 1;
//    } else
//        return 0;
// 這裏參數及返回值大小加起來是17,是由於一個指針在amd64下是8字節,
// 而後int32分別是佔用4字節,最後的返回值是bool佔用1字節,因此加起來是17
TEXT runtime∕internal∕atomic·Cas(SB),NOSPLIT,$0-17 
    // 爲何不把*val指針放到AX中呢?由於AX有特殊用處,
    // 在下面的CMPXCHGL裏面,會從AX中讀取要比較的其中一個數
    MOVQ    ptr+0(FP), BX
    // 因此AX要用來存參數old
    MOVL    old+8(FP), AX
    // 把new中的數存到寄存器CX中
    MOVL    new+12(FP), CX
    // 注意這裏了,這裏使用了LOCK前綴,因此保證操做是原子的
    LOCK
    // 0(BX) 能夠理解爲 *val
    // 把 AX中的數 和 第二個操做數 0(BX)——也就是BX寄存器所指向的地址中存的值 進行比較
    // 若是相等,就把 第一個操做數 CX寄存器中存的值 賦給 第二個操做數 BX寄存器所指向的地址
    // 並將標誌寄存器ZF設爲1
    // 不然將標誌寄存器ZF清零
    CMPXCHGL    CX, 0(BX)
    // SETE的做用是:
    // 若是Zero Flag標誌寄存器爲1,那麼就把操做數設爲1
    // 不然把操做數設爲0
    // 也就是說,若是上面的比較相等了,就返回true,不然爲false
    // ret+16(FP)表明了返回值的地址
    SETEQ    ret+16(FP)
    RET

若是看不懂也沒太大關係,只要知道這個函數的做用,以及這個函數是原子性的便可。

那麼這段代碼的意思就是:先看看這把鎖是否是空閒狀態,若是是的話,直接原子性地修改一下state爲已被獲取就好了。多麼簡潔(雖而後面的代碼並非……)!

主流程

接下來具體看主流程的代碼,代碼中有一些位運算看起來比較暈,我會試着用僞代碼在邊上註釋。

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }

    // 用來存當前goroutine等待的時間
    var waitStartTime int64
    // 用來存當前goroutine是否飢餓
    starving := false
    // 用來存當前goroutine是否已喚醒
    awoke := false
    // 用來存當前goroutine的循環次數(想想一個goroutine若是循環了2147483648次咋辦……)
    iter := 0
    // 複製一下當前鎖的狀態
    old := m.state
    // 自旋
    for {
        // 若是是飢餓狀況之下,就不要自旋了,由於鎖會直接交給隊列頭部的goroutine
        // 若是鎖是被獲取狀態,而且知足自旋條件(canSpin見後文分析),那麼就自旋等鎖
        // 僞代碼:if isLocked() and isNotStarving() and canSpin()
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // 將本身的狀態以及鎖的狀態設置爲喚醒,這樣當Unlock的時候就不會去喚醒其它被阻塞的goroutine了
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            // 進行自旋(分析見後文)
            runtime_doSpin()
            iter++
            // 更新鎖的狀態(有可能在自旋的這段時間以內鎖的狀態已經被其它goroutine改變)
            old = m.state
            continue
        }
        
        // 當走到這一步的時候,可能會有如下的狀況:
        // 1. 鎖被獲取+飢餓
        // 2. 鎖被獲取+正常
        // 3. 鎖空閒+飢餓
        // 4. 鎖空閒+正常
        
        // goroutine的狀態多是喚醒以及非喚醒
        
        // 複製一份當前的狀態,目的是根據當前狀態設置出指望的狀態,存在new裏面,
        // 而且經過CAS來比較以及更新鎖的狀態
        // old用來存鎖的當前狀態
        new := old

        // 若是說鎖不是飢餓狀態,就把指望狀態設置爲被獲取(獲取鎖)
        // 也就是說,若是是飢餓狀態,就不要把指望狀態設置爲被獲取
        // 新到的goroutine乖乖排隊去
        // 僞代碼:if isNotStarving()
        if old&mutexStarving == 0 {
            // 僞代碼:newState = locked
            new |= mutexLocked
        }
        // 若是鎖是被獲取狀態,或者飢餓狀態
        // 就把指望狀態中的等待隊列的等待者數量+1(其實是new + 8)
        // (會不會可能有三億個goroutine等待拿鎖……)
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        // 若是說當前的goroutine是飢餓狀態,而且鎖被其它goroutine獲取
        // 那麼將指望的鎖的狀態設置爲飢餓狀態
        // 若是鎖是釋放狀態,那麼就不用切換了
        // Unlock指望一個飢餓的鎖會有一些等待拿鎖的goroutine,而不僅是一個
        // 這種狀況下不會成立
        if starving && old&mutexLocked != 0 {
            // 指望狀態設置爲飢餓狀態
            new |= mutexStarving
        }
        // 若是說當前goroutine是被喚醒狀態,咱們須要reset這個狀態
        // 由於goroutine要麼是拿到鎖了,要麼是進入sleep了
        if awoke {
            // 若是說指望狀態不是woken狀態,那麼確定出問題了
            // 這裏看不懂不要緊,wake的邏輯在下面
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            // 這句就是把new設置爲非喚醒狀態
            // &^的意思是and not
            new &^= mutexWoken
        }
        // 經過CAS來嘗試設置鎖的狀態
        // 這裏多是設置鎖,也有多是隻設置爲飢餓狀態和等待數量
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 若是說old狀態不是飢餓狀態也不是被獲取狀態
            // 那麼表明當前goroutine已經經過CAS成功獲取了鎖
            // (能進入這個代碼塊表示狀態已改變,也就是說狀態是從空閒到被獲取)
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // 若是以前已經等待過了,那麼就要放到隊列頭
            queueLifo := waitStartTime != 0
            // 若是說以前沒有等待過,就初始化設置如今的等待時間
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // 既然獲取鎖失敗了,就使用sleep原語來阻塞當前goroutine
            // 經過信號量來排隊獲取鎖
            // 若是是新來的goroutine,就放到隊列尾部
            // 若是是被喚醒的等待鎖的goroutine,就放到隊列頭部
            runtime_SemacquireMutex(&m.sema, queueLifo)
            
            // 這裏sleep完了,被喚醒
            
            // 若是當前goroutine已是飢餓狀態了
            // 或者當前goroutine已經等待了1ms(在上面定義常量)以上
            // 就把當前goroutine的狀態設置爲飢餓
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            // 再次獲取一下鎖如今的狀態
            old = m.state
            // 若是說鎖如今是飢餓狀態,就表明如今鎖是被釋放的狀態,當前goroutine是被信號量所喚醒的
            // 也就是說,鎖被直接交給了當前goroutine
            if old&mutexStarving != 0 {
                // 若是說當前鎖的狀態是被喚醒狀態或者被獲取狀態,或者說等待的隊列爲空
                // 那麼是不可能的,確定是出問題了,由於當前狀態確定應該有等待的隊列,鎖也必定是被釋放狀態且未喚醒
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // 當前的goroutine得到了鎖,那麼就把等待隊列-1
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                // 若是當前goroutine非飢餓狀態,或者說當前goroutine是隊列中最後一個goroutine
                // 那麼就退出飢餓模式,把狀態設置爲正常
                if !starving || old>>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                // 原子性地加上改動的狀態
                atomic.AddInt32(&m.state, delta)
                break
            }
            // 若是鎖不是飢餓模式,就把當前的goroutine設爲被喚醒
            // 而且重置iter(重置spin)
            awoke = true
            iter = 0
        } else {
            // 若是CAS不成功,也就是說沒能成功得到鎖,鎖被別的goroutine得到了或者鎖一直沒被釋放
            // 那麼就更新狀態,從新開始循環嘗試拿鎖
            old = m.state
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

以上爲何CAS能拿到鎖呢?由於CAS會原子性地判斷old state和當前鎖的狀態是否一致;而總有一個goroutine會知足以上條件成功拿鎖。

canSpin

接下來咱們來看看上文提到的canSpin條件如何:

// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
    // 這裏的active_spin是個常量,值爲4
    // 簡單來講,sync.Mutex是有可能被多個goroutine競爭的,因此不該該大量自旋(消耗CPU)
    // 自旋的條件以下:
    // 1. 自旋次數小於active_spin(這裏是4)次;
    // 2. 在多核機器上;
    // 3. GOMAXPROCS > 1而且至少有一個其它的處於運行狀態的P;
    // 4. 當前P沒有其它等待運行的G;
    // 知足以上四個條件才能夠進行自旋。
    if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
        return false
    }
    if p := getg().m.p.ptr(); !runqempty(p) {
        return false
    }
    return true
}

因此能夠看出來,並非一直無限自旋下去的,當自旋次數到達4次或者其它條件不符合的時候,就改成信號量拿鎖了。

doSpin

而後咱們來看看doSpin的實現(其實也沒啥好看的):

//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
    procyield(active_spin_cnt)
}

這是一個彙編實現的函數,簡單看兩眼amd64上的實現:

TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE
    SUBL    $1, AX
    JNZ    again
    RET

看起來沒啥好看的,直接跳過吧。

Unlock

接下來咱們來看看Unlock的實現,對於Unlock來講,有兩個比較關鍵的特性:

  1. 若是說鎖不是處於locked狀態,那麼對鎖執行Unlock會致使panic;
  2. 鎖和goroutine沒有對應關係,因此咱們徹底能夠在goroutine 1中獲取到鎖,而後在goroutine 2中調用Unlock來釋放鎖(這是什麼騷操做!)(雖然不推薦你們這麼幹……)
func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // Fast path: drop lock bit.
    // 這裏獲取到鎖的狀態,而後將狀態減去被獲取的狀態(也就是解鎖),稱爲new(指望)狀態
    // 注意以上兩個操做是原子的,因此不用擔憂多個goroutine併發的問題
    new := atomic.AddInt32(&m.state, -mutexLocked)
    // 若是說,指望狀態加上被獲取的狀態,不是被獲取的話
    // 那麼就panic
    // 在這裏給你們提一個問題:幹嗎要這麼大費周章先減去再加上,直接比較一下原來鎖的狀態是否被獲取不就完事了?
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    // 若是說new狀態(也就是鎖的狀態)不是飢餓狀態
    if new&mutexStarving == 0 {
        // 複製一下原先狀態
        old := new
        for {
            // 若是說鎖沒有等待拿鎖的goroutine
            // 或者鎖被獲取了(在循環的過程當中被其它goroutine獲取了)
            // 或者鎖是被喚醒狀態(表示有goroutine被喚醒,不須要再去嘗試喚醒其它goroutine)
            // 或者鎖是飢餓模式(會直接轉交給隊列頭的goroutine)
            // 那麼就直接返回,啥都不用作了
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // 走到這一步的時候,說明鎖目前仍是空閒狀態,而且沒有goroutine被喚醒且隊列中有goroutine等待拿鎖
            // 那麼咱們就要把鎖的狀態設置爲被喚醒,等待隊列-1
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            // 又是熟悉的CAS
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                // 若是狀態設置成功了,咱們就經過信號量去喚醒goroutine
                runtime_Semrelease(&m.sema, false)
                return
            }
            // 循環結束的時候,更新一下狀態,由於有可能在執行的過程當中,狀態被修改了(好比被Lock改成了飢餓狀態)
            old = m.state
        }
    } else {
        // 若是是飢餓狀態下,那麼咱們就直接把鎖的全部權經過信號量移交給隊列頭的goroutine就行了
        // handoff = true表示直接把鎖交給隊列頭部的goroutine
        // 注意:在這個時候,鎖被獲取的狀態沒有被設置,會由被喚醒的goroutine在喚醒後設置
        // 可是當鎖處於飢餓狀態的時候,咱們也認爲鎖是被獲取的(由於咱們手動指定了獲取的goroutine)
        // 因此說新來的goroutine不會嘗試去獲取鎖(在Lock中有體現)
        runtime_Semrelease(&m.sema, true)
    }
}

總結

根據以上代碼的分析,能夠看出,sync.Mutex這把鎖在你的工做負載(所需時間)比較低,好比只是對某個關鍵變量賦值的時候,性能仍是比較好的,可是若是說對於臨界資源的操做耗時很長(特別是單個操做就大於1ms)的話,實際上性能上會有必定的問題,這也就是咱們常常看到「的鎖一直處於飢餓狀態」的問題,對於這種狀況,可能就須要另尋他法了。

好了,至此整個sync.Mutex的分析就此結束了,雖然只有短短200行代碼(包括150行註釋,實際代碼估計就50行),可是其中的算法、設計的思想、編程的理念倒是值得感悟,所謂大道至簡、少便是多可能就是如此吧。

相關文章
相關標籤/搜索