Golang 併發編程與同步原語

淺談 Go 語言實現原理

原文連接:https://draveness.me/golang/c...html

當提到併發編程、多線程編程時,咱們每每都離不開『鎖』這一律念,Go 語言做爲一個原生支持用戶態進程 Goroutine 的語言,也必定會爲開發者提供這一功能,鎖的主要做用就是保證多個線程或者 Goroutine 在訪問同一片內存時不會出現混亂的問題,鎖實際上是一種併發編程中的同步原語(Synchronization Primitives)。git

在這一節中咱們就會介紹 Go 語言中常見的同步原語 MutexRWMutexWaitGroupOnceCond 以及擴展原語 ErrGroupSemaphoreSingleFlight 的實現原理,同時也會涉及互斥鎖、信號量等併發編程中的常見概念。github

基本原語

Go 語言在 sync 包中提供了用於同步的一些基本原語,包括常見的互斥鎖 Mutex 與讀寫互斥鎖 RWMutex 以及 OnceWaitGroupgolang

golang-basic-sync-primitives

這些基本原語的主要做用是提供較爲基礎的同步功能,咱們應該使用 Channel 和通訊來實現更加高級的同步機制,咱們在這一節中並不會介紹標準庫中所有的原語,而是會介紹其中比較常見的 MutexRWMutexOnceWaitGroupCond,咱們並不會涉及剩下兩個用於存取數據的結構體 MapPool數據庫

Mutex

Go 語言中的互斥鎖在 sync 包中,它由兩個字段 statesema 組成,state 表示當前互斥鎖的狀態,而 sema 真正用於控制鎖狀態的信號量,這兩個加起來只佔 8 個字節空間的結構體就表示了 Go 語言中的互斥鎖。編程

type Mutex struct {
    state int32
    sema  uint32
}

狀態

互斥鎖的狀態是用 int32 來表示的,可是鎖的狀態並非互斥的,它的最低三位分別表示 mutexLockedmutexWokenmutexStarving,剩下的位置都用來表示當前有多少個 Goroutine 等待互斥鎖被釋放:數組

golang-mutex-state

互斥鎖在被建立出來時,全部的狀態位的默認值都是 0,當互斥鎖被鎖定時 mutexLocked 就會被置成 1、當互斥鎖被在正常模式下被喚醒時 mutexWoken 就會被被置成 1mutexStarving 用於表示當前的互斥鎖進入了狀態,最後的幾位是在當前互斥鎖上等待的 Goroutine 個數。緩存

飢餓模式

在瞭解具體的加鎖和解鎖過程以前,咱們須要先簡單瞭解一下 Mutex 在使用過程當中可能會進入的飢餓模式,飢餓模式是在 Go 語言 1.9 版本引入的特性,它的主要功能就是保證互斥鎖的獲取的『公平性』(Fairness)。數據結構

互斥鎖能夠同時處於兩種不一樣的模式,也就是正常模式和飢餓模式,在正常模式下,全部鎖的等待者都會按照先進先出的順序獲取鎖,可是若是一個剛剛被喚起的 Goroutine 遇到了新的 Goroutine 進程也調用了 Lock 方法時,大機率會獲取不到鎖,爲了減小這種狀況的出現,防止 Goroutine 被『餓死』,一旦 Goroutine 超過 1ms 沒有獲取到鎖,它就會將當前互斥鎖切換飢餓模式。多線程

golang-mutex-mode

在飢餓模式中,互斥鎖會被直接交給等待隊列最前面的 Goroutine,新的 Goroutine 在這時不能獲取鎖、也不會進入自旋的狀態,它們只會在隊列的末尾等待,若是一個 Goroutine 得到了互斥鎖而且它是隊列中最末尾的協程或者它等待的時間少於 1ms,那麼當前的互斥鎖就會被切換回正常模式。

相比於飢餓模式,正常模式下的互斥鎖可以提供更好地性能,飢餓模式的主要做用就是避免一些 Goroutine 因爲陷入等待沒法獲取鎖而形成較高的尾延時,這也是對 Mutex 的一個優化。

加鎖

互斥鎖 Mutex 的加鎖是靠 Lock 方法完成的,最新的 Go 語言源代碼中已經將 Lock 方法進行了簡化,方法的主幹只保留了最多見、簡單而且快速的狀況;當鎖的狀態是 0 時直接將 mutexLocked 位置成 1

func (m *Mutex) Lock() {
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    m.lockSlow()
}

可是當 Lock 方法被調用時 Mutex 的狀態不是 0 時就會進入 lockSlow 方法嘗試經過自旋或者其餘的方法等待鎖的釋放並獲取互斥鎖,該方法的主體是一個很是大 for 循環,咱們會將該方法分紅幾個部分介紹獲取鎖的過程:

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }

在這段方法的第一部分會判斷當前方法可否進入自旋來等待鎖的釋放,自旋(Spinnig)實際上是在多線程同步的過程當中使用的一種機制,當前的進程在進入自旋的過程當中會一直保持 CPU 的佔用,持續檢查某個條件是否爲真,在多核的 CPU 上,自旋的優勢是避免了 Goroutine 的切換,因此若是使用恰當會對性能帶來很是大的增益。

在 Go 語言的 Mutex 互斥鎖中,只有在普通模式下才可能進入自旋,除了模式的限制以外,runtime_canSpin 方法中會判斷當前方法是否能夠進入自旋,進入自旋的條件很是苛刻:

  1. 運行在多 CPU 的機器上;
  2. 當前 Goroutine 爲了獲取該鎖進入自旋的次數小於四次;
  3. 當前機器上至少存在一個正在運行的處理器 P 而且處理的運行隊列是空的;

一旦當前 Goroutine 可以進入自旋就會調用 runtime_doSpin,它最終調用匯編語言編寫的方法 procyield 並執行指定次數的 PAUSE 指令,PAUSE 指令什麼都不會作,可是會消耗 CPU 時間,每次自旋都會調用 30PAUSE,下面是該方法在 386 架構的機器上的實現:

TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE
    SUBL    $1, AX
    JNZ    again
    RET

處理了自旋相關的特殊邏輯以後,互斥鎖接下來就根據上下文計算當前互斥鎖最新的狀態了,幾個不一樣的條件分別會更新 state 中存儲的不一樣信息 mutexLockedmutexStarvingmutexWokenmutexWaiterShift

new := old
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            new &^= mutexWoken
        }

計算了新的互斥鎖狀態以後,咱們就會使用 atomic 包提供的 CAS 函數修改互斥鎖的狀態,若是當前的互斥鎖已經處於飢餓和鎖定的狀態,就會跳過當前步驟,調用 runtime_SemacquireMutex 方法:

if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
}

runtime_SemacquireMutex 方法的主要做用就是經過 Mutex 的使用互斥鎖中的信號量保證資源不會被兩個 Goroutine 獲取,從這裏咱們就能看出 Mutex 其實就是對更底層的信號量進行封裝,對外提供更加易用的 API,runtime_SemacquireMutex 會在方法中不斷調用 goparkunlock 將當前 Goroutine 陷入休眠等待信號量能夠被獲取。

一旦當前 Goroutine 能夠獲取信號量,就證實互斥鎖已經被解鎖,該方法就會馬上返回,Lock 方法的剩餘代碼也會繼續執行下去了,當前互斥鎖處於飢餓模式時,若是該 Goroutine 是隊列中最後的一個 Goroutine 或者等待鎖的時間小於 starvationThresholdNs(1ms),當前 Goroutine 就會直接得到互斥鎖而且從飢餓模式中退出並得到鎖。

解鎖

互斥鎖的解鎖過程相比之下就很是簡單,Unlock 方法會直接使用 atomic 包提供的 AddInt32,若是返回的新狀態不等於 0 就會進入 unlockSlow 方法:

func (m *Mutex) Unlock() {
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        m.unlockSlow(new)
    }
}

unlockSlow 方法首先會對鎖的狀態進行校驗,若是當前互斥鎖已經被解鎖過了就會直接拋出異常 sync: unlock of unlocked mutex 停止當前程序,在正常狀況下會根據當前互斥鎖的狀態是正常模式仍是飢餓模式進入不一樣的分支:

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        runtime_Semrelease(&m.sema, true, 1)
    }
}

若是當前互斥鎖的狀態是飢餓模式就會直接調用 runtime_Semrelease 方法直接將當前鎖交給下一個正在嘗試獲取鎖的等待者,等待者會在被喚醒以後設置 mutexLocked 狀態,因爲此時仍然處於 mutexStarving,因此新的 Goroutine 也沒法得到鎖。

在正常模式下,若是當前互斥鎖不存在等待者或者最低三位表示的狀態都爲 0,那麼當前方法就不須要喚醒其餘 Goroutine 能夠直接返回,當有 Goroutine 正在處於等待狀態時,仍是會經過 runtime_Semrelease 喚醒對應的 Goroutine 並移交鎖的全部權。

小結

經過對互斥鎖 Mutex 加鎖和解鎖過程的分析,咱們可以得出如下的一些結論,它們可以幫助咱們更好地理解互斥鎖的工做原理,互斥鎖的加鎖的過程比較複雜,涉及自旋、信號量以及 Goroutine 調度等概念:

  • 若是互斥鎖處於初始化狀態,就會直接經過置位 mutexLocked 加鎖;
  • 若是互斥鎖處於 mutexLocked 而且在普通模式下工做,就會進入自旋,執行 30 次 PAUSE 指令消耗 CPU 時間等待鎖的釋放;
  • 若是當前 Goroutine 等待鎖的時間超過了 1ms,互斥鎖就會被切換到飢餓模式;
  • 互斥鎖在正常狀況下會經過 runtime_SemacquireMutex 方法將調用 Lock 的 Goroutine 切換至休眠狀態,等待持有信號量的 Goroutine 喚醒當前協程;
  • 若是當前 Goroutine 是互斥鎖上的最後一個等待的協程或者等待的時間小於 1ms,當前 Goroutine 會將互斥鎖切換回正常模式;

互斥鎖的解鎖過程相對來講就比較簡單,雖然對於普通模式和飢餓模式的處理有一些不一樣,可是因爲代碼行數很少,因此邏輯清晰,也很是容易理解:

  • 若是互斥鎖已經被解鎖,那麼調用 Unlock 會直接拋出異常;
  • 若是互斥鎖處於飢餓模式,會直接將鎖的全部權交給隊列中的下一個等待者,等待者會負責設置 mutexLocked 標誌位;
  • 若是互斥鎖處於普通模式,而且沒有 Goroutine 等待鎖的釋放或者已經有被喚醒的 Goroutine 得到了鎖就會直接返回,在其餘狀況下回經過 runtime_Semrelease 喚醒對應的 Goroutine;

RWMutex

讀寫互斥鎖也是 Go 語言 sync 包爲咱們提供的接口之一,一個常見的服務對資源的讀寫比例會很是高,若是大多數的請求都是讀請求,它們之間不會相互影響,那麼咱們爲何不能將對資源讀和寫操做分離呢?這也就是 RWMutex 讀寫互斥鎖解決的問題,不限制對資源的併發讀,可是讀寫、寫寫操做沒法並行執行。

Y N
N N

讀寫互斥鎖在 Go 語言中的實現是 RWMutex,其中不只包含一個互斥鎖,還持有兩個信號量,分別用於寫等待讀和讀等待寫:

type RWMutex struct {
    w           Mutex
    writerSem   uint32
    readerSem   uint32
    readerCount int32
    readerWait  int32
}

readerCount 存儲了當前正在執行的讀操做的數量,最後的 readerWait 表示當寫操做被阻塞時等待的讀操做個數。

讀鎖

讀鎖的加鎖很是簡單,咱們經過 atomic.AddInt32 方法爲 readerCount 加一,若是該方法返回了負數說明當前有 Goroutine 得到了寫鎖,當前 Goroutine 就會調用 runtime_SemacquireMutex 陷入休眠等待喚醒:

func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
}

若是沒有寫操做獲取當前互斥鎖,當前方法就會在 readerCount 加一後返回;當 Goroutine 想要釋放讀鎖時會調用 RUnlock 方法:

func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        rw.rUnlockSlow(r)
    }
}

該方法會在減小正在讀資源的 readerCount,當前方法若是遇到了返回值小於零的狀況,說明有一個正在進行的寫操做,在這時就應該經過 rUnlockSlow 方法減小當前寫操做等待的讀操做數 readerWait 並在全部讀操做都被釋放以後觸發寫操做的信號量 writerSem

func (rw *RWMutex) rUnlockSlow(r int32) {
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        throw("sync: RUnlock of unlocked RWMutex")
    }
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

writerSem 在被觸發以後,嘗試獲取讀寫鎖的進程就會被喚醒並得到鎖。

讀寫鎖

當資源的使用者想要獲取讀寫鎖時,就須要經過 Lock 方法了,在 Lock 方法中首先調用了讀寫互斥鎖持有的 MutexLock 方法保證其餘獲取讀寫鎖的 Goroutine 進入等待狀態,隨後的 atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) 實際上是爲了阻塞後續的讀操做:

func (rw *RWMutex) Lock() {
    rw.w.Lock()
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}

若是當前仍然有其餘 Goroutine 持有互斥鎖的讀鎖,該 Goroutine 就會調用 runtime_SemacquireMutex 進入休眠狀態,等待讀鎖釋放時觸發 writerSem 信號量將當前協程喚醒。

對資源的讀寫操做完成以後就會將經過 atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) 變回正數並經過 for 循環觸發全部因爲獲取讀鎖而陷入等待的 Goroutine:

func (rw *RWMutex) Unlock() {
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        throw("sync: Unlock of unlocked RWMutex")
    }
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    rw.w.Unlock()
}

在方法的最後,RWMutex 會釋放持有的互斥鎖讓其餘的協程可以從新獲取讀寫鎖。

小結

相比狀態複雜的互斥鎖 Mutex 來講,讀寫互斥鎖 RWMutex 雖然提供的功能很是複雜,可是因爲站在了 Mutex 的『肩膀』上,因此總體的實現上會簡單不少。

  1. readerSem — 讀寫鎖釋放時通知因爲獲取讀鎖等待的 Goroutine;
  2. writerSem — 讀鎖釋放時通知因爲獲取讀寫鎖等待的 Goroutine;
  3. w 互斥鎖 — 保證寫操做之間的互斥;
  4. readerCount — 統計當前進行讀操做的協程數,觸發寫鎖時會將其減小 rwmutexMaxReaders 阻塞後續的讀操做;
  5. readerWait — 當前讀寫鎖等待的進行讀操做的協程數,在觸發 Lock 以後的每次 RUnlock 都會將其減一,當它歸零時該 Goroutine 就會得到讀寫鎖;
  6. 當讀寫鎖被釋放 Unlock 時首先會通知全部的讀操做,而後纔會釋放持有的互斥鎖,這樣可以保證讀操做不會被連續的寫操做『餓死』;

RWMutexMutex 之上提供了額外的讀寫分離功能,可以在讀請求遠遠多於寫請求時提供性能上的提高,咱們也能夠在場景合適時選擇讀寫互斥鎖。

WaitGroup

WaitGroup 是 Go 語言 sync 包中比較常見的同步機制,它能夠用於等待一系列的 Goroutine 的返回,一個比較常見的使用場景是批量執行 RPC 或者調用外部服務:

requests := []*Request{...}

wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
    go func(r *Request) {
        defer wg.Done()
        
        // res, err := service.call(r)
    }(request)
}

wg.Wait()

經過 WaitGroup 咱們能夠在多個 Goroutine 之間很是輕鬆地同步信息,本來順序執行的代碼也能夠在多個 Goroutine 中併發執行,加快了程序處理的速度,在上述代碼中只有在全部的 Goroutine 都執行完畢以後 Wait 方法纔會返回,程序能夠繼續執行其餘的邏輯。

golang-syncgroup

總而言之,它的做用就像它的名字同樣,,經過 Done 來傳遞任務完成的信號,比較經常使用於等待一組 Goroutine 中併發執行的任務所有結束。

結構體

WaitGroup 結構體中的成員變量很是簡單,其中的 noCopy 的主要做用就是保證 WaitGroup 不會被開發者經過再賦值的方式進行拷貝,進而致使一些詭異的行爲:

type WaitGroup struct {
    noCopy noCopy

    state1 [3]uint32
}

copylock 包就是一個用於檢查相似錯誤的分析器,它的原理就是在 編譯期間 檢查被拷貝的變量中是否包含 noCopy 或者 sync 關鍵字,若是包含當前關鍵字就會報出如下的錯誤:

package main

import (
    "fmt"
    "sync"
)

func main() {
    wg := sync.Mutex{}
    yawg := wg
    fmt.Println(wg, yawg)
}

$ go run proc.go
./prog.go:10:10: assignment copies lock value to yawg: sync.Mutex
./prog.go:11:14: call of fmt.Println copies lock value: sync.Mutex
./prog.go:11:18: call of fmt.Println copies lock value: sync.Mutex

這段代碼會在賦值和調用 fmt.Println 時發生值拷貝致使分析器報錯,你能夠經過訪問 連接 嘗試運行這段代碼。

除了 noCopy 以外,WaitGroup 結構體中還包含一個總共佔用 12 字節大小的數組,這個數組中會存儲當前結構體持有的狀態和信號量,在 64 位與 32 位的機器上表現也很是不一樣。

golang-waitgroup-state

WaitGroup 提供了私有方法 state 可以幫助咱們從 state1 字段中取出它的狀態和信號量。

操做

WaitGroup 對外暴露的接口只有三個 AddWaitDone,其中 Done 方法只是調用了 wg.Add(-1) 自己並無什麼特殊的邏輯,咱們來了解一下剩餘的兩個方法:

