Go Mutex 源碼學習

概述

互斥鎖是併發程序中對共享資源進行訪問控制的主要手段,Mutex是go語言提供的簡單易用的互斥鎖。Mutex的結構很簡單,暴露的方法也只有2個,一個加鎖 一個解鎖。那麼咱們天天用的Mutex互斥鎖是如何實現的呢?
其實使用的是go語言automic包中的院子操做,具體如何使用能夠參考以前寫的文章
在Mutex中的state是狀態碼,在mutex中把state分紅4段。以下圖:
圖片描述segmentfault

  • Locked:表示是否上鎖 上鎖爲1 未上鎖爲0
  • Woken:表示是否被喚醒,喚醒爲1 未喚醒爲0
  • Starving:表示是否爲飢餓模式,飢餓模式爲1 非飢餓模式爲0
  • waiter:剩餘的29位則爲等待的goroutine數量

互斥鎖的實現其實就是爭奪Locked,當goroutineA 搶到了鎖以後,第二個GoroutineB獲取鎖則會被阻塞等到GoroutineA釋放鎖以後GoroutineB將會被喚醒。固然具體實現則不會有這麼簡單,其中還有飢餓模式,自旋函數等一些概念。併發

前置概念

自旋

什麼是自旋

加鎖時,若是當前Locked位爲1,說明該鎖當前由其餘協程持有,嘗試加鎖的協程並非立刻轉入阻塞,而是會持續的探測Locked位是否變爲0,這個過程即爲自旋過程。自旋時間很短,但若是在自旋過程當中發現鎖已被釋放,那麼協程能夠當即獲取鎖。此時即使有協程被喚醒也沒法獲取鎖,只能再次阻塞。
自旋的好處是,當加鎖失敗時沒必要當即轉入阻塞,有必定機會獲取到鎖,這樣能夠避免協程的切換。函數

自旋的問題

若是自旋過程當中得到鎖,則立刻執行該goroutine。若是永遠在自旋模式中那麼以前阻塞的goroutine則很難得到鎖,這樣一來一些goroutine則會被阻塞時間過長。如何解決這個問題,go mutex中引入了兩種模式,具體請看下文。源碼分析

Mutex的兩種模式

普通模式

在普通模式下等待者以 FIFO 的順序排隊來獲取鎖,但被喚醒的等待者發現並無獲取到 mutex,而且還要與新到達的 goroutine 們競爭 mutex 的全部權。ui

飢餓模式

在飢餓模式下,mutex 的全部權直接從對 mutex 執行解鎖的 goroutine 傳遞給等待隊列前面的等待者。新到達的 goroutine 們不要嘗試去獲取 mutex,即便它看起來是在解鎖狀態,也不要試圖自旋。this

源碼分析

Mutex對象

type Mutex struct {
    // 狀態碼
    state int32
    // 信號量,用於向處於 Gwaitting 的 G 發送信號
    sema  uint32
}

const(
    // 值=1 表示是否鎖住 1=鎖 0=未鎖
    mutexLocked = 1 << iota // mutex is locked
    // 值=2 表示是否被喚醒 1=喚醒  0=未喚醒
    mutexWoken
    // 是否爲飢渴模式(等待超過1秒則爲飢渴模式)
    mutexStarving
    // 右移3位,爲等待的數量
    mutexWaiterShift = iota
    // 飢餓模式的時間 
    starvationThresholdNs = 1e6
)

加鎖

func (m *Mutex) Lock() {
    // 利用atomic包中的cas操做判斷是否上鎖
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        // 判斷是否啓用了race檢測
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        // m.state = 1 直接返回 其餘goroutine調用lock會發現已經被上鎖
        return
    }
    
    // 等待時間
    var waitStartTime int64
    // 飢餓模式標誌位
    starving := false
    // 喚醒標誌位
    awoke := false
    // 自旋迭代的次數
    iter := 0
    // 保存 mutex 當前狀態
    old := m.state
    // 循環
    for {
        // 判斷 若是不是飢餓模式而且是否可以執行自旋函數(判斷自旋次數)
        // old&(0001|0100) == 0001 ==> old&0101 
        // 當old爲0001爲非飢餓模式  0001 == 0001 true     當old爲0101飢餓模式  0101 == 0001 false  
        // runtime_canSpin 判斷自旋少於4次,而且是多核機器上而且GOMAXPROCS>1
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // 判斷條件:
            // 未被喚醒 && 等待數量不爲0 && 使用CAS設置狀態爲已喚醒 
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                // 設置激活爲true
                awoke = true
            }
            // 自旋函數 自旋次數+1
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        
        // 若是不能執行自旋函數 記錄一個new狀態 而後判斷改變new 最終使用CAS替換嘗試設置state屬性
        new := old
        // 當前的mutex.state處於正常模式,則將new的鎖位設置爲1
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        // 若是當前鎖鎖狀態爲鎖定狀態或者處於飢餓模式,則將等待的線程數量+1
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        // 若是starving變量爲true而且處於鎖定狀態,則new的飢餓狀態位打開
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        // 對於狀態的驗證
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        // new已經判斷設置完,若是mutex的state沒有變更過的話 則替換成new 
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 若是未被鎖定而且並非出於飢餓狀態 退出循環 goroutine獲取到鎖
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // 若是當前的 goroutine 以前已經在排隊了,就排到隊列的前面。
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // 進入休眠狀態,等待信號喚醒後從新開始循環 若是queueLifo爲true,則將等待goroutine插入到隊列的前面
            runtime_SemacquireMutex(&m.sema, queueLifo)
            // 計算等待時間 肯定 mutex 當前所處模式
            // 此時這個goroutine已經被喚醒 
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            // 判斷被喚醒的goroutine是否爲飢餓狀態
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                // 當不是飢餓狀態或者等待數只有一個,則退出飢餓模式
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            // 若是不是飢餓模式 讓新到來的 goroutine 先獲取鎖,繼續循環
            awoke = true
            iter = 0
        } else {
            // 若是CAS替換未能成功 則繼續循環
            old = m.state
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

解鎖

func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // 利用原子操做 設置state鎖位置爲0
    new := atomic.AddInt32(&m.state, -mutexLocked)
    // 判斷狀態,給未加鎖的mutex解鎖,拋出錯誤
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    // 判斷是否爲飢餓模式
    if new&mutexStarving == 0 {
        // 正常狀態
        old := new
        for {
            // 若是等待的goroutine爲零 || 已經被鎖定、喚醒、或者已經變成飢餓狀態
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // 更新new的值,減去等待數量
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            // 使用CAS 替換舊值
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 若是替換成功 則恢復掛起的goroutine.r若是爲 true代表將喚醒第一個阻塞的goroutine
                runtime_Semrelease(&m.sema, false)
                return
            }
            old = m.state
        }
    } else {
        // 恢復掛起的goroutine.r若是爲 true代表將喚醒第一個阻塞的goroutine
        runtime_Semrelease(&m.sema, true)
    }
}
相關文章
相關標籤/搜索