你應該知道的 Go WaitGroup 剖析

hi,你們好,我是haohongfan。golang

本篇主要介紹 WaitGroup 的一些特性,讓咱們從本質上去了解 WaitGroup。關於 WaitGroup 的基本用法這裏就不作過多介紹了。相對於《這多是最容易理解的 Go Mutex 源碼剖析》來講,WaitGroup 就簡單的太多了。服務器

源碼剖析

Add()

add微信

Wait()

wait併發

type WaitGroup struct {
 noCopy noCopy
 state1 [3]uint32
}

WaitGroup 底層結構看起來簡單,但 WaitGroup.state1 其實表明三個字段:counter,waiter,sema。函數

  • counter :能夠理解爲一個計數器,計算通過 wg.Add(N), wg.Done() 後的值。性能

  • waiter :當前等待 WaitGroup 任務結束的等待者數量。其實就是調用 wg.Wait() 的次數,因此一般這個值是 1 。測試

  • sema :信號量,用來喚醒 Wait() 函數。ui

爲何要將 counter 和 waiter 放在一塊兒 ?

實際上是爲了保證 WaitGroup 狀態的完整性。舉個例子,看下面的一段源碼atom

// sync/waitgroup.go:L79 --> Add()
if v > 0 || w == 0 { // v => counter, w => waiter
    return
}
// ...
*statep = 0
for ; w != 0; w-- {
    runtime_Semrelease(semap, false0)
}

當同時發現 wg.counter <= 0 && wg.waiter != 0 時,纔會去喚醒等待的 waiters,讓等待的協程繼續運行。可是使用 WaitGroup 的調用方通常都是併發操做,若是不一樣時獲取的 counter 和 waiter 的話,就會形成獲取到的 counter 和 waiter 可能不匹配,形成程序 deadlock 或者程序提早結束等待。url

如何獲取 counter 和 waiter ?

對於 wg.state 的狀態變動,WaitGroup 的 Add(),Wait() 是使用 atomic 來作原子計算的(爲了不鎖競爭)。可是因爲 atomic 須要使用者保證其 64 位對齊,因此將 counter 和 waiter 都設置成 uint32,同時做爲一個變量,即知足了 atomic 的要求,同時也保證了獲取 waiter 和 counter 的狀態完整性。但這也就致使了 32位,64位機器上獲取 state 的方式並不相同。以下圖:簡單解釋下:

由於 64 位機器上自己就能保證 64 位對齊,因此按照 64 位對齊來取數據,拿到 state1[0], state1[1] 自己就是64 位對齊的。可是 32 位機器上並不能保證 64 位對齊,由於 32 位機器是 4 字節對齊,若是也按照 64 位機器取 state[0],state[1] 就有可能會形成 atmoic 的使用錯誤。

因而 32 位機器上空出第一個 32 位,也就使後面 64 位自然知足 64 位對齊,第一個 32 位放入 sema 恰好合適。早期 WaitGroup 的實現 sema 是和 state1 分開的,也就形成了使用 WaitGroup 就會形成 4 個字節浪費,不過 go1.11 以後就是如今的結構了。

爲何流程圖裏缺乏了 Done ?

其實並非,是由於 Done 的實現就是 Add. 只不過咱們常規用法 wg.Add(1) 是加 1 ,wg.Done() 是減 1,即 wg.Done() 能夠用 wg.Add(-1) 來代替。儘管咱們知道 wg.Add 能夠傳遞負數當 wg.Done  使用,可是仍是別這麼用。

退出waitgroup的條件

其實就一個條件, WaitGroup.counter 等於 0

平常開發中特殊需求

1. 控制超時/錯誤控制

雖然說 WaitGroup 可以讓主 Goroutine 等待子 Goroutine 退出,可是 WaitGroup 遇到一些特殊的需求,如:超時,錯誤控制,並不能很好的知足,須要作一些特殊的處理。

用戶在電商平臺中購買某個貨物,爲了計算用戶能優惠的金額,須要去獲取 A 系統(權益系統),B 系統(角色系統),C 系統(商品系統),D 系統(xx系統)。爲了提升程序性能,可能會同時發起多個 Goroutine 去訪問這些系統,必然會使用 WaitGroup 等待數據的返回,可是存在一些問題:

  1. 當某個系統發生錯誤,等待的 Goroutine 如何感知這些錯誤?

  2. 當某個系統響應過慢,等待的 Goroutine 如何控制訪問超時?

這些問題都是直接使用 WaitGroup 無法處理的。若是直接使用 channel 配合 WaitGroup 來控制超時和錯誤返回的話,封裝起來並不簡單,並且還容易出錯。咱們能夠採用 ErrGroup 來代替 WaitGroup。

有關 ErrGroup 的用法這裏就再也不闡述。golang.org/x/sync/errgroup

package main

import (
 "context"
 "fmt"
 "golang.org/x/sync/errgroup"
 "time"
)

func main() {
 ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
 defer cancel()
 errGroup, newCtx := errgroup.WithContext(ctx)

 done := make(chan struct{})
 go func() {
  for i := 0; i < 10; i++ {
   errGroup.Go(func() error {
    time.Sleep(time.Second * 10)
    return nil
   })
  }
  if err := errGroup.Wait(); err != nil {
   fmt.Printf("do err:%v\n", err)
   return
  }
  done <- struct{}{}
 }()

 select {
 case <-newCtx.Done():
  fmt.Printf("err:%v ", newCtx.Err())
  return
 case <-done:
 }
 fmt.Println("success")
}

2. 控制 Goroutine 數量

場景模擬:大概有 2000 - 3000 萬個數據須要處理,根據對服務器的測試,當啓動 200 個 Goroutine 處理時性能最佳。如何控制?

遇到諸如此類的問題時,單純使用 WaitGroup 是不行的。既要保證全部的數據都能被處理,同時也要保證同時最多隻有 200 個 Goroutine。這種問題須要 WaitGroup 配合 Channel 一塊使用。

package main

import (
 "fmt"
 "sync"
 "time"
)

func main() {
 var wg = sync.WaitGroup{}
 manyDataList := []int{12345678910}
 ch := make(chan bool3)
 for _, v := range manyDataList {
  wg.Add(1)
  go func(data int) {
   defer wg.Done()

   ch <- true
   fmt.Printf("go func: %d, time: %d\n", data, time.Now().Unix())
   time.Sleep(time.Second)
   <-ch
  }(v)
 }
 wg.Wait()
}

使用注意點

使用 WaitGroup 一樣不能被複制。具體例子就再也不分析了。具體分析過程能夠參見《這多是最容易理解的 Go Mutex 源碼剖析》

WaitGroup 的剖析到這裏基本就結束了。有什麼想跟我交流的,歡迎評論區留言。


歡迎關注個人公衆號,隨時關注個人動

本文分享自微信公衆號 - HHFCodeRv(hhfcodearts)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索