func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if v > 0 || w == 0 {
        return
    }
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

Add 方法的主要做用就是更新 WaitGroup 中持有的計數器 counter,64 位狀態的高 32 位,雖然 Add 方法傳入的參數能夠爲負數,可是一個 WaitGroup 的計數器只能是非負數,當調用 Add 方法致使計數器歸零而且還有等待的 Goroutine 時,就會經過 runtime_Semrelease 喚醒處於等待狀態的全部 Goroutine。

另外一個 WaitGroup 的方法 Wait 就會在當前計數器中保存的數據大於 0 時修改等待 Goroutine 的個數 waiter 並調用 runtime_Semacquire 陷入睡眠狀態。

func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        if v == 0 {
            return
        }
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            runtime_Semacquire(semap)
            if +statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}

陷入睡眠的 Goroutine 就會等待 Add 方法在計數器爲 0 時喚醒。

小結

經過對 WaitGroup 的分析和研究,咱們可以得出如下的一些結論:

  • Add 不能在和 Wait 方法在 Goroutine 中併發調用,一旦出現就會形成程序崩潰;
  • WaitGroup 必須在 Wait 方法返回以後才能被從新使用;
  • Done 只是對 Add 方法的簡單封裝,咱們能夠向 Add 方法傳入任意負數(須要保證計數器非負)快速將計數器歸零以喚醒其餘等待的 Goroutine;
  • 能夠同時有多個 Goroutine 等待當前 WaitGroup 計數器的歸零,這些 Goroutine 也會被『同時』喚醒;

Once

Go 語言在標準庫的 sync 同步包中還提供了 Once 語義,它的主要功能其實也很好理解,保證在 Go 程序運行期間 Once 對應的某段代碼只會執行一次。

在以下所示的代碼中,Do 方法中傳入的函數只會被執行一次,也就是咱們在運行以下所示的代碼時只會看見一次 only once 的輸出結果:

func main() {
    o := &sync.Once{}
    for i := 0; i < 10; i++ {
        o.Do(func() {
            fmt.Println("only once")
        })
    }
}

$ go run main.go
only once

做爲 sync 包中的結構體,Once 有着很是簡單的數據結構,每個 Once 結構體中都只包含一個用於標識代碼塊是否被執行過的 done 以及一個互斥鎖 Mutex

type Once struct {
    done uint32
    m    Mutex
}

Once 結構體對外惟一暴露的方法就是 Do,該方法會接受一個入參爲空的函數,若是使用 atomic.LoadUint32 檢查到已經執行過函數了,就會直接返回,不然就會進入 doSlow 運行傳入的函數:

func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 0 {
        o.doSlow(f)
    }
}

func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

doSlow 的實現也很是簡單,咱們先爲當前的 Goroutine 獲取互斥鎖,而後經過 defer 關鍵字將 done 成員變量設置成 1 並運行傳入的函數,不管當前函數是正常運行仍是拋出 panic,當前方法都會將 done 設置成 1 保證函數不會執行第二次。

小結

做爲用於保證函數執行次數的 Once 結構體,它使用互斥鎖和 atomic 提供的方法實現了某個函數在程序運行期間只能執行一次的語義,在使用的過程當中咱們也須要注意如下的內容:

  • Do 方法中傳入的函數只會被執行一次,哪怕函數中發生了 panic
  • 兩次調用 Do 方法傳入不一樣的函數時只會執行第一次調用的函數;

Cond

Go 語言在標準庫中提供的 Cond 實際上是一個條件變量,經過 Cond 咱們可讓一系列的 Goroutine 都在觸發某個事件或者條件時才被喚醒,每個 Cond 結構體都包含一個互斥鎖 L,咱們先來看一下 Cond 是如何使用的:

func main() {
    c := sync.NewCond(&sync.Mutex{})

    for i := 0; i < 10; i++ {
        go listen(c)
    }

    go broadcast(c)

    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)
    <-ch
}

func broadcast(c *sync.Cond) {
    c.L.Lock()
    c.Broadcast()
    c.L.Unlock()
}

func listen(c *sync.Cond) {
    c.L.Lock()
    c.Wait()
    fmt.Println("listen")
    c.L.Unlock()
}

$ go run main.go
listen
listen
...
listen

在上述代碼中咱們同時運行了 11 個 Goroutine,其中的 10 個 Goroutine 會經過 Wait 等待指望的信號或者事件,而剩下的一個 Goroutine 會調用 Broadcast 方法通知全部陷入等待的 Goroutine,當調用 Boardcast 方法以後,就會打印出 10 次 "listen" 並結束調用。

golang-cond-broadcast

結構體

Cond 的結構體中包含 noCopycopyChecker 兩個字段,前者用於保證 Cond 不會再編譯期間拷貝,後者保證在運行期間發生拷貝會直接 panic,持有的另外一個鎖 L 實際上是一個接口 Locker,任意實現 LockUnlock 方法的結構體均可以做爲 NewCond 方法的參數:

type Cond struct {
    noCopy noCopy

    L Locker

    notify  notifyList
    checker copyChecker
}

結構體中最後的變量 notifyList 其實也就是爲了實現 Cond 同步機制,該結構體其實就是一個 Goroutine 的鏈表:

type notifyList struct {
    wait uint32
    notify uint32

    lock mutex
    head *sudog
    tail *sudog
}

在這個結構體中,headtail 分別指向的就是整個鏈表的頭和尾,而 waitnotify 分別表示當前正在等待的 Goroutine 和已經通知到的 Goroutine,咱們經過這兩個變量就能確認當前待通知和已通知的 Goroutine。

操做

Cond 對外暴露的 Wait 方法會將當前 Goroutine 陷入休眠狀態,它會先調用 runtime_notifyListAdd 將等待計數器 +1,而後解鎖並調用 runtime_notifyListWait 等待其餘 Goroutine 的喚醒:

func (c *Cond) Wait() {
    c.checker.check()
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

func notifyListAdd(l *notifyList) uint32 {
    return atomic.Xadd(&l.wait, 1) - 1
}

notifyListWait 方法的主要做用就是獲取當前的 Goroutine 並將它追加到 notifyList 鏈表的最末端:

func notifyListWait(l *notifyList, t uint32) {
    lock(&l.lock)

    if less(t, l.notify) {
        unlock(&l.lock)
        return
    }

    s := acquireSudog()
    s.g = getg()
    s.ticket = t
    if l.tail == nil {
        l.head = s
    } else {
        l.tail.next = s
    }
    l.tail = s
    goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
    releaseSudog(s)
}

除了將當前 Goroutine 追加到鏈表的末端以外,咱們還會調用 goparkunlock 陷入休眠狀態,該函數也是在 Go 語言切換 Goroutine 時常常會使用的方法,它會直接讓出當前處理器的使用權並等待調度器的喚醒。

golang-cond-notifylist

Cond 對外提供的 SignalBroadcast 方法就是用來喚醒調用 Wait 陷入休眠的 Goroutine,從兩個方法的名字來看,前者會喚醒隊列最前面的 Goroutine,後者會喚醒隊列中所有的 Goroutine:

func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

notifyListNotifyAll 方法會從鏈表中取出所有的 Goroutine 併爲它們依次調用 readyWithTime,該方法會經過 goready 將目標的 Goroutine 喚醒:

func notifyListNotifyAll(l *notifyList) {
    s := l.head
    l.head = nil
    l.tail = nil

    atomic.Store(&l.notify, atomic.Load(&l.wait))

    for s != nil {
        next := s.next
        s.next = nil
        readyWithTime(s, 4)
        s = next
    }
}

雖然它會依次喚醒所有的 Goroutine,可是這裏喚醒的順序其實也是按照加入隊列的前後順序,先加入的會先被 goready 喚醒,後加入的 Goroutine 可能就須要等待調度器的調度。

notifyListNotifyOne 函數就只會從 sudog 構成的鏈表中知足 sudog.ticket == l.notify 的 Goroutine 並經過 readyWithTime 喚醒:

func notifyListNotifyOne(l *notifyList) {
    t := l.notify
    atomic.Store(&l.notify, t+1)

    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        if s.ticket == t {
            n := s.next
            if p != nil {
                p.next = n
            } else {
                l.head = n
            }
            if n == nil {
                l.tail = p
            }
            s.next = nil
            readyWithTime(s, 4)
            return
        }
    }
}

在通常狀況下咱們都會選擇在不知足特定條件時調用 Wait 陷入休眠,當某些 Goroutine 檢測到當前知足了喚醒的條件,就能夠選擇使用 Signal 通知一個或者 Broadcast 通知所有的 Goroutine 當前條件已經知足,能夠繼續完成工做了。

小結

Mutex 相比,Cond 仍是一個不被全部人都清楚和理解的同步機制,它提供了相似隊列的 FIFO 的等待機制,同時也提供了 SignalBroadcast 兩種不一樣的喚醒方法,相比於使用 for {} 忙碌等待,使用 Cond 可以在遇到長時間條件沒法知足時將當前處理器讓出的功能,若是咱們合理使用仍是可以在一些狀況下提高性能,在使用的過程當中咱們須要注意:

  • Wait 方法在調用以前必定要使用 L.Lock 持有該資源,不然會發生 panic 致使程序崩潰;
  • Signal 方法喚醒的 Goroutine 都是隊列最前面、等待最久的 Goroutine;
  • Broadcast 雖然是廣播通知所有等待的 Goroutine,可是真正被喚醒時也是按照必定順序的;

擴展原語

除了這些標準庫中提供的同步原語以外,Go 語言還在子倉庫 x/sync 中提供了額外的四種同步原語,ErrGroupSemaphoreSingleFlightSyncMap,其中的 SyncMap 其實就是 sync 包中的 sync.Map,它在 1.9 版本的 Go 語言中被引入了 x/sync 包,隨着 API 的成熟和穩定最後被移到了標準庫 sync 包中。

golang-extension-sync-primitives

咱們在這一節中就會介紹 Go 語言目前在擴展包中提供的三種原語,也就是 ErrGroupSemaphoreSingleFlight

ErrGroup

子倉庫 x/sync 中的包 errgroup 其實就爲咱們在一組 Goroutine 中提供了同步、錯誤傳播以及上下文取消的功能,咱們可使用以下所示的方式並行獲取網頁的數據:

var g errgroup.Group
var urls = []string{
    "http://www.golang.org/",
    "http://www.google.com/",
    "http://www.somestupidname.com/",
}
for i := range urls {
    url := urls[i]
    g.Go(func() error {
        resp, err := http.Get(url)
        if err == nil {
            resp.Body.Close()
        }
        return err
    })
}
if err := g.Wait(); err == nil {
    fmt.Println("Successfully fetched all URLs.")
}

Go 方法可以建立一個 Goroutine 並在其中執行傳入的函數,而 Wait 方法會等待 Go 方法建立的 Goroutine 所有返回後返回第一個非空的錯誤,若是全部的 Goroutine 都沒有返回錯誤,該函數就會返回 nil

結構體

errgroup 包中的 Group 結構體同時由三個比較重要的部分組成:

  1. 建立 Context 時返回的 cancel 函數,主要用於通知使用 context 的 Goroutine 因爲某些子任務出錯,能夠中止工做讓出資源了;
  2. 用於等待一組 Goroutine 完成子任務的 WaitGroup 同步原語;
  3. 用於接受子任務返回錯誤的 err 和保證 err 只會被賦值一次的 errOnce
type Group struct {
    cancel func()

    wg sync.WaitGroup

    errOnce sync.Once
    err     error
}

這些字段共同組成了 Group 結構體併爲咱們提供同步、錯誤傳播以及上下文取消等功能。

操做

errgroup 對外惟一暴露的構造器就是 WithContext 方法,咱們只能從一個 Context 中建立一個新的 Group 變量,WithCancel 返回的取消函數也僅會在 Group 結構體內部使用:

func WithContext(ctx context.Context) (*Group, context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    return &Group{cancel: cancel}, ctx
}

建立新的並行子任務須要使用 Go 方法,這個方法內部會對 WaitGroup 加一併建立一個新的 Goroutine,在 Goroutine 內部運行子任務並在返回錯誤時及時調用 cancel 並對 err 賦值,只有最先返回的錯誤纔會被上游感知到,後續的錯誤都會被捨棄:

func (g *Group) Go(f func() error) {
    g.wg.Add(1)

    go func() {
        defer g.wg.Done()

        if err := f(); err != nil {
            g.errOnce.Do(func() {
                g.err = err
                if g.cancel != nil {
                    g.cancel()
                }
            })
        }
    }()
}

func (g *Group) Wait() error {
    g.wg.Wait()
    if g.cancel != nil {
        g.cancel()
    }
    return g.err
}

Wait 方法其實就只是調用了 WaitGroup 的同步方法,在子任務所有完成時取消 Context 並返回可能出現的錯誤。

小結

errgroup 包中的 Group 同步原語的實現原理仍是很是簡單的,它沒有涉及很是底層和運行時包中的 API,只是對基本同步語義進行了簡單的封裝提供了更加複雜的功能,在使用時咱們也須要注意如下的幾個問題:

  • 出現錯誤或者等待結束後都會調用 Contextcancel 方法取消上下文;
  • 只有第一個出現的錯誤纔會被返回,剩餘的錯誤都會被直接拋棄;

Semaphore

信號量是在併發編程中比較常見的一種同步機制,它會保證持有的計數器在 0 到初始化的權重之間,每次獲取資源時都會將信號量中的計數器減去對應的數值,在釋放時從新加回來,當遇到計數器大於信號量大小時就會進入休眠等待其餘進程釋放信號,咱們經常會在控制訪問資源的進程數量時用到。

Golang 的擴展包中就提供了帶權重的信號量,咱們能夠按照不一樣的權重對資源的訪問進行管理,這個包對外也只提供了四個方法:

  • NewWeighted 用於建立新的信號量;
  • Acquire 獲取了指定權重的資源,若是當前沒有『空閒資源』,就會陷入休眠等待;
  • TryAcquire 也用於獲取指定權重的資源,可是若是當前沒有『空閒資源』,就會直接返回 false
  • Release 用於釋放指定權重的資源;

結構體

NewWeighted 方法的主要做用建立一個新的權重信號量,傳入信號量最大的權重就會返回一個新的 Weighted 結構體指針:

func NewWeighted(n int64) *Weighted {
    w := &Weighted{size: n}
    return w
}

type Weighted struct {
    size    int64
    cur     int64
    mu      sync.Mutex
    waiters list.List
}

Weighted 結構體中包含一個 waiters 列表其中存儲着等待獲取資源的『用戶』,除此以外它還包含當前信號量的上限以及一個計數器 cur,這個計數器的範圍就是 [0, size]

golang-semaphore

信號量中的計數器會隨着用戶對資源的訪問和釋放進行改變,引入的權重概念可以幫助咱們更好地對資源的訪問粒度進行控制,儘量知足全部常見的用例。

獲取

在上面咱們已經提到過 Acquire 方法就是用於獲取指定權重資源的方法,這個方法總共由三個不一樣的狀況組成:

  1. 當信號量中剩餘的資源大於獲取的資源而且沒有等待的 Goroutine 時就會直接獲取信號量;
  2. 當須要獲取的信號量大於 Weighted 的大小時,因爲不可能知足條件就會直接返回;
  3. 遇到其餘狀況時會將當前 Goroutine 加入到等待列表並經過 select 等待當前 Goroutine 被喚醒,被喚醒後就會獲取信號量;
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    s.mu.Lock()
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
        s.cur += n
        s.mu.Unlock()
        return nil
    }

    if n > s.size {
        s.mu.Unlock()
        <-ctx.Done()
        return ctx.Err()
    }

    ready := make(chan struct{})
    w := waiter{n: n, ready: ready}
    elem := s.waiters.PushBack(w)
    s.mu.Unlock()

    select {
    case <-ctx.Done():
        err := ctx.Err()
        s.mu.Lock()
        select {
        case <-ready:
            err = nil
        default:
            s.waiters.Remove(elem)
        }
        s.mu.Unlock()
        return err

    case <-ready:
        return nil
    }
}

另外一個用於獲取信號量的方法 TryAcquire 相比之下就很是簡單,它只會判斷當前信號量是否有充足的資源獲取,若是有充足的資源就會直接馬上返回 true 不然就會返回 false

func (s *Weighted) TryAcquire(n int64) bool {
    s.mu.Lock()
    success := s.size-s.cur >= n && s.waiters.Len() == 0
    if success {
        s.cur += n
    }
    s.mu.Unlock()
    return success
}

Acquire 相比,TryAcquire 因爲不會等待資源的釋放因此可能更適用於一些延時敏感、用戶須要馬上感知結果的場景。

釋放

最後要介紹的 Release 方法其實也很是簡單,當咱們對信號量進行釋放時,Release 方法會從頭至尾遍歷 waiters 列表中所有的等待者,若是釋放資源後的信號量有充足的剩餘資源就會經過 Channel 喚起指定的 Goroutine:

func (s *Weighted) Release(n int64) {
    s.mu.Lock()
    s.cur -= n
    for {
        next := s.waiters.Front()
        if next == nil {
            break
        }

        w := next.Value.(waiter)
        if s.size-s.cur < w.n {
            break
        }

        s.cur += w.n
        s.waiters.Remove(next)
        close(w.ready)
    }
    s.mu.Unlock()
}

固然也可能會出現剩餘資源沒法喚起 Goroutine 的狀況,在這時當前方法就會釋放鎖後直接返回,經過對這段代碼的分析咱們也能發現,若是一個信號量須要的佔用的資源很是多,他可能會長時間沒法獲取鎖,這可能也是 Acquire 方法引入另外一個參數 Context 的緣由,爲信號量的獲取設置一個超時時間。

