golang 的metux 的實現有幾個點作法是很是有意思的,一個是底層數據結構上,用了平時不多用的位運算,第二個,用到了自旋,並作了自旋策略控制,最後是用了信號量控制協程。golang
首先是golang mutex 中用了不少位運算。位運算不作細介紹,對內存利用比較高的算法都有涉及,好比redies 的壓縮列表,好比golang 的Protobuffer。算法
有幾個關鍵點,iota 在定義的時候,用的多,作自增運算: 數據結構
const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexWaiterShift = iota ) // 這裏 第一個變量爲1 ,第二個變量爲10, 第三個爲10
而後,位運算的求或和求與用的不少,一個是與1 求或將某位置1,一個是與0 求與將某位置0,這些都是用於改變某些標誌位的方式,不要看懵逼了:函數
new := old | mutexLocked // 將old 的最後一位置1,表示new 鎖必定是被持有狀態 if old&mutexLocked != 0 { // 將最後一位保留後,其餘位所有置0, 判斷最後一位的狀態是否是0,判斷是否是被持有 if runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexWoken flag to inform Unlock // to not wake other blocked goroutines. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ continue } new = old + 1<<mutexWaiterShift }
第二個是golang 的加鎖會自旋,4次自旋沒拿到鎖後再將協程休眠,這樣能夠減小切換成本。這裏關判斷條件是自旋次數,cpu核數,p 的數量:ui
func sync_runtime_canSpin(i int) bool { if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }
最後是利用信號量掛起和喚醒協程,核心函數是atom
runtime_SemacquireMutex(&m.sema) runtime_Semrelease(&m.sema)
獲取信號時,當s > 0 ,將s--,若是s 爲負數,會將當前g 放入阻塞隊列,掛起直到s>0。code
func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return // cas 獲取到鎖,直接返回 } awoke := false //循環標記 iter := 0 //循環計數器 for { old := m.state //保存當前鎖狀態 new := old | mutexLocked //將狀態位最後一位指定1 if old&mutexLocked != 0 { //鎖被佔用 if runtime_canSpin(iter) { //檢查是否能夠進入自旋鎖,4次 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { //awoke標記爲true awoke = true } runtime_doSpin()//進入自旋 iter++ continue } new = old + 1<<mutexWaiterShift //鎖被佔用,且自旋次數超過4次,掛起協程數+1,下面步驟將g 掛起並等待 } if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken //清除標誌 } if atomic.CompareAndSwapInt32(&m.state, old, new) { //更新協程計數 if old&mutexLocked == 0 { break } // 鎖請求失敗,進入休眠狀態,等待信號喚醒後從新開始循環,一直阻塞在這裏 runtime_SemacquireMutex(&m.sema) awoke = true iter = 0 } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
解鎖的過程就和加鎖反過來便可:orm
func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } new := atomic.AddInt32(&m.state, -mutexLocked)// 移除加鎖位 if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } old := new for { //當休眠隊列內的等待計數爲0或者自旋狀態計數器爲0,退出 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { return } // 等待協程數-1,更改清除標記位 new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema)// 釋放鎖,發送釋放信號,對應以前的acquire return } old = m.state } }