淺談 Go 語言實現原理原文連接:https://draveness.me/golang/c...html
當提到併發編程、多線程編程時,咱們每每都離不開『鎖』這一律念,Go 語言做爲一個原生支持用戶態進程 Goroutine 的語言,也必定會爲開發者提供這一功能,鎖的主要做用就是保證多個線程或者 Goroutine 在訪問同一片內存時不會出現混亂的問題,鎖實際上是一種併發編程中的同步原語(Synchronization Primitives)。git
在這一節中咱們就會介紹 Go 語言中常見的同步原語 Mutex
、RWMutex
、WaitGroup
、Once
和 Cond
以及擴展原語 ErrGroup
、Semaphore
和 SingleFlight
的實現原理,同時也會涉及互斥鎖、信號量等併發編程中的常見概念。github
Go 語言在 sync 包中提供了用於同步的一些基本原語,包括常見的互斥鎖 Mutex
與讀寫互斥鎖 RWMutex
以及 Once
、WaitGroup
。golang
這些基本原語的主要做用是提供較爲基礎的同步功能,咱們應該使用 Channel 和通訊來實現更加高級的同步機制,咱們在這一節中並不會介紹標準庫中所有的原語,而是會介紹其中比較常見的 Mutex
、RWMutex
、Once
、WaitGroup
和 Cond
,咱們並不會涉及剩下兩個用於存取數據的結構體 Map
和 Pool
。數據庫
Go 語言中的互斥鎖在 sync
包中,它由兩個字段 state
和 sema
組成,state
表示當前互斥鎖的狀態,而 sema
真正用於控制鎖狀態的信號量,這兩個加起來只佔 8 個字節空間的結構體就表示了 Go 語言中的互斥鎖。編程
type Mutex struct { state int32 sema uint32 }
互斥鎖的狀態是用 int32
來表示的,可是鎖的狀態並非互斥的,它的最低三位分別表示 mutexLocked
、mutexWoken
和 mutexStarving
,剩下的位置都用來表示當前有多少個 Goroutine 等待互斥鎖被釋放:數組
互斥鎖在被建立出來時,全部的狀態位的默認值都是 0
,當互斥鎖被鎖定時 mutexLocked
就會被置成 1
、當互斥鎖被在正常模式下被喚醒時 mutexWoken
就會被被置成 1
、mutexStarving
用於表示當前的互斥鎖進入了狀態,最後的幾位是在當前互斥鎖上等待的 Goroutine 個數。緩存
在瞭解具體的加鎖和解鎖過程以前,咱們須要先簡單瞭解一下 Mutex
在使用過程當中可能會進入的飢餓模式,飢餓模式是在 Go 語言 1.9 版本引入的特性,它的主要功能就是保證互斥鎖的獲取的『公平性』(Fairness)。數據結構
互斥鎖能夠同時處於兩種不一樣的模式,也就是正常模式和飢餓模式,在正常模式下,全部鎖的等待者都會按照先進先出的順序獲取鎖,可是若是一個剛剛被喚起的 Goroutine 遇到了新的 Goroutine 進程也調用了 Lock
方法時,大機率會獲取不到鎖,爲了減小這種狀況的出現,防止 Goroutine 被『餓死』,一旦 Goroutine 超過 1ms 沒有獲取到鎖,它就會將當前互斥鎖切換飢餓模式。多線程
在飢餓模式中,互斥鎖會被直接交給等待隊列最前面的 Goroutine,新的 Goroutine 在這時不能獲取鎖、也不會進入自旋的狀態,它們只會在隊列的末尾等待,若是一個 Goroutine 得到了互斥鎖而且它是隊列中最末尾的協程或者它等待的時間少於 1ms,那麼當前的互斥鎖就會被切換回正常模式。
相比於飢餓模式,正常模式下的互斥鎖可以提供更好地性能,飢餓模式的主要做用就是避免一些 Goroutine 因爲陷入等待沒法獲取鎖而形成較高的尾延時,這也是對 Mutex
的一個優化。
互斥鎖 Mutex
的加鎖是靠 Lock
方法完成的,最新的 Go 語言源代碼中已經將 Lock
方法進行了簡化,方法的主幹只保留了最多見、簡單而且快速的狀況;當鎖的狀態是 0
時直接將 mutexLocked
位置成 1
:
func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } m.lockSlow() }
可是當 Lock
方法被調用時 Mutex
的狀態不是 0
時就會進入 lockSlow
方法嘗試經過自旋或者其餘的方法等待鎖的釋放並獲取互斥鎖,該方法的主體是一個很是大 for
循環,咱們會將該方法分紅幾個部分介紹獲取鎖的過程:
func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue }
在這段方法的第一部分會判斷當前方法可否進入自旋來等待鎖的釋放,自旋(Spinnig)實際上是在多線程同步的過程當中使用的一種機制,當前的進程在進入自旋的過程當中會一直保持 CPU 的佔用,持續檢查某個條件是否爲真,在多核的 CPU 上,自旋的優勢是避免了 Goroutine 的切換,因此若是使用恰當會對性能帶來很是大的增益。
在 Go 語言的 Mutex
互斥鎖中,只有在普通模式下才可能進入自旋,除了模式的限制以外,runtime_canSpin
方法中會判斷當前方法是否能夠進入自旋,進入自旋的條件很是苛刻:
P
而且處理的運行隊列是空的;一旦當前 Goroutine 可以進入自旋就會調用 runtime_doSpin
,它最終調用匯編語言編寫的方法 procyield
並執行指定次數的 PAUSE
指令,PAUSE
指令什麼都不會作,可是會消耗 CPU 時間,每次自旋都會調用 30
次 PAUSE
,下面是該方法在 386 架構的機器上的實現:
TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AX again: PAUSE SUBL $1, AX JNZ again RET
處理了自旋相關的特殊邏輯以後,互斥鎖接下來就根據上下文計算當前互斥鎖最新的狀態了,幾個不一樣的條件分別會更新 state
中存儲的不一樣信息 mutexLocked
、mutexStarving
、mutexWoken
和 mutexWaiterShift
:
new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { new &^= mutexWoken }
計算了新的互斥鎖狀態以後,咱們就會使用 atomic
包提供的 CAS 函數修改互斥鎖的狀態,若是當前的互斥鎖已經處於飢餓和鎖定的狀態,就會跳過當前步驟,調用 runtime_SemacquireMutex
方法:
if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 1) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } }
runtime_SemacquireMutex
方法的主要做用就是經過 Mutex
的使用互斥鎖中的信號量保證資源不會被兩個 Goroutine 獲取,從這裏咱們就能看出 Mutex
其實就是對更底層的信號量進行封裝,對外提供更加易用的 API,runtime_SemacquireMutex
會在方法中不斷調用 goparkunlock
將當前 Goroutine 陷入休眠等待信號量能夠被獲取。
一旦當前 Goroutine 能夠獲取信號量,就證實互斥鎖已經被解鎖,該方法就會馬上返回,Lock
方法的剩餘代碼也會繼續執行下去了,當前互斥鎖處於飢餓模式時,若是該 Goroutine 是隊列中最後的一個 Goroutine 或者等待鎖的時間小於 starvationThresholdNs(1ms)
,當前 Goroutine 就會直接得到互斥鎖而且從飢餓模式中退出並得到鎖。
互斥鎖的解鎖過程相比之下就很是簡單,Unlock
方法會直接使用 atomic
包提供的 AddInt32
,若是返回的新狀態不等於 0
就會進入 unlockSlow
方法:
func (m *Mutex) Unlock() { new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { m.unlockSlow(new) } }
unlockSlow
方法首先會對鎖的狀態進行校驗,若是當前互斥鎖已經被解鎖過了就會直接拋出異常 sync: unlock of unlocked mutex
停止當前程序,在正常狀況下會根據當前互斥鎖的狀態是正常模式仍是飢餓模式進入不一樣的分支:
func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { runtime_Semrelease(&m.sema, true, 1) } }
若是當前互斥鎖的狀態是飢餓模式就會直接調用 runtime_Semrelease
方法直接將當前鎖交給下一個正在嘗試獲取鎖的等待者,等待者會在被喚醒以後設置 mutexLocked
狀態,因爲此時仍然處於 mutexStarving
,因此新的 Goroutine 也沒法得到鎖。
在正常模式下,若是當前互斥鎖不存在等待者或者最低三位表示的狀態都爲 0
,那麼當前方法就不須要喚醒其餘 Goroutine 能夠直接返回,當有 Goroutine 正在處於等待狀態時,仍是會經過 runtime_Semrelease
喚醒對應的 Goroutine 並移交鎖的全部權。
經過對互斥鎖 Mutex
加鎖和解鎖過程的分析,咱們可以得出如下的一些結論,它們可以幫助咱們更好地理解互斥鎖的工做原理,互斥鎖的加鎖的過程比較複雜,涉及自旋、信號量以及 Goroutine 調度等概念:
mutexLocked
加鎖;mutexLocked
而且在普通模式下工做,就會進入自旋,執行 30 次 PAUSE
指令消耗 CPU 時間等待鎖的釋放;1ms
,互斥鎖就會被切換到飢餓模式;runtime_SemacquireMutex
方法將調用 Lock
的 Goroutine 切換至休眠狀態,等待持有信號量的 Goroutine 喚醒當前協程;1ms
,當前 Goroutine 會將互斥鎖切換回正常模式;互斥鎖的解鎖過程相對來講就比較簡單,雖然對於普通模式和飢餓模式的處理有一些不一樣,可是因爲代碼行數很少,因此邏輯清晰,也很是容易理解:
Unlock
會直接拋出異常;mutexLocked
標誌位;runtime_Semrelease
喚醒對應的 Goroutine;讀寫互斥鎖也是 Go 語言 sync
包爲咱們提供的接口之一,一個常見的服務對資源的讀寫比例會很是高,若是大多數的請求都是讀請求,它們之間不會相互影響,那麼咱們爲何不能將對資源讀和寫操做分離呢?這也就是 RWMutex
讀寫互斥鎖解決的問題,不限制對資源的併發讀,可是讀寫、寫寫操做沒法並行執行。
讀 | 寫 | |
---|---|---|
讀 | Y | N |
寫 | N | N |
讀寫互斥鎖在 Go 語言中的實現是 RWMutex
,其中不只包含一個互斥鎖,還持有兩個信號量,分別用於寫等待讀和讀等待寫:
type RWMutex struct { w Mutex writerSem uint32 readerSem uint32 readerCount int32 readerWait int32 }
readerCount
存儲了當前正在執行的讀操做的數量,最後的 readerWait
表示當寫操做被阻塞時等待的讀操做個數。
讀鎖的加鎖很是簡單,咱們經過 atomic.AddInt32
方法爲 readerCount
加一,若是該方法返回了負數說明當前有 Goroutine 得到了寫鎖,當前 Goroutine 就會調用 runtime_SemacquireMutex
陷入休眠等待喚醒:
func (rw *RWMutex) RLock() { if atomic.AddInt32(&rw.readerCount, 1) < 0 { runtime_SemacquireMutex(&rw.readerSem, false, 0) } }
若是沒有寫操做獲取當前互斥鎖,當前方法就會在 readerCount
加一後返回;當 Goroutine 想要釋放讀鎖時會調用 RUnlock
方法:
func (rw *RWMutex) RUnlock() { if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { rw.rUnlockSlow(r) } }
該方法會在減小正在讀資源的 readerCount
,當前方法若是遇到了返回值小於零的狀況,說明有一個正在進行的寫操做,在這時就應該經過 rUnlockSlow
方法減小當前寫操做等待的讀操做數 readerWait
並在全部讀操做都被釋放以後觸發寫操做的信號量 writerSem
:
func (rw *RWMutex) rUnlockSlow(r int32) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { throw("sync: RUnlock of unlocked RWMutex") } if atomic.AddInt32(&rw.readerWait, -1) == 0 { runtime_Semrelease(&rw.writerSem, false, 1) } }
writerSem
在被觸發以後,嘗試獲取讀寫鎖的進程就會被喚醒並得到鎖。
當資源的使用者想要獲取讀寫鎖時,就須要經過 Lock
方法了,在 Lock
方法中首先調用了讀寫互斥鎖持有的 Mutex
的 Lock
方法保證其餘獲取讀寫鎖的 Goroutine 進入等待狀態,隨後的 atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders)
實際上是爲了阻塞後續的讀操做:
func (rw *RWMutex) Lock() { rw.w.Lock() r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } }
若是當前仍然有其餘 Goroutine 持有互斥鎖的讀鎖,該 Goroutine 就會調用 runtime_SemacquireMutex
進入休眠狀態,等待讀鎖釋放時觸發 writerSem
信號量將當前協程喚醒。
對資源的讀寫操做完成以後就會將經過 atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
變回正數並經過 for 循環觸發全部因爲獲取讀鎖而陷入等待的 Goroutine:
func (rw *RWMutex) Unlock() { r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { throw("sync: Unlock of unlocked RWMutex") } for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } rw.w.Unlock() }
在方法的最後,RWMutex
會釋放持有的互斥鎖讓其餘的協程可以從新獲取讀寫鎖。
相比狀態複雜的互斥鎖 Mutex
來講,讀寫互斥鎖 RWMutex
雖然提供的功能很是複雜,可是因爲站在了 Mutex
的『肩膀』上,因此總體的實現上會簡單不少。
readerSem
— 讀寫鎖釋放時通知因爲獲取讀鎖等待的 Goroutine;writerSem
— 讀鎖釋放時通知因爲獲取讀寫鎖等待的 Goroutine;w
互斥鎖 — 保證寫操做之間的互斥;readerCount
— 統計當前進行讀操做的協程數,觸發寫鎖時會將其減小 rwmutexMaxReaders
阻塞後續的讀操做;readerWait
— 當前讀寫鎖等待的進行讀操做的協程數,在觸發 Lock
以後的每次 RUnlock
都會將其減一,當它歸零時該 Goroutine 就會得到讀寫鎖;Unlock
時首先會通知全部的讀操做,而後纔會釋放持有的互斥鎖,這樣可以保證讀操做不會被連續的寫操做『餓死』;RWMutex
在 Mutex
之上提供了額外的讀寫分離功能,可以在讀請求遠遠多於寫請求時提供性能上的提高,咱們也能夠在場景合適時選擇讀寫互斥鎖。
WaitGroup
是 Go 語言 sync
包中比較常見的同步機制,它能夠用於等待一系列的 Goroutine 的返回,一個比較常見的使用場景是批量執行 RPC 或者調用外部服務:
requests := []*Request{...} wg := &sync.WaitGroup{} wg.Add(len(requests)) for _, request := range requests { go func(r *Request) { defer wg.Done() // res, err := service.call(r) }(request) } wg.Wait()
經過 WaitGroup
咱們能夠在多個 Goroutine 之間很是輕鬆地同步信息,本來順序執行的代碼也能夠在多個 Goroutine 中併發執行,加快了程序處理的速度,在上述代碼中只有在全部的 Goroutine 都執行完畢以後 Wait
方法纔會返回,程序能夠繼續執行其餘的邏輯。
總而言之,它的做用就像它的名字同樣,,經過 Done
來傳遞任務完成的信號,比較經常使用於等待一組 Goroutine 中併發執行的任務所有結束。
WaitGroup
結構體中的成員變量很是簡單,其中的 noCopy
的主要做用就是保證 WaitGroup
不會被開發者經過再賦值的方式進行拷貝,進而致使一些詭異的行爲:
type WaitGroup struct { noCopy noCopy state1 [3]uint32 }
copylock 包就是一個用於檢查相似錯誤的分析器,它的原理就是在 編譯期間 檢查被拷貝的變量中是否包含 noCopy
或者 sync
關鍵字,若是包含當前關鍵字就會報出如下的錯誤:
package main import ( "fmt" "sync" ) func main() { wg := sync.Mutex{} yawg := wg fmt.Println(wg, yawg) } $ go run proc.go ./prog.go:10:10: assignment copies lock value to yawg: sync.Mutex ./prog.go:11:14: call of fmt.Println copies lock value: sync.Mutex ./prog.go:11:18: call of fmt.Println copies lock value: sync.Mutex
這段代碼會在賦值和調用 fmt.Println
時發生值拷貝致使分析器報錯,你能夠經過訪問 連接 嘗試運行這段代碼。
除了 noCopy
以外,WaitGroup
結構體中還包含一個總共佔用 12 字節大小的數組,這個數組中會存儲當前結構體持有的狀態和信號量,在 64 位與 32 位的機器上表現也很是不一樣。
WaitGroup
提供了私有方法 state
可以幫助咱們從 state1
字段中取出它的狀態和信號量。
WaitGroup
對外暴露的接口只有三個 Add
、Wait
和 Done
,其中 Done
方法只是調用了 wg.Add(-1)
自己並無什麼特殊的邏輯,咱們來了解一下剩餘的兩個方法:
func (wg *WaitGroup) Add(delta int) { statep, semap := wg.state() state := atomic.AddUint64(statep, uint64(delta)<<32) v := int32(state >> 32) w := uint32(state) if v < 0 { panic("sync: negative WaitGroup counter") } if v > 0 || w == 0 { return } *statep = 0 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } }
Add
方法的主要做用就是更新 WaitGroup
中持有的計數器 counter
,64 位狀態的高 32 位,雖然 Add
方法傳入的參數能夠爲負數,可是一個 WaitGroup
的計數器只能是非負數,當調用 Add
方法致使計數器歸零而且還有等待的 Goroutine 時,就會經過 runtime_Semrelease
喚醒處於等待狀態的全部 Goroutine。
另外一個 WaitGroup
的方法 Wait
就會在當前計數器中保存的數據大於 0
時修改等待 Goroutine 的個數 waiter
並調用 runtime_Semacquire
陷入睡眠狀態。
func (wg *WaitGroup) Wait() { statep, semap := wg.state() for { state := atomic.LoadUint64(statep) v := int32(state >> 32) if v == 0 { return } if atomic.CompareAndSwapUint64(statep, state, state+1) { runtime_Semacquire(semap) if +statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } return } } }
陷入睡眠的 Goroutine 就會等待 Add
方法在計數器爲 0
時喚醒。
經過對 WaitGroup
的分析和研究,咱們可以得出如下的一些結論:
Add
不能在和 Wait
方法在 Goroutine 中併發調用,一旦出現就會形成程序崩潰;WaitGroup
必須在 Wait
方法返回以後才能被從新使用;Done
只是對 Add
方法的簡單封裝,咱們能夠向 Add
方法傳入任意負數(須要保證計數器非負)快速將計數器歸零以喚醒其餘等待的 Goroutine;WaitGroup
計數器的歸零,這些 Goroutine 也會被『同時』喚醒;Go 語言在標準庫的 sync
同步包中還提供了 Once
語義,它的主要功能其實也很好理解,保證在 Go 程序運行期間 Once
對應的某段代碼只會執行一次。
在以下所示的代碼中,Do
方法中傳入的函數只會被執行一次,也就是咱們在運行以下所示的代碼時只會看見一次 only once
的輸出結果:
func main() { o := &sync.Once{} for i := 0; i < 10; i++ { o.Do(func() { fmt.Println("only once") }) } } $ go run main.go only once
做爲 sync
包中的結構體,Once
有着很是簡單的數據結構,每個 Once
結構體中都只包含一個用於標識代碼塊是否被執行過的 done
以及一個互斥鎖 Mutex
:
type Once struct { done uint32 m Mutex }
Once
結構體對外惟一暴露的方法就是 Do
,該方法會接受一個入參爲空的函數,若是使用 atomic.LoadUint32
檢查到已經執行過函數了,就會直接返回,不然就會進入 doSlow
運行傳入的函數:
func (o *Once) Do(f func()) { if atomic.LoadUint32(&o.done) == 0 { o.doSlow(f) } } func (o *Once) doSlow(f func()) { o.m.Lock() defer o.m.Unlock() if o.done == 0 { defer atomic.StoreUint32(&o.done, 1) f() } }
doSlow
的實現也很是簡單,咱們先爲當前的 Goroutine 獲取互斥鎖,而後經過 defer
關鍵字將 done
成員變量設置成 1
並運行傳入的函數,不管當前函數是正常運行仍是拋出 panic
,當前方法都會將 done
設置成 1
保證函數不會執行第二次。
做爲用於保證函數執行次數的 Once
結構體,它使用互斥鎖和 atomic
提供的方法實現了某個函數在程序運行期間只能執行一次的語義,在使用的過程當中咱們也須要注意如下的內容:
Do
方法中傳入的函數只會被執行一次,哪怕函數中發生了 panic
;Do
方法傳入不一樣的函數時只會執行第一次調用的函數;Go 語言在標準庫中提供的 Cond
實際上是一個條件變量,經過 Cond
咱們可讓一系列的 Goroutine 都在觸發某個事件或者條件時才被喚醒,每個 Cond
結構體都包含一個互斥鎖 L
,咱們先來看一下 Cond
是如何使用的:
func main() { c := sync.NewCond(&sync.Mutex{}) for i := 0; i < 10; i++ { go listen(c) } go broadcast(c) ch := make(chan os.Signal, 1) signal.Notify(ch, os.Interrupt) <-ch } func broadcast(c *sync.Cond) { c.L.Lock() c.Broadcast() c.L.Unlock() } func listen(c *sync.Cond) { c.L.Lock() c.Wait() fmt.Println("listen") c.L.Unlock() } $ go run main.go listen listen ... listen
在上述代碼中咱們同時運行了 11 個 Goroutine,其中的 10 個 Goroutine 會經過 Wait
等待指望的信號或者事件,而剩下的一個 Goroutine 會調用 Broadcast
方法通知全部陷入等待的 Goroutine,當調用 Boardcast
方法以後,就會打印出 10 次 "listen"
並結束調用。
Cond
的結構體中包含 noCopy
和 copyChecker
兩個字段,前者用於保證 Cond
不會再編譯期間拷貝,後者保證在運行期間發生拷貝會直接 panic
,持有的另外一個鎖 L
實際上是一個接口 Locker
,任意實現 Lock
和 Unlock
方法的結構體均可以做爲 NewCond
方法的參數:
type Cond struct { noCopy noCopy L Locker notify notifyList checker copyChecker }
結構體中最後的變量 notifyList
其實也就是爲了實現 Cond
同步機制,該結構體其實就是一個 Goroutine
的鏈表:
type notifyList struct { wait uint32 notify uint32 lock mutex head *sudog tail *sudog }
在這個結構體中,head
和 tail
分別指向的就是整個鏈表的頭和尾,而 wait
和 notify
分別表示當前正在等待的 Goroutine 和已經通知到的 Goroutine,咱們經過這兩個變量就能確認當前待通知和已通知的 Goroutine。
Cond
對外暴露的 Wait
方法會將當前 Goroutine 陷入休眠狀態,它會先調用 runtime_notifyListAdd
將等待計數器 +1
,而後解鎖並調用 runtime_notifyListWait
等待其餘 Goroutine 的喚醒:
func (c *Cond) Wait() { c.checker.check() t := runtime_notifyListAdd(&c.notify) c.L.Unlock() runtime_notifyListWait(&c.notify, t) c.L.Lock() } func notifyListAdd(l *notifyList) uint32 { return atomic.Xadd(&l.wait, 1) - 1 }
notifyListWait
方法的主要做用就是獲取當前的 Goroutine 並將它追加到 notifyList
鏈表的最末端:
func notifyListWait(l *notifyList, t uint32) { lock(&l.lock) if less(t, l.notify) { unlock(&l.lock) return } s := acquireSudog() s.g = getg() s.ticket = t if l.tail == nil { l.head = s } else { l.tail.next = s } l.tail = s goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3) releaseSudog(s) }
除了將當前 Goroutine 追加到鏈表的末端以外,咱們還會調用 goparkunlock
陷入休眠狀態,該函數也是在 Go 語言切換 Goroutine 時常常會使用的方法,它會直接讓出當前處理器的使用權並等待調度器的喚醒。
Cond
對外提供的 Signal
和 Broadcast
方法就是用來喚醒調用 Wait
陷入休眠的 Goroutine,從兩個方法的名字來看,前者會喚醒隊列最前面的 Goroutine,後者會喚醒隊列中所有的 Goroutine:
func (c *Cond) Signal() { c.checker.check() runtime_notifyListNotifyOne(&c.notify) } func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&c.notify) }
notifyListNotifyAll
方法會從鏈表中取出所有的 Goroutine 併爲它們依次調用 readyWithTime
,該方法會經過 goready
將目標的 Goroutine 喚醒:
func notifyListNotifyAll(l *notifyList) { s := l.head l.head = nil l.tail = nil atomic.Store(&l.notify, atomic.Load(&l.wait)) for s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next } }
雖然它會依次喚醒所有的 Goroutine,可是這裏喚醒的順序其實也是按照加入隊列的前後順序,先加入的會先被 goready
喚醒,後加入的 Goroutine 可能就須要等待調度器的調度。
而 notifyListNotifyOne
函數就只會從 sudog
構成的鏈表中知足 sudog.ticket == l.notify
的 Goroutine 並經過 readyWithTime
喚醒:
func notifyListNotifyOne(l *notifyList) { t := l.notify atomic.Store(&l.notify, t+1) for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } s.next = nil readyWithTime(s, 4) return } } }
在通常狀況下咱們都會選擇在不知足特定條件時調用 Wait
陷入休眠,當某些 Goroutine 檢測到當前知足了喚醒的條件,就能夠選擇使用 Signal
通知一個或者 Broadcast
通知所有的 Goroutine 當前條件已經知足,能夠繼續完成工做了。
與 Mutex
相比,Cond
仍是一個不被全部人都清楚和理解的同步機制,它提供了相似隊列的 FIFO 的等待機制,同時也提供了 Signal
和 Broadcast
兩種不一樣的喚醒方法,相比於使用 for {}
忙碌等待,使用 Cond
可以在遇到長時間條件沒法知足時將當前處理器讓出的功能,若是咱們合理使用仍是可以在一些狀況下提高性能,在使用的過程當中咱們須要注意:
Wait
方法在調用以前必定要使用 L.Lock
持有該資源,不然會發生 panic
致使程序崩潰;Signal
方法喚醒的 Goroutine 都是隊列最前面、等待最久的 Goroutine;Broadcast
雖然是廣播通知所有等待的 Goroutine,可是真正被喚醒時也是按照必定順序的;除了這些標準庫中提供的同步原語以外,Go 語言還在子倉庫 x/sync
中提供了額外的四種同步原語,ErrGroup
、Semaphore
、SingleFlight
和 SyncMap
,其中的 SyncMap
其實就是 sync
包中的 sync.Map
,它在 1.9 版本的 Go 語言中被引入了 x/sync
包,隨着 API 的成熟和穩定最後被移到了標準庫 sync
包中。
咱們在這一節中就會介紹 Go 語言目前在擴展包中提供的三種原語,也就是 ErrGroup
、Semaphore
和 SingleFlight
。
子倉庫 x/sync
中的包 errgroup 其實就爲咱們在一組 Goroutine 中提供了同步、錯誤傳播以及上下文取消的功能,咱們可使用以下所示的方式並行獲取網頁的數據:
var g errgroup.Group var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", } for i := range urls { url := urls[i] g.Go(func() error { resp, err := http.Get(url) if err == nil { resp.Body.Close() } return err }) } if err := g.Wait(); err == nil { fmt.Println("Successfully fetched all URLs.") }
Go
方法可以建立一個 Goroutine 並在其中執行傳入的函數,而 Wait
方法會等待 Go
方法建立的 Goroutine 所有返回後返回第一個非空的錯誤,若是全部的 Goroutine 都沒有返回錯誤,該函數就會返回 nil
。
errgroup
包中的 Group
結構體同時由三個比較重要的部分組成:
Context
時返回的 cancel
函數,主要用於通知使用 context
的 Goroutine 因爲某些子任務出錯,能夠中止工做讓出資源了;WaitGroup
同步原語;err
和保證 err
只會被賦值一次的 errOnce
;type Group struct { cancel func() wg sync.WaitGroup errOnce sync.Once err error }
這些字段共同組成了 Group
結構體併爲咱們提供同步、錯誤傳播以及上下文取消等功能。
errgroup
對外惟一暴露的構造器就是 WithContext
方法,咱們只能從一個 Context
中建立一個新的 Group
變量,WithCancel
返回的取消函數也僅會在 Group
結構體內部使用:
func WithContext(ctx context.Context) (*Group, context.Context) { ctx, cancel := context.WithCancel(ctx) return &Group{cancel: cancel}, ctx }
建立新的並行子任務須要使用 Go
方法,這個方法內部會對 WaitGroup
加一併建立一個新的 Goroutine,在 Goroutine 內部運行子任務並在返回錯誤時及時調用 cancel
並對 err
賦值,只有最先返回的錯誤纔會被上游感知到,後續的錯誤都會被捨棄:
func (g *Group) Go(f func() error) { g.wg.Add(1) go func() { defer g.wg.Done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel() } }) } }() } func (g *Group) Wait() error { g.wg.Wait() if g.cancel != nil { g.cancel() } return g.err }
Wait
方法其實就只是調用了 WaitGroup
的同步方法,在子任務所有完成時取消 Context
並返回可能出現的錯誤。
errgroup
包中的 Group
同步原語的實現原理仍是很是簡單的,它沒有涉及很是底層和運行時包中的 API,只是對基本同步語義進行了簡單的封裝提供了更加複雜的功能,在使用時咱們也須要注意如下的幾個問題:
Context
的 cancel
方法取消上下文;信號量是在併發編程中比較常見的一種同步機制,它會保證持有的計數器在 0
到初始化的權重之間,每次獲取資源時都會將信號量中的計數器減去對應的數值,在釋放時從新加回來,當遇到計數器大於信號量大小時就會進入休眠等待其餘進程釋放信號,咱們經常會在控制訪問資源的進程數量時用到。
Golang 的擴展包中就提供了帶權重的信號量,咱們能夠按照不一樣的權重對資源的訪問進行管理,這個包對外也只提供了四個方法:
NewWeighted
用於建立新的信號量;Acquire
獲取了指定權重的資源,若是當前沒有『空閒資源』,就會陷入休眠等待;TryAcquire
也用於獲取指定權重的資源,可是若是當前沒有『空閒資源』,就會直接返回 false
;Release
用於釋放指定權重的資源;NewWeighted
方法的主要做用建立一個新的權重信號量,傳入信號量最大的權重就會返回一個新的 Weighted
結構體指針:
func NewWeighted(n int64) *Weighted { w := &Weighted{size: n} return w } type Weighted struct { size int64 cur int64 mu sync.Mutex waiters list.List }
Weighted
結構體中包含一個 waiters
列表其中存儲着等待獲取資源的『用戶』,除此以外它還包含當前信號量的上限以及一個計數器 cur
,這個計數器的範圍就是 [0, size]
:
信號量中的計數器會隨着用戶對資源的訪問和釋放進行改變,引入的權重概念可以幫助咱們更好地對資源的訪問粒度進行控制,儘量知足全部常見的用例。
在上面咱們已經提到過 Acquire
方法就是用於獲取指定權重資源的方法,這個方法總共由三個不一樣的狀況組成:
Weighted
的大小時,因爲不可能知足條件就會直接返回;select
等待當前 Goroutine 被喚醒,被喚醒後就會獲取信號量;func (s *Weighted) Acquire(ctx context.Context, n int64) error { s.mu.Lock() if s.size-s.cur >= n && s.waiters.Len() == 0 { s.cur += n s.mu.Unlock() return nil } if n > s.size { s.mu.Unlock() <-ctx.Done() return ctx.Err() } ready := make(chan struct{}) w := waiter{n: n, ready: ready} elem := s.waiters.PushBack(w) s.mu.Unlock() select { case <-ctx.Done(): err := ctx.Err() s.mu.Lock() select { case <-ready: err = nil default: s.waiters.Remove(elem) } s.mu.Unlock() return err case <-ready: return nil } }
另外一個用於獲取信號量的方法 TryAcquire
相比之下就很是簡單,它只會判斷當前信號量是否有充足的資源獲取,若是有充足的資源就會直接馬上返回 true
不然就會返回 false
:
func (s *Weighted) TryAcquire(n int64) bool { s.mu.Lock() success := s.size-s.cur >= n && s.waiters.Len() == 0 if success { s.cur += n } s.mu.Unlock() return success }
與 Acquire
相比,TryAcquire
因爲不會等待資源的釋放因此可能更適用於一些延時敏感、用戶須要馬上感知結果的場景。
最後要介紹的 Release
方法其實也很是簡單,當咱們對信號量進行釋放時,Release
方法會從頭至尾遍歷 waiters
列表中所有的等待者,若是釋放資源後的信號量有充足的剩餘資源就會經過 Channel 喚起指定的 Goroutine:
func (s *Weighted) Release(n int64) { s.mu.Lock() s.cur -= n for { next := s.waiters.Front() if next == nil { break } w := next.Value.(waiter) if s.size-s.cur < w.n { break } s.cur += w.n s.waiters.Remove(next) close(w.ready) } s.mu.Unlock() }
固然也可能會出現剩餘資源沒法喚起 Goroutine 的狀況,在這時當前方法就會釋放鎖後直接返回,經過對這段代碼的分析咱們也能發現,若是一個信號量須要的佔用的資源很是多,他可能會長時間沒法獲取鎖,這可能也是 Acquire
方法引入另外一個參數 Context
的緣由,爲信號量的獲取設置一個超時時間。
帶權重的信號量確實有着更多的應用場景,這也是 Go 語言對外提供的惟一一種信號量實現,在使用的過程當中咱們須要注意如下的幾個問題:
Acquire
和 TryAcquire
方法均可以用於獲取資源,前者用於同步獲取會等待鎖的釋放,後者會在沒法獲取鎖時直接返回;Release
方法會按照 FIFO 的順序喚醒能夠被喚醒的 Goroutine;Release
的釋放策略可能會等待比較長的時間;singleflight 是 Go 語言擴展包中提供了另外一種同步原語,這其實也是做者最喜歡的一種同步擴展機制,它可以在一個服務中抑制對下游的屢次重複請求,一個比較常見的使用場景是 — 咱們在使用 Redis 對數據庫中的一些熱門數據進行了緩存並設置了超時時間,緩存超時的一瞬間可能有很是多的並行請求發現了 Redis 中已經不包含任何緩存因此大量的流量會打到數據庫上影響服務的延時和質量。
可是 singleflight
就能有效地解決這個問題,它的主要做用就是對於同一個 Key
最終只會進行一次函數調用,在這個上下文中就是隻會進行一次數據庫查詢,查詢的結果會寫回 Redis 並同步給全部請求對應 Key
的用戶:
這其實就減小了對下游的瞬時流量,在獲取下游資源很是耗時,例如:訪問緩存、數據庫等場景下就很是適合使用 singleflight
對服務進行優化,在上述的這個例子中咱們就能夠在想 Redis 和數據庫中獲取數據時都使用 singleflight
提供的這一功能減小下游的壓力;它的使用其實也很是簡單,咱們能夠直接使用 singleflight.Group{}
建立一個新的 Group
結構體,而後經過調用 Do
方法就能對相同的請求進行抑制:
type service struct { requestGroup singleflight.Group } func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) { v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) { rows, err := // select * from tables if err != nil { return nil, err } return rows, nil }) if err != nil { return nil, err } return Response{ rows: rows, }, nil }
上述代碼使用請求的哈希做爲抑制相同請求的鍵,咱們也能夠選擇一些比較關鍵或者重要的字段做爲 Do
方法的第一個參數避免對下游的瞬時大量請求。
Group
結構體自己由一個互斥鎖 Mutex
和一個從 Key
到 call
結構體指針的映射表組成,每個 call
結構體都保存了當前此次調用對應的信息:
type Group struct { mu sync.Mutex m map[string]*call } type call struct { wg sync.WaitGroup val interface{} err error dups int chans []chan<- Result }
call
結構體中的 val
和 err
字段都是在執行傳入的函數時只會被賦值一次,它們也只會在 WaitGroup
等待結束都被讀取,而 dups
和 chans
字段分別用於存儲當前 singleflight
抑制的請求數量以及在結果返回時將信息傳遞給調用方。
singleflight
包提供了兩個用於抑制相同請求的方法,其中一個是同步等待的方法 Do
,另外一個是返回 Channel 的 DoChan
,這兩個方法在功能上沒有太多的區別,只是在接口的表現上稍有不一樣。
每次 Do
方法的調用時都會獲取互斥鎖並嘗試對 Group
持有的映射表進行懶加載,隨後判斷是否已經存在 key
對應的函數調用:
當不存在對應的 call
結構體時:
call
結構體指針;WaitGroup
持有的計數器;call
結構體指針添加到映射表;Mutex
;doCall
方法等待結果的返回;當已經存在對應的 call
結構體時;
dups
計數器,它表示當前重複的調用次數;Mutex
;WaitGroup.Wait
等待請求的返回;func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ g.mu.Unlock() c.wg.Wait() return c.val, c.err, true } c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock() g.doCall(c, key, fn) return c.val, c.err, c.dups > 0 }
由於 val
和 err
兩個字段都只會在 doCall
方法中被賦值,因此當 doCall
方法和 WaitGroup.Wait
方法返回時,這兩個值就會返回給 Do
函數的調用者。
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { c.val, c.err = fn() c.wg.Done() g.mu.Lock() delete(g.m, key) for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } g.mu.Unlock() }
doCall
中會運行傳入的函數 fn
,該函數的返回值就會賦值給 c.val
和 c.err
,函數執行結束後就會調用 WaitGroup.Done
方法通知全部被抑制的請求,當前函數已經執行完成,能夠從 call
結構體中取出返回值並返回了;在這以後,doCall
方法會獲取持有的互斥鎖並經過管道將信息同步給使用 DoChan
方法的調用方。
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ c.chans = append(c.chans, ch) g.mu.Unlock() return ch } c := &call{chans: []chan<- Result{ch}} c.wg.Add(1) g.m[key] = c g.mu.Unlock() go g.doCall(c, key, fn) return ch }
DoChan
方法和 Do
的區別就是,它使用 Goroutine 異步執行 doCall
並向 call
持有的 chans
切片中追加 chan Result
變量,這也是它可以提供異步傳值的緣由。
singleflight
包提供的 Group
接口確實很是好用,當咱們須要這種抑制對下游的相同請求時就能夠經過這個方法來增長吞吐量和服務質量,在使用的過程當中咱們也須要注意如下的幾個問題:
Do
和 DoChan
一個用於同步阻塞調用傳入的函數,一個用於異步調用傳入的參數並經過 Channel 接受函數的返回值;Forget
方法能夠通知 singleflight
在持有的映射表中刪除某個鍵,接下來對該鍵的調用就會直接執行方法而不是等待前面的函數返回;咱們在這一節中介紹了 Go 語言標準庫中提供的基本原語以及擴展包中的擴展原語,這些併發編程的原語可以幫助咱們更好地利用 Go 語言的特性構建高吞吐量、低延時的服務,並解決因爲併發帶來的錯誤,到這裏咱們再從新回顧一下這一節介紹的內容:
Mutex
互斥鎖
mutexLocked
加鎖;mutexLocked
而且在普通模式下工做,就會進入自旋,執行 30 次 PAUSE
指令消耗 CPU 時間等待鎖的釋放;1ms
,互斥鎖就會被切換到飢餓模式;runtime_SemacquireMutex
方法將調用 Lock
的 Goroutine 切換至休眠狀態,等待持有信號量的 Goroutine 喚醒當前協程;1ms
,當前 Goroutine 會將互斥鎖切換回正常模式;Unlock
會直接拋出異常;mutexLocked
標誌位;runtime_Semrelease
喚醒對應的 Goroutine;RWMutex
讀寫互斥鎖
readerSem
— 讀寫鎖釋放時通知因爲獲取讀鎖等待的 Goroutine;writerSem
— 讀鎖釋放時通知因爲獲取讀寫鎖等待的 Goroutine;w
互斥鎖 — 保證寫操做之間的互斥;readerCount
— 統計當前進行讀操做的協程數,觸發寫鎖時會將其減小 rwmutexMaxReaders
阻塞後續的讀操做;readerWait
— 當前讀寫鎖等待的進行讀操做的協程數,在觸發 Lock
以後的每次 RUnlock
都會將其減一,當它歸零時該 Goroutine 就會得到讀寫鎖;Unlock
時首先會通知全部的讀操做,而後纔會釋放持有的互斥鎖,這樣可以保證讀操做不會被連續的寫操做『餓死』;WaitGroup
等待一組 Goroutine 結束
Add
不能在和 Wait
方法在 Goroutine 中併發調用,一旦出現就會形成程序崩潰;WaitGroup
必須在 Wait
方法返回以後才能被從新使用;Done
只是對 Add
方法的簡單封裝,咱們能夠向 Add
方法傳入任意負數(須要保證計數器非負)快速將計數器歸零以喚醒其餘等待的 Goroutine;WaitGroup
計數器的歸零,這些 Goroutine 也會被『同時』喚醒;Once
程序運行期間僅執行一次
Do
方法中傳入的函數只會被執行一次,哪怕函數中發生了 panic
;Do
方法傳入不一樣的函數時只會執行第一次調用的函數;Cond
發生指定事件時喚醒
Wait
方法在調用以前必定要使用 L.Lock
持有該資源,不然會發生 panic
致使程序崩潰;Signal
方法喚醒的 Goroutine 都是隊列最前面、等待最久的 Goroutine;Broadcast
雖然是廣播通知所有等待的 Goroutine,可是真正被喚醒時也是按照必定順序的;ErrGroup
爲一組 Goroutine 提供同步、錯誤傳播以及上下文取消的功能
Context
的 cancel
方法取消上下文;Semaphore
帶權重的信號量
Acquire
和 TryAcquire
方法均可以用於獲取資源,前者用於同步獲取會等待鎖的釋放,後者會在沒法獲取鎖時直接返回;Release
方法會按照 FIFO 的順序喚醒能夠被喚醒的 Goroutine;Release
的釋放策略可能會等待比較長的時間;SingleFlight
用於抑制對下游的重複請求
Do
和 DoChan
一個用於同步阻塞調用傳入的函數,一個用於異步調用傳入的參數並經過 Channel 接受函數的返回值;Forget
方法能夠通知 singleflight
在持有的映射表中刪除某個鍵,接下來對該鍵的調用就會直接執行方法而不是等待前面的函數返回;這些同步原語的實現不只要考慮 API 接口的易用、解決併發編程中可能遇到的線程競爭問題,還須要對尾延時進行優化避免某些 Goroutine 沒法獲取鎖或者資源而被餓死,對同步原語的學習也可以加強咱們隊併發編程的理解和認識,也是瞭解併發編程沒法跨越的一個步驟。