小結

帶權重的信號量確實有着更多的應用場景,這也是 Go 語言對外提供的惟一一種信號量實現,在使用的過程當中咱們須要注意如下的幾個問題:

  • AcquireTryAcquire 方法均可以用於獲取資源,前者用於同步獲取會等待鎖的釋放,後者會在沒法獲取鎖時直接返回;
  • Release 方法會按照 FIFO 的順序喚醒能夠被喚醒的 Goroutine;
  • 若是一個 Goroutine 獲取了較多地資源,因爲 Release 的釋放策略可能會等待比較長的時間;

SingleFlight

singleflight 是 Go 語言擴展包中提供了另外一種同步原語,這其實也是做者最喜歡的一種同步擴展機制,它可以在一個服務中抑制對下游的屢次重複請求,一個比較常見的使用場景是 — 咱們在使用 Redis 對數據庫中的一些熱門數據進行了緩存並設置了超時時間,緩存超時的一瞬間可能有很是多的並行請求發現了 Redis 中已經不包含任何緩存因此大量的流量會打到數據庫上影響服務的延時和質量。

golang-query-without-single-flight

可是 singleflight 就能有效地解決這個問題,它的主要做用就是對於同一個 Key 最終只會進行一次函數調用,在這個上下文中就是隻會進行一次數據庫查詢,查詢的結果會寫回 Redis 並同步給全部請求對應 Key 的用戶:

golang-extension-single-flight

這其實就減小了對下游的瞬時流量,在獲取下游資源很是耗時,例如:訪問緩存、數據庫等場景下就很是適合使用 singleflight 對服務進行優化,在上述的這個例子中咱們就能夠在想 Redis 和數據庫中獲取數據時都使用 singleflight 提供的這一功能減小下游的壓力;它的使用其實也很是簡單,咱們能夠直接使用 singleflight.Group{} 建立一個新的 Group 結構體,而後經過調用 Do 方法就能對相同的請求進行抑制:

type service struct {
    requestGroup singleflight.Group
}

func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
    v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
        rows, err := // select * from tables
        if err != nil {
            return nil, err
        }
        return rows, nil
    })
    if err != nil {
        return nil, err
    }
    
    return Response{
        rows: rows,
    }, nil
}

上述代碼使用請求的哈希做爲抑制相同請求的鍵,咱們也能夠選擇一些比較關鍵或者重要的字段做爲 Do 方法的第一個參數避免對下游的瞬時大量請求。

結構體

Group 結構體自己由一個互斥鎖 Mutex 和一個從 Keycall 結構體指針的映射表組成,每個 call 結構體都保存了當前此次調用對應的信息:

type Group struct {
    mu sync.Mutex
    m  map[string]*call
}

type call struct {
    wg sync.WaitGroup

    val interface{}
    err error

    dups  int
    chans []chan<- Result
}

call 結構體中的 valerr 字段都是在執行傳入的函數時只會被賦值一次,它們也只會在 WaitGroup 等待結束都被讀取,而 dupschans 字段分別用於存儲當前 singleflight 抑制的請求數量以及在結果返回時將信息傳遞給調用方。

操做

singleflight 包提供了兩個用於抑制相同請求的方法,其中一個是同步等待的方法 Do,另外一個是返回 Channel 的 DoChan,這兩個方法在功能上沒有太多的區別,只是在接口的表現上稍有不一樣。

每次 Do 方法的調用時都會獲取互斥鎖並嘗試對 Group 持有的映射表進行懶加載,隨後判斷是否已經存在 key 對應的函數調用:

  1. 當不存在對應的 call 結構體時:

    1. 初始化一個新的 call 結構體指針;
    2. 增長 WaitGroup 持有的計數器;
    3. call 結構體指針添加到映射表;
    4. 釋放持有的互斥鎖 Mutex
    5. 阻塞地調用 doCall 方法等待結果的返回;
  2. 當已經存在對應的 call 結構體時;

    1. 增長 dups 計數器,它表示當前重複的調用次數;
    2. 釋放持有的互斥鎖 Mutex
    3. 經過 WaitGroup.Wait 等待請求的返回;
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        c.dups++
        g.mu.Unlock()
        c.wg.Wait()
        return c.val, c.err, true
    }
    c := new(call)
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    g.doCall(c, key, fn)
    return c.val, c.err, c.dups > 0
}

由於 valerr 兩個字段都只會在 doCall 方法中被賦值,因此當 doCall 方法和 WaitGroup.Wait 方法返回時,這兩個值就會返回給 Do 函數的調用者。

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    c.val, c.err = fn()
    c.wg.Done()

    g.mu.Lock()
    delete(g.m, key)
    for _, ch := range c.chans {
        ch <- Result{c.val, c.err, c.dups > 0}
    }
    g.mu.Unlock()
}

doCall 中會運行傳入的函數 fn,該函數的返回值就會賦值給 c.valc.err,函數執行結束後就會調用 WaitGroup.Done 方法通知全部被抑制的請求,當前函數已經執行完成,能夠從 call 結構體中取出返回值並返回了;在這以後,doCall 方法會獲取持有的互斥鎖並經過管道將信息同步給使用 DoChan 方法的調用方。

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
    ch := make(chan Result, 1)
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        c.dups++
        c.chans = append(c.chans, ch)
        g.mu.Unlock()
        return ch
    }
    c := &call{chans: []chan<- Result{ch}}
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    go g.doCall(c, key, fn)

    return ch
}

DoChan 方法和 Do的區別就是,它使用 Goroutine 異步執行 doCall 並向 call 持有的 chans 切片中追加 chan Result 變量,這也是它可以提供異步傳值的緣由。

小結

