互斥鎖對於平常使用來講很是簡單, 可是sync.Mutex
裏的狀態變動, 併發控制, 原子操做, 循環體等表示很複雜, 讓我探究一下里面是什麼葫蘆藥呢!併發
mutex.Lock()
裏的流程很簡單, 只是判斷m.state
能不能用atomic.CompareAndSwapInt32
上鎖, 能夠就直接退出, 不能則執行lockSlow()
函數, 以下圖: 函數
lockSlow()
是個既複雜又重要的函數, 只要不是即時能獲取鎖的都會到這裏來.ui
在開始時先初始化幾個變量: waitStartTime
atom
waitStartTime int64 // 開始等待時間(納秒), 用於判斷是新來的g仍是喚醒的g, 還用於判斷能不能切換飢餓模式.
starving := false // 當前是否飢餓
awoke := false // 當前是否已喚醒
iter := 0 // 自旋次數
old := m.state // 最近一次獲取的狀態
複製代碼
上碼:spa
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
...(同步局部變量喚醒)
runtime_doSpin()
iter++
old = m.state
continue
}
複製代碼
這段代碼用來執行自旋, 不過執行前要先判斷能不能自旋, 條件比較苛刻: 當前是正常模式且已鎖定 (old&(mutexLocked|mutexStarving) == mutexLocked
) 自旋次數小於5次 (runtime_canSpin(iter)
) cpu核數大於1個 (同上) P大於1 (同上) 有一個正在運行的P而且runq爲空. (同上) 執行自旋而且更新最近狀態, 直到不容許自旋.調試
new := old
if old&mutexStarving == 0 { // 非飢餓模式下才能加鎖
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 { // 已加鎖或者在飢餓模式下, 累計加上一個等待的g
new += 1 << mutexWaiterShift // 十進制: new += 8 (第四位開始就是等待數量)
}
if starving && old&mutexLocked != 0 { // 准許切換飢餓模式而且已鎖定
new |= mutexStarving // 設置飢餓模式
}
if awoke {
if new&mutexWoken == 0 {
// 喚醒狀態不一致
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // mutexWoken位由 1 => 0 // 重置喚醒狀態
}
複製代碼
new
爲即將要改變狀態的變量, 對下的4個判斷用來對new
的計算. 涉及到承上啓下及併發邏輯, 第一次看應該比較混亂.code
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// ...下面代碼
} else{
old = m.state
}
複製代碼
若是對比交換值m.state
失敗, 則表明m.state
被其它修改, 只能賦上新的狀態並從新循環一次. 若是成功則進入如下代碼:cdn
if old&(mutexLocked|mutexStarving) == 0 { // 此處表示飢餓模式下不會獲取鎖
break // 已利用CAS獲取鎖
}
// waitStartTime 開始等待時間
// queueLifo 是否後入先出, 喚醒的g後入先出, 新來的g則排在隊列後面
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 運行時信號量互斥
// 等待喚醒
複製代碼
waitStartTime
不等於0表示喚醒的g, 不然表示新來的g runtime_SemacquireMutex()
進入內部信號量互斥(不開放), 實際上跟channel
的阻塞原理是同樣的, 都是經過goparkunlock
實現. (具體看runtime.sync_runtime_SemacquireMutex()
)blog
// 若是大於1毫秒, 準可切換飢餓模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state // 獲取最新狀態, 睡眠前與喚醒後的狀態有可能不一致
if old&mutexStarving != 0 { // 若是已是飢餓模式
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
// 狀態不一致
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift) // -7: 減少一個等待者並設置爲鎖定狀態
if !starving || old>>mutexWaiterShift == 1 { // 若是當前g不許可飢餓模式且只有一個等待者
delta -= mutexStarving // -11: 在delta上再退出飢餓模式
}
atomic.AddInt32(&m.state, delta)
break // 退出循環(即當前g已獲取鎖)
}
awoke = true // 表明當前已經是喚醒後的g
iter = 0 // 重置自旋次數
複製代碼
中間的判斷只要是進入飢餓模式都能獲取鎖, 新來的g永遠排在後面.隊列
Lock()
與
Unlock()
都容易閱讀理解,
new
表示爲有多個等待者.
unlockSlow()
也相對比
lockSlow()
簡單多了.
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
if old>>mutexWaiterShift == 0 ||
old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
runtime_Semrelease(&m.sema, true, 1)
}
}
複製代碼
非飢餓模式下: 若是沒有等待者, 或者m.state
帶有狀態(新g搶到鎖), 直接返回. 不然喚醒一個g繼續執行.
處於飢餓模式下: rumtime_Semrelease
的handoff
爲真, 表示須要阻塞其餘的g, 並以優先級執行等待隊列的g.
原理很簡單, 實現確很難. 併發時控制m.state
既複雜又重要的事情, 看多幾回源碼和調試就能知道處理併發時的畫面, 調試時注意看m.state
的變化. sync.Mutex
的鎖實現依賴着信號量, 都在這個文件實現rumtime/sema.go
, semacquire1
和semrelease1
分別是獲取鎖和釋放鎖, 代碼比sync.Mutex
更加複雜.