探究sync.Mutex代碼流程細節

探究目的

互斥鎖對於平常使用來講很是簡單, 可是sync.Mutex裏的狀態變動, 併發控制, 原子操做, 循環體等表示很複雜, 讓我探究一下里面是什麼葫蘆藥呢!併發

Lock

mutex.Lock()裏的流程很簡單, 只是判斷m.state能不能用atomic.CompareAndSwapInt32上鎖, 能夠就直接退出, 不能則執行lockSlow()函數, 以下圖: 函數

lockSlow()是個既複雜又重要的函數, 只要不是即時能獲取鎖的都會到這裏來.ui

在開始時先初始化幾個變量: waitStartTimeatom

waitStartTime int64	    // 開始等待時間(納秒), 用於判斷是新來的g仍是喚醒的g, 還用於判斷能不能切換飢餓模式.
starving := false       // 當前是否飢餓
awoke := false          // 當前是否已喚醒
iter := 0              // 自旋次數
old := m.state         // 最近一次獲取的狀態
複製代碼
接下來進入循環體, 邏輯複雜只能拆分來分析:

上碼:spa

if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
    ...(同步局部變量喚醒)
    runtime_doSpin()
    iter++
    old = m.state
    continue
}
複製代碼

這段代碼用來執行自旋, 不過執行前要先判斷能不能自旋, 條件比較苛刻: 當前是正常模式且已鎖定 (old&(mutexLocked|mutexStarving) == mutexLocked) 自旋次數小於5次 (runtime_canSpin(iter)) cpu核數大於1個 (同上) P大於1 (同上) 有一個正在運行的P而且runq爲空. (同上) 執行自旋而且更新最近狀態, 直到不容許自旋.調試

new := old
if old&mutexStarving == 0 { // 非飢餓模式下才能加鎖
    new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 { // 已加鎖或者在飢餓模式下, 累計加上一個等待的g
    new += 1 << mutexWaiterShift		 // 十進制: new += 8 (第四位開始就是等待數量)
}
if starving && old&mutexLocked != 0 { // 准許切換飢餓模式而且已鎖定
    new |= mutexStarving // 設置飢餓模式
}
if awoke {
    if new&mutexWoken == 0 {
        // 喚醒狀態不一致
        throw("sync: inconsistent mutex state")
    }
    new &^= mutexWoken // mutexWoken位由 1 => 0 // 重置喚醒狀態
}
複製代碼

new爲即將要改變狀態的變量, 對下的4個判斷用來對new的計算. 涉及到承上啓下及併發邏輯, 第一次看應該比較混亂.code

if atomic.CompareAndSwapInt32(&m.state, old, new) {
    // ...下面代碼
} else{ 
    old = m.state
}
複製代碼

若是對比交換值m.state失敗, 則表明m.state被其它修改, 只能賦上新的狀態並從新循環一次. 若是成功則進入如下代碼:cdn

if old&(mutexLocked|mutexStarving) == 0 { // 此處表示飢餓模式下不會獲取鎖
    break // 已利用CAS獲取鎖
}
// waitStartTime 開始等待時間
// queueLifo 是否後入先出, 喚醒的g後入先出, 新來的g則排在隊列後面
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
    waitStartTime = runtime_nanotime()
}

runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 運行時信號量互斥
// 等待喚醒
複製代碼

waitStartTime不等於0表示喚醒的g, 不然表示新來的g runtime_SemacquireMutex()進入內部信號量互斥(不開放), 實際上跟channel的阻塞原理是同樣的, 都是經過goparkunlock實現. (具體看runtime.sync_runtime_SemacquireMutex())blog

// 若是大於1毫秒, 準可切換飢餓模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs 
old = m.state // 獲取最新狀態, 睡眠前與喚醒後的狀態有可能不一致
if old&mutexStarving != 0 { // 若是已是飢餓模式
    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
        // 狀態不一致
        throw("sync: inconsistent mutex state")
    }
    delta := int32(mutexLocked - 1<<mutexWaiterShift) // -7: 減少一個等待者並設置爲鎖定狀態
    if !starving || old>>mutexWaiterShift == 1 {     // 若是當前g不許可飢餓模式且只有一個等待者
        delta -= mutexStarving  // -11: 在delta上再退出飢餓模式
    }
    atomic.AddInt32(&m.state, delta)
    break   // 退出循環(即當前g已獲取鎖)
}
awoke = true // 表明當前已經是喚醒後的g
iter = 0    // 重置自旋次數
複製代碼

中間的判斷只要是進入飢餓模式都能獲取鎖, 新來的g永遠排在後面.隊列

Unlock

Lock()Unlock()都容易閱讀理解, new表示爲有多個等待者. unlockSlow()也相對比 lockSlow()簡單多了.

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            if old>>mutexWaiterShift == 0 || 
            old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // Grab the right to wake someone.
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        runtime_Semrelease(&m.sema, true, 1)
    }
}
複製代碼

非飢餓模式下: 若是沒有等待者, 或者m.state帶有狀態(新g搶到鎖), 直接返回. 不然喚醒一個g繼續執行.

處於飢餓模式下: rumtime_Semreleasehandoff爲真, 表示須要阻塞其餘的g, 並以優先級執行等待隊列的g.

小結

原理很簡單, 實現確很難. 併發時控制m.state既複雜又重要的事情, 看多幾回源碼和調試就能知道處理併發時的畫面, 調試時注意看m.state的變化. sync.Mutex的鎖實現依賴着信號量, 都在這個文件實現rumtime/sema.go, semacquire1semrelease1分別是獲取鎖和釋放鎖, 代碼比sync.Mutex更加複雜.

相關文章
相關標籤/搜索