singleflight 包提供的 Group 接口確實很是好用,當咱們須要這種抑制對下游的相同請求時就能夠經過這個方法來增長吞吐量和服務質量,在使用的過程當中咱們也須要注意如下的幾個問題:

  • DoDoChan 一個用於同步阻塞調用傳入的函數,一個用於異步調用傳入的參數並經過 Channel 接受函數的返回值;
  • Forget 方法能夠通知 singleflight 在持有的映射表中刪除某個鍵,接下來對該鍵的調用就會直接執行方法而不是等待前面的函數返回;
  • 一旦調用的函數返回了錯誤,全部在等待的 Goroutine 也都會接收到一樣的錯誤;

總結

咱們在這一節中介紹了 Go 語言標準庫中提供的基本原語以及擴展包中的擴展原語,這些併發編程的原語可以幫助咱們更好地利用 Go 語言的特性構建高吞吐量、低延時的服務,並解決因爲併發帶來的錯誤,到這裏咱們再從新回顧一下這一節介紹的內容:

  • Mutex 互斥鎖

    • 若是互斥鎖處於初始化狀態,就會直接經過置位 mutexLocked 加鎖;
    • 若是互斥鎖處於 mutexLocked 而且在普通模式下工做,就會進入自旋,執行 30 次 PAUSE 指令消耗 CPU 時間等待鎖的釋放;
    • 若是當前 Goroutine 等待鎖的時間超過了 1ms,互斥鎖就會被切換到飢餓模式;
    • 互斥鎖在正常狀況下會經過 runtime_SemacquireMutex 方法將調用 Lock 的 Goroutine 切換至休眠狀態,等待持有信號量的 Goroutine 喚醒當前協程;
    • 若是當前 Goroutine 是互斥鎖上的最後一個等待的協程或者等待的時間小於 1ms,當前 Goroutine 會將互斥鎖切換回正常模式;
    • 若是互斥鎖已經被解鎖,那麼調用 Unlock 會直接拋出異常;
    • 若是互斥鎖處於飢餓模式,會直接將鎖的全部權交給隊列中的下一個等待者,等待者會負責設置 mutexLocked 標誌位;
    • 若是互斥鎖處於普通模式,而且沒有 Goroutine 等待鎖的釋放或者已經有被喚醒的 Goroutine 得到了鎖就會直接返回,在其餘狀況下回經過 runtime_Semrelease 喚醒對應的 Goroutine;
  • RWMutex 讀寫互斥鎖

    • readerSem — 讀寫鎖釋放時通知因爲獲取讀鎖等待的 Goroutine;
    • writerSem — 讀鎖釋放時通知因爲獲取讀寫鎖等待的 Goroutine;
    • w 互斥鎖 — 保證寫操做之間的互斥;
    • readerCount — 統計當前進行讀操做的協程數,觸發寫鎖時會將其減小 rwmutexMaxReaders 阻塞後續的讀操做;
    • readerWait — 當前讀寫鎖等待的進行讀操做的協程數,在觸發 Lock 以後的每次 RUnlock 都會將其減一,當它歸零時該 Goroutine 就會得到讀寫鎖;
    • 當讀寫鎖被釋放 Unlock 時首先會通知全部的讀操做,而後纔會釋放持有的互斥鎖,這樣可以保證讀操做不會被連續的寫操做『餓死』;
  • WaitGroup 等待一組 Goroutine 結束

    • Add 不能在和 Wait 方法在 Goroutine 中併發調用,一旦出現就會形成程序崩潰;
    • WaitGroup 必須在 Wait 方法返回以後才能被從新使用;
    • Done 只是對 Add 方法的簡單封裝,咱們能夠向 Add 方法傳入任意負數(須要保證計數器非負)快速將計數器歸零以喚醒其餘等待的 Goroutine;
    • 能夠同時有多個 Goroutine 等待當前 WaitGroup 計數器的歸零,這些 Goroutine 也會被『同時』喚醒;
  • Once 程序運行期間僅執行一次

    • Do 方法中傳入的函數只會被執行一次,哪怕函數中發生了 panic
    • 兩次調用 Do 方法傳入不一樣的函數時只會執行第一次調用的函數;
  • Cond 發生指定事件時喚醒

    • Wait 方法在調用以前必定要使用 L.Lock 持有該資源,不然會發生 panic 致使程序崩潰;
    • Signal 方法喚醒的 Goroutine 都是隊列最前面、等待最久的 Goroutine;
    • Broadcast 雖然是廣播通知所有等待的 Goroutine,可是真正被喚醒時也是按照必定順序的;
  • ErrGroup 爲一組 Goroutine 提供同步、錯誤傳播以及上下文取消的功能

    • 出現錯誤或者等待結束後都會調用 Contextcancel 方法取消上下文;
    • 只有第一個出現的錯誤纔會被返回,剩餘的錯誤都會被直接拋棄;
  • Semaphore 帶權重的信號量

    • AcquireTryAcquire 方法均可以用於獲取資源,前者用於同步獲取會等待鎖的釋放,後者會在沒法獲取鎖時直接返回;
    • Release 方法會按照 FIFO 的順序喚醒能夠被喚醒的 Goroutine;
    • 若是一個 Goroutine 獲取了較多地資源,因爲 Release 的釋放策略可能會等待比較長的時間;
  • SingleFlight 用於抑制對下游的重複請求

    • DoDoChan 一個用於同步阻塞調用傳入的函數,一個用於異步調用傳入的參數並經過 Channel 接受函數的返回值;
    • Forget 方法能夠通知 singleflight 在持有的映射表中刪除某個鍵,接下來對該鍵的調用就會直接執行方法而不是等待前面的函數返回;
    • 一旦調用的函數返回了錯誤,全部在等待的 Goroutine 也都會接收到一樣的錯誤;

這些同步原語的實現不只要考慮 API 接口的易用、解決併發編程中可能遇到的線程競爭問題,還須要對尾延時進行優化避免某些 Goroutine 沒法獲取鎖或者資源而被餓死,對同步原語的學習也可以加強咱們隊併發編程的理解和認識,也是瞭解併發編程沒法跨越的一個步驟。

Reference

相關文章
相關標籤/搜索