目前GO已經更新到了1.14的版本
我們通常人若是直接去看mutex的源碼的話,實際上是比較難理解爲何寫成了如今這個樣子,尤爲是加鎖裏面的各類邏輯判斷太多了,各類位運算一臉懵逼,其實咱們只要掌握它最初的設計思想,那麼後面新增的邏輯,理解起來都很簡單了。git
Mutex初版代碼加上註釋不過才109行。很是精簡,下面介紹一下我對初版Mutex源碼的理解github
// Mutex有state和sema兩個成員變量,這一點是在1.14沒有變化的 // 其中 state 字段表明當前鎖的狀態,sema是控制鎖狀態的信號量,主要關注state就行 // // state 比較複雜,state一共32位 // 最低位表明 locked狀態, 0表示未上鎖,1表示上鎖 // 倒數第二位 woken狀態,0 表示未喚醒,1表示已喚醒 // 剩餘30位用於表示當前有多少個goroutine等待互斥鎖的釋放,表明最多支持2^30個goroutine type Mutex struct { state int32 sema uint32 }
咱們接下來看它的Lock方法golang
func (m *Mutex) Lock() { // 首先直接CAS嘗試獲取鎖 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if raceenabled { raceAcquire(unsafe.Pointer(m)) } // 上鎖成功後,直接返回 return } // CAS獲取鎖失敗 // awoke 默認是未喚醒狀態 awoke := false for { // 當前state賦值給old old := m.state // 給old上鎖 new := old | mutexLocked // 若是old自己就已經上了鎖的話 if old&mutexLocked != 0 { // goroutine等待數 + 1 new = old + 1<<mutexWaiterShift } // 若是當前g被喚醒了 if awoke { // 把woken標記清除掉 new &^= mutexWoken } // 更新一下當前鎖的狀態 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 若是old自己就是解鎖狀態 if old&mutexLocked == 0 { // 那麼表明搶鎖成功,直接退出for循環 break } // 不是解鎖狀態 // 嘗試獲取信號量,進入等待隊列,等待被喚醒 runtime_Semacquire(&m.sema) // 被喚醒,awoke設置true,繼續for循環 awoke = true } } if raceenabled { raceAcquire(unsafe.Pointer(m)) } }
簡單總結一下Lock的邏輯,分幾種狀況說明一下多線程
第一種狀況:第一次上鎖的時候,直接走第一步CAS上鎖,成功返回ide
第二種狀況:Mutex已經被另外一個g上鎖,那麼state的g等待數+1,更新當前的鎖狀態,而後就進入隊列,等待被喚醒,等到另個一g調用了Unlock方法以後,當前g被喚醒,而後設置awoken=true,再執行一遍for循環,此時locked位就是未上鎖狀態(0),new就是表明上鎖,而後清除woken位,而後再CAS更新new到state上,由於以前的鎖是未上鎖狀態,那麼就表明搶鎖成功,break,返回oop
第三種狀況:和第二種同樣,只不過,在CAS更新new到state上時,有其餘g先改掉了state的值,那麼就繼續for循環,而後重複到第二種狀況。性能
接下來看下Unlock方法優化
func (m *Mutex) Unlock() { if raceenabled { _ = m.state raceRelease(unsafe.Pointer(m)) } // 一開始也是直接去掉加鎖狀態 new := atomic.AddInt32(&m.state, -mutexLocked) // 判斷一下是否解鎖了一個未加鎖的Mutex if (new+mutexLocked)&mutexLocked == 0 { // 直接panic panic("sync: unlock of unlocked mutex") } // 把解鎖後的值賦值給old old := new for { // 若是此時沒有須要等待獲取鎖的G // 或者當前Mutex已經被搶鎖成功或者已經有被喚醒的G,那麼就能夠直接返回 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { return } // g等待數-1,而後設置喚醒標記位 new = (old - 1<<mutexWaiterShift) | mutexWoken // 更新Mutex的state的值 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 手動喚醒一個被runtime_Semacquire阻塞的G runtime_Semrelease(&m.sema) // 返回 return } // 更新state失敗,說明有其餘G修改了state的值,那麼,從新賦值一下,再進行下一次循環 old = m.state } }
Unlock要比Lock簡單不少,因此這裏不總結了,看註釋就能明白ui
到這裏,最第一版本的Mutex源碼已經分析完了,關鍵仍是在上鎖的方法裏面。上鎖邏輯很是簡單粗暴,直接CAS獲取鎖,失敗就G等待數+1,而後進入隊列,等待被喚醒。this
那麼,若是仔細想一想,就會發現性能上仍是有能夠改進的地方。
咱們應用Mutex的時候,確定把鎖粒度控制的越小越好,那麼這樣的話就極可能會出現這麼一個問題,當第一次上鎖CAS失敗的時候,mutex已經被其餘G解鎖了,可是當前G就仍是直接進入隊列,等待被喚醒,這樣的話其實就會帶來額外的調度開銷。
因此,Mutex後面引進了自旋鎖的概念自旋鎖提交代碼
Currently sync.Mutex is fully cooperative. That is, once contention is discovered,
the goroutine calls into scheduler. This is suboptimal as the resource can become
free soon after (especially if critical sections are short). Server software
usually runs at ~~50% CPU utilization, that is, switching to other goroutines
is not necessary profitable.This change adds limited active spinning to sync.Mutex if:
- running on a multicore machine and
- GOMAXPROCS>1 and
- there is at least one other running P and
- local runq is empty. As opposed to runtime mutex we don't do passive spinning, because there can be work on global runq on on other
Ps.
簡單歸納一下,就是爲了解決鎖粒度很是小的時候,給系統帶來的沒必要要的調度開銷
不過自旋要先知足幾個條件
首先程序要跑在多核的機器上,而後GOMAXPROCS要大於1,而且此時有至少一個P的local runq是空的,才能進入到自旋的狀態自旋是一種多線程同步機制,當前的進程在進入自旋的過程當中會一直保持 CPU 的佔用,持續檢查某個條件是否爲真。在多核的 CPU 上,自旋能夠避免 Goroutine 的切換,使用恰當會對性能帶來很大的增益,可是使用的不恰當就會拖慢整個程序,因此 Goroutine 進入自旋的條件很是苛刻
看一下更新以後的Lock方法
func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if raceenabled { raceAcquire(unsafe.Pointer(m)) } return } awoke := false iter := 0 // 自旋的次數( <= 4) for { old := m.state new := old | mutexLocked // 沒有解鎖 if old&mutexLocked != 0 { // 判斷是否知足自旋的狀態 if runtime_canSpin(iter) { // 當woken標記位沒有被設置,並且等待G數量不等於0,並設置woken標記位成功 // 這裏設置woken標記位的緣由是,通知Unlock不用去喚醒等待隊列裏面的G了 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { // 標記awoke=true awoke = true } // runtime_doSpin -> sync_runtime_doSpin // 每次自旋30個時鐘週期,最多120個週期 runtime_doSpin() iter++ // 再次執行for循環 continue } // 自旋結束以後,G等待數量+1 new = old + 1<<mutexWaiterShift } if awoke { // 這裏多了個判斷woken狀態不一致的邏輯 if new&mutexWoken == 0 { panic("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&mutexLocked == 0 { break } runtime_Semacquire(&m.sema) awoke = true iter = 0 // 重置iter } } if raceenabled { raceAcquire(unsafe.Pointer(m)) } }
相比於初版的Mutex,這裏只在加鎖的方法裏面增長了自旋鎖的邏輯
當Mutex已經上鎖的時候,當前G在知足自旋條件下,進入自旋狀態,在自旋中,其餘G解鎖了Mutex,那麼當前G就設置了woken標記位,這樣其餘G在Unlock的時候就不會去等待隊列裏面喚醒G了,而後當前G就瓜熟蒂落的搶到了鎖
這樣自旋鎖在鎖粒度很是小的場景下的能對其性能帶來必定的優化。
引入自旋鎖以後,又帶來了一個問題。就是G等待隊列的長尾問題。由於從等待隊列裏面被喚醒,而後再去搶鎖,對自己就在執行的G來講,被喚醒的G實際上是很難搶過當前執行的G的,這樣的話,等待隊列裏面的G,就會被餓死(長時間獲取不到鎖),這樣對等待隊列的G來講實際上是不公平的。
因此Mutex後面引入了飢餓模式飢餓模式代碼
本次代碼變更仍是挺大的
先看下提交者的介紹
Add new starvation mode for Mutex.
In starvation mode ownership is directly handed off from
unlocking goroutine to the next waiter. New arriving goroutines
don't compete for ownership.
Unfair wait time is now limited to 1ms.
Also fix a long standing bug that goroutines were requeued
at the tail of the wait queue. That lead to even more unfair
acquisition times with multiple waiters.
Performance of normal mode is not considerably affected.簡單歸納一下,就是解決了等待G隊列的長尾問題
飢餓模式下,直接由unlock把鎖交給等待隊列中排在第一位的G,同時,飢餓模式下,新進來的G不會參與搶鎖也不會進入自旋狀態,會直接進入等待隊列的尾部。
飢餓模式的觸發條件,當一個G等待鎖時間超過1毫秒時,Mutex切換到飢餓模式
飢餓模式的取消條件,當一個G獲取到鎖且在等待隊列的末尾,或者這個G獲取鎖的等待時間在1ms內,那麼Mutex切換回正常模式
帶來的改變
Mutex.state的倒數第三位,變成了mutexStarving標記位,0表示正常模式,1表示飢餓模式,與此同時,支持的最大等待G數量從2^30^個 變成了2^29^個
接下來仍是主要關注Lock方法,我只在新增的邏輯上添加註釋了,我直接貼1.14的Lock代碼,較1.9的版本沒什麼改變
func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // Slow path (outlined so that the fast path can be inlined) // 這裏封裝了一下 m.lockSlow() } func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false // 默認是正常模式 awoke := false iter := 0 old := m.state for { // 當前Mutex在飢餓模式下已經被鎖了的話,當前G不進入自旋 // 只有Mutex在正常模式且被鎖了的狀況下,而且知足自旋的條件,纔會進入到自旋邏輯裏面 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexWoken flag to inform Unlock // to not wake other blocked goroutines. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old // 若是當前不是飢餓模式 if old&mutexStarving == 0 { // 加鎖 new |= mutexLocked } // 若是Mutex已經被鎖,或者是在飢餓模式 if old&(mutexLocked|mutexStarving) != 0 { // 等待的G數量+1 new += 1 << mutexWaiterShift } // The current goroutine switches mutex to starvation mode. // But if the mutex is currently unlocked, don't do the switch. // Unlock expects that starving mutex has waiters, which will not // be true in this case. // 若是已是飢餓模式,而且Mutex是被鎖的狀態 if starving && old&mutexLocked != 0 { // 切換成飢餓模式 new |= mutexStarving } if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } // 更新state值 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 非飢餓模式下搶鎖成功 if old&(mutexLocked|mutexStarving) == 0 { // 退出 break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. // 若是以前已經設置過waitStartTime的話,queueLifo就是true了 queueLifo := waitStartTime != 0 // 沒有設置過,獲取下運行時間 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 阻塞,等待被喚醒 runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 若是等待時間超過1ms,設置starving = true,不然就是false starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // 若是Mutex已是飢餓模式 if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. // 若是當前G是在飢餓模式下被喚醒的 // 加個判斷state是否正確設置的邏輯 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // delta = -7 (1..... 0111) delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { // 退出飢餓模式 delta -= mutexStarving } // 更新state atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
Unlock方法改動就很是小了
func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. m.unlockSlow(new) } } func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 不是飢餓模式 if new&mutexStarving == 0 { old := new for { // G等待隊列==0,直接返回 // (或者,處於woken模式,直接返回 // 或者,處於locked模式,直接返回 // 或者處於飢餓模式,直接返回) if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { // 喚醒G等待隊列的首個G runtime_Semrelease(&m.sema, true, 1) } }
Mutex通過兩次演進,都解決了不一樣的問題。Mutex用法很是簡單,裏面的原理不感興趣的話其實不必深究,知道個大概的邏輯就好了。
補充: mutex的等待G隊列的順序是FIFO 飢餓模式下,性能其實很低,主要就是爲了解決長尾問題的