源碼剖析sync.WaitGroup(文末思考題你能解釋一下嗎?)

原文連接:源碼剖析sync.WaitGroup(文末思考題你能解釋一下嗎?)java

前言

哈嘍,你們好,我是 asong,這是我併發編程系列的第三篇文章,上一篇咱們一塊兒分析了 sync.once的使用與實現,今天咱們一塊兒來看一看 sync.WaitGroup的使用與實現.

快過年了,這是年前最後一篇推文了,待我積累一下,年後加大力度寫乾貨,在這裏先預祝你們新春快樂,身體健康,萬事如意!golang

什麼是sync.WaitGroup

官方文檔對sync.WatiGroup的描述是:一個waitGroup對象能夠等待一組協程結束,也就等待一組goroutine返回。有了sync.Waitgroup咱們能夠將本來順序執行的代碼在多個Goroutine中併發執行,加快程序處理的速度。其實他與java中的CountdownLatch,阻塞等待全部任務完成以後再繼續執行。咱們來看官網給的一個例子,這個例子使用waitGroup阻塞主進程,併發獲取多個URL,直到完成全部獲取:面試

package main

import (
    "sync"
)

type httpPkg struct{}

func (httpPkg) Get(url string) {}

var http httpPkg

func main() {
    var wg sync.WaitGroup
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/",
    }
    for _, url := range urls {
        // Increment the WaitGroup counter.
        wg.Add(1)
        // Launch a goroutine to fetch the URL.
        go func(url string) {
            // Decrement the counter when the goroutine completes.
            defer wg.Done()
            // Fetch the URL.
            http.Get(url)
        }(url)
    }
    // Wait for all HTTP fetches to complete.
    wg.Wait()
}

首先咱們須要聲明一個sync.WaitGroup對象,在主gorourine調用Add()方法設置要等待的goroutine數量,每個Goroutine在運行結束時要調用Done()方法,同時使用Wait()方法進行阻塞直到全部的goroutine完成。算法

爲何要用sync.waitGroup

咱們在平常開發中爲了提升接口響應時間,有一些場景須要在多個goroutine中作一些互不影響的業務,這樣能夠節省很多時間,可是須要協調多個goroutine,沒有sync.WaitGroup的時候,咱們可使用通道來解決這個問題,咱們把主Goroutine當成銅鑼扛把子a song,把每個Goroutine當成一個馬仔,asong管理這些馬仔,讓這些馬仔去收保護費,我今天派10個馬仔去收保護費,每個馬仔收好了保護費就在帳本上打一個✅,當全部馬仔都收好了保護費,帳本上就被打滿了✅,活全被幹完了,很出色,而後酒吧走起,浪一浪,全場的消費鬆公子買單,寫成代碼能夠這樣表示:編程

func exampleImplWaitGroup()  {
    done := make(chan struct{}) // 收10份保護費
    count := 10 // 10個馬仔
    for i:=0;i < count;i++{
        go func(i int) {
            defer func() {
                done <- struct {}{}
            }()
            fmt.Printf("馬仔%d號收保護費\n",i)
        }(i)
    }
    for i:=0;i< count;i++{
        <- done
        fmt.Printf("馬仔%d號已經收完保護費\n",i)
    }
    fmt.Println("全部馬仔已經幹完活了,開始酒吧消費~")
}

雖然這樣能夠實現,可是咱們每次使用都要保證主Goroutine最後從通道接收的次數須要與以前其餘的Goroutine發送元素的次數相同,實現起來不夠優雅,在這種場景下咱們就能夠選用sync.WaitGroup來幫助咱們實現同步。segmentfault

源碼剖析

前面咱們已經知道sync.waitGroup的基本使用了,接下來咱們就一塊兒看看他是怎樣實現的~,只有知其因此然,才能寫出更健壯的代碼。設計模式

Go version: 1.15.3數組

首先咱們看一下sync.WaitGroup的結構:緩存

// A WaitGroup must not be copied after first use.
type WaitGroup struct {
    noCopy noCopy

    // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    // 64-bit atomic operations require 64-bit alignment, but 32-bit
    // compilers do not ensure it. So we allocate 12 bytes and then use
    // the aligned 8 bytes in them as state, and the other 4 as storage
    // for the sema.
    state1 [3]uint32
}

總共就有兩個字段,nocopy是爲了保證該結構不會被進行拷貝,這是一種保護機制,會在後面進行介紹;state1主要是存儲着狀態和信號量,這裏使用的8字節對齊處理的方式頗有意思,我先來一塊兒看看這種處理。安全

state1狀態和信號量處理

state1這裏總共被分配了12個字節,這裏被設計了三種狀態:

  • 其中對齊的8個字節做爲狀態,高32位爲計數的數量,低32位爲等待的goroutine數量
  • 其中的4個字節做爲信號量存儲

提供了(wg *WaitGroup) state() (statep *uint64, semap *uint32)幫助咱們從state1字段中取出他的狀態和信號量,爲何要這樣設計呢?

咱們在分析atomicGo看源碼必會知識之unsafe包有說到過,64位原子操做須要64位對齊,可是32位編譯器不能保證這一點,因此爲了保證waitGroup32位平臺上使用的話,就必須保證在任什麼時候候,64位操做不會報錯。因此也就不能分紅兩個字段來寫,考慮到字段順序不一樣、平臺不一樣,內存對齊也就不一樣。所以這裏採用動態識別當前咱們操做的64位數究竟是不是在8字節對齊的位置上面,咱們來分析一下state方法:

// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
        return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
    } else {
        return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
    }
}

當數組的首地址是處於一個8字節對齊的位置上時,那麼就將這個數組的前8個字節做爲64位值使用表示狀態,後4個字節做爲32位值表示信號量(semaphore)。同理若是首地址沒有處於8字節對齊的位置上時,那麼就將前4個字節做爲semaphore,後8個字節做爲64位數值。畫個圖表示一下:

image

Add()Done()方法

sync.WaitGroup提供了Add()方法增長一個計數器,Done()方法減掉一個計數,Done()方法實現比較簡單,內部調用的Add()方法實現的計數器減一操做,也就是增減邏輯都在Add()方法中,因此咱們重點看一下Add()是如何實現的:

func (wg *WaitGroup) Add(delta int) {
  // 獲取狀態(Goroutine Counter 和 Waiter Counter)和信號量
    statep, semap := wg.state()
    if race.Enabled {
        _ = *statep // trigger nil deref early
        if delta < 0 {
            // Synchronize decrements with Wait.
            race.ReleaseMerge(unsafe.Pointer(wg))
        }
        race.Disable()
        defer race.Enable()
    }
  // 原子操做,goroutine counter累加delta
    state := atomic.AddUint64(statep, uint64(delta)<<32)
  // 獲取當前goroutine counter的值(高32位)
    v := int32(state >> 32)
  // 獲取當前waiter counter的值(低32位)
    w := uint32(state)
    if race.Enabled && delta > 0 && v == int32(delta) {
        // The first increment must be synchronized with Wait.
        // Need to model this as a read, because there can be
        // several concurrent wg.counter transitions from 0.
        race.Read(unsafe.Pointer(semap))
    }
  // Goroutine counter是不容許爲負數的,不然會發生panic
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
  // 當wait的Goroutine不爲0時,累加後的counter值和delta相等,說明Add()和Wait()同時調用了,因此發生panic,由於正確的作法是先Add()後Wait(),也就是已經調用了wait()就不容許再添加任務了
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
  // 正常`Add()`方法後,`goroutine Counter`計數器大於0或者`waiter Counter`計數器等於0時,不須要釋放信號量
    if v > 0 || w == 0 {
        return
    }
    // 能走到這裏說明當前Goroutine Counter計數器爲0,Waiter Counter計數器大於0, 到這裏數據也就是容許發生變更了,若是發生變更了,則出發panic
    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // 重置狀態,併發出信號量告訴wait全部任務已經完成
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

上面的代碼有一部分是race靜態檢測,下面的分析會省略這一部分,由於它並非本文的重點。

註釋我都添加到對應的代碼行上了,你是否都看懂了,沒看懂沒關係,由於Add()是與Wait()方法一塊使用的,因此有些邏輯與wait()裏的邏輯是相互照應的,因此當咱們看完wait()方法的實如今總結一下大家就明白了。

Wait()方法

sync.Wait()方法會阻塞主Goroutine直到WaitGroup計數器變爲0。咱們一塊兒來看一下Wait()方法的源碼:

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
  // 獲取狀態(Goroutine Counter 和 Waiter Counter)和信號量
    statep, semap := wg.state()
    if race.Enabled {
        _ = *statep // trigger nil deref early
        race.Disable()
    }
    for {
    // 使用原子操做讀取state,是爲了保證Add中的寫入操做已經完成
        state := atomic.LoadUint64(statep)
    // 獲取當前goroutine counter的值(高32位)
        v := int32(state >> 32)
     // 獲取當前waiter counter的值(低32位)
        w := uint32(state)
    // 若是沒有任務,或者任務已經在調用`wait`方法前已經執行完成了,就不用阻塞了
        if v == 0 {
            // Counter is 0, no need to wait.
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
        // 使用CAS操做對`waiter Counter`計數器進行+1操做,外面有for循環保證這裏能夠進行重試操做
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            if race.Enabled && w == 0 {
                // Wait must be synchronized with the first Add.
                // Need to model this is as a write to race with the read in Add.
                // As a consequence, can do the write only for the first waiter,
                // otherwise concurrent Waits will race with each other.
                race.Write(unsafe.Pointer(semap))
            }
      // 在這裏獲取信號量,使線程進入睡眠狀態,與Add方法中最後的增長信號量相對應,也就是當最後一個任務調用Done方法
      // 後會調用Add方法對goroutine counter的值減到0,就會走到最後的增長信號量
            runtime_Semacquire(semap)
      // 在Add方法中增長信號量時已經將statep的值設爲0了,若是這裏不是0,說明在wait以後又調用了Add方法,使用時機不對,觸發panic
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
    }
}

源碼總結

分了源碼,咱們能夠總結以下:

  • Add方法與wait方法不能夠併發同時調用,Add方法要在wait方法以前調用.
  • Add()設置的值必須與實際等待的goroutine個數一致,不然會panic.
  • 調用了wait方法後,必需要在wait方法返回之後才能再次從新使用waitGroup,也就是Wait沒有返回以前不要在調用Add方法,不然會發生Panic.
  • Done 只是對Add 方法的簡單封裝,咱們能夠向 Add方法傳入任意負數(須要保證計數器非負)快速將計數器歸零以喚醒等待的 Goroutine.
  • waitGroup對象只能有一份,不能夠拷貝給其餘變量,不然會形成意想不到的Bug.

no copy機制

在前文看waitGroup結構時,有一個nocopy字段,爲何要有nocopy呢?咱們先看這樣一個例子:

type User struct {
    Name string
    Info *Info
}

type Info struct {
    Age int
    Number int
}


func main()  {
    u := User{
        Name: "asong",
        Info: &Info{
            Age: 10,
            Number: 24,
        },
    }
    u1 := u
    u1.Name = "Golang夢工廠"
    u1.Info.Age = 30
    fmt.Println(u.Info.Age,u.Name)
    fmt.Println(u1.Info.Age,u1.Name)
}
// 運行結果
30 asong
30 Golang夢工廠

結構體User中有兩個字段NameInfoNameString類型,Info是指向結構體Info的指針類型,咱們首先聲明瞭一個u變量,對他進行復制拷貝獲得變量u1,在u1中對兩個字段進行改變,能夠看到Info字段發生了更改,而Name就沒發生更改,這就引起了安全問題,若是結構體對象包含指針字段,當該對象被拷貝時,會使得兩個對象中的指針字段變得再也不安全。

Go語言中提供了兩種copy檢查,一種是在運行時進行檢查,一種是經過靜態檢查。不過運行檢查是比較影響程序的執行性能的,Go官方目前只提供了strings.Builder和sync.Cond的runtime拷貝檢查機制,對於其餘須要nocopy對象類型來講,使用go vet工具來作靜態編譯檢查。運行檢查的實現能夠經過比較所屬對象是否發生變動就能夠判斷,而靜態檢查是提供了一個nocopy對象,只要是該對象或對象中存在nocopy字段,他就實現了sync.Locker接口, 它擁有Lock()和Unlock()方法,以後,能夠經過go vet功能,來檢查代碼中該對象是否有被copy。

踩坑事項

在文章的最後總結一下使用waitGroup易錯的知識點,防止你們再次犯錯。

  1. waitGroup中計數器的值是不能小於0的,源碼中咱們就能夠看到,一旦小於0就會引起panic。
  2. 必定要住注意調用Add方法與Wait方法的順序,不可併發同時調用這兩個方法,不然就會引起panic,同時在調用了wait方法在其沒有釋放前不要再次調用Add方法,這樣也會引起panicwaitGroup是能夠複用的,可是須要保證其計數週期的完整性。
  3. WaitGroup對象不是一個引用類型,經過函數傳值的時候須要使用地址,由於Go語言只有值傳遞,傳遞WaitGroup是值的話,就會致使會發生panic,看這樣一個例子:
func main()  {
    wg := sync.WaitGroup{}
    wg.Add(1)
    doDeadLock(wg)
    wg.Wait()
}
func doDeadLock(wg sync.WaitGroup)  {
    defer wg.Done()
    fmt.Println("do something")
}
//運行結果:panic: sync: negative WaitGroup counter

發生這個問題的緣由就是在doDeadLock()方法中wg是一個新對象,直接調用Done方法,計數器就會出現負數,因此引起panic,爲了安全起見,對於這種傳結構體的場景通常建議都傳指針就行了,基本能夠避免一些問題。

  1. Add()設置的值必須與實際等待的goroutine個數一致,不然會panic,很重要的一點,也是很容易出錯的地方。

思考題

最後給你們出一個思考題,下面這段代碼會不會發生panic

func main() {
    wg := sync.WaitGroup{}
    wg.Add(100)
    for i := 0; i < 100; i++ {
        go func() {
            defer wg.Done()
            fmt.Println(i)
        }()
    }
    wg.Wait()
}

結尾

在最後,祝你們新年快樂,心想事成,萬事如意~~~

好啦,這篇文章就到這裏啦,素質三連(分享、點贊、在看)都是筆者持續創做更多優質內容的動力!

建立了一個Golang學習交流羣,歡迎各位大佬們踊躍入羣,咱們一塊兒學習交流。入羣方式:加我vx拉你入羣,或者公衆號獲取入羣二維碼

結尾給你們發一個小福利吧,最近我在看[微服務架構設計模式]這一本書,講的很好,本身也收集了一本PDF,有須要的小夥能夠到自行下載。獲取方式:關注公衆號:[Golang夢工廠],後臺回覆:[微服務],便可獲取。

我翻譯了一份GIN中文文檔,會按期進行維護,有須要的小夥伴後臺回覆[gin]便可下載。

翻譯了一份Machinery中文文檔,會按期進行維護,有須要的小夥伴們後臺回覆[machinery]便可獲取。

我是asong,一名普普統統的程序猿,讓咱們一塊兒慢慢變強吧。歡迎各位的關注,咱們下期見~~~

推薦往期文章:

相關文章
相關標籤/搜索