原文連接:源碼剖析sync.WaitGroup(文末思考題你能解釋一下嗎?)java
哈嘍,你們好,我是asong
,這是我併發編程系列的第三篇文章,上一篇咱們一塊兒分析了sync.once
的使用與實現,今天咱們一塊兒來看一看sync.WaitGroup
的使用與實現.快過年了,這是年前最後一篇推文了,待我積累一下,年後加大力度寫乾貨,在這裏先預祝你們新春快樂,身體健康,萬事如意!golang
sync.WaitGroup
官方文檔對sync.WatiGroup
的描述是:一個waitGroup
對象能夠等待一組協程結束,也就等待一組goroutine
返回。有了sync.Waitgroup
咱們能夠將本來順序執行的代碼在多個Goroutine
中併發執行,加快程序處理的速度。其實他與java
中的CountdownLatch
,阻塞等待全部任務完成以後再繼續執行。咱們來看官網給的一個例子,這個例子使用waitGroup
阻塞主進程,併發獲取多個URL
,直到完成全部獲取:面試
package main import ( "sync" ) type httpPkg struct{} func (httpPkg) Get(url string) {} var http httpPkg func main() { var wg sync.WaitGroup var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", } for _, url := range urls { // Increment the WaitGroup counter. wg.Add(1) // Launch a goroutine to fetch the URL. go func(url string) { // Decrement the counter when the goroutine completes. defer wg.Done() // Fetch the URL. http.Get(url) }(url) } // Wait for all HTTP fetches to complete. wg.Wait() }
首先咱們須要聲明一個sync.WaitGroup
對象,在主gorourine
調用Add()
方法設置要等待的goroutine
數量,每個Goroutine
在運行結束時要調用Done()
方法,同時使用Wait()
方法進行阻塞直到全部的goroutine
完成。算法
sync.waitGroup
咱們在平常開發中爲了提升接口響應時間,有一些場景須要在多個goroutine
中作一些互不影響的業務,這樣能夠節省很多時間,可是須要協調多個goroutine
,沒有sync.WaitGroup
的時候,咱們可使用通道來解決這個問題,咱們把主Goroutine
當成銅鑼扛把子a song,把每個Goroutine
當成一個馬仔,asong
管理這些馬仔,讓這些馬仔去收保護費,我今天派10個馬仔去收保護費,每個馬仔收好了保護費就在帳本上打一個✅,當全部馬仔都收好了保護費,帳本上就被打滿了✅,活全被幹完了,很出色,而後酒吧走起,浪一浪,全場的消費鬆公子買單,寫成代碼能夠這樣表示:編程
func exampleImplWaitGroup() { done := make(chan struct{}) // 收10份保護費 count := 10 // 10個馬仔 for i:=0;i < count;i++{ go func(i int) { defer func() { done <- struct {}{} }() fmt.Printf("馬仔%d號收保護費\n",i) }(i) } for i:=0;i< count;i++{ <- done fmt.Printf("馬仔%d號已經收完保護費\n",i) } fmt.Println("全部馬仔已經幹完活了,開始酒吧消費~") }
雖然這樣能夠實現,可是咱們每次使用都要保證主Goroutine
最後從通道接收的次數須要與以前其餘的Goroutine
發送元素的次數相同,實現起來不夠優雅,在這種場景下咱們就能夠選用sync.WaitGroup
來幫助咱們實現同步。segmentfault
前面咱們已經知道sync.waitGroup
的基本使用了,接下來咱們就一塊兒看看他是怎樣實現的~,只有知其因此然,才能寫出更健壯的代碼。設計模式
Go version: 1.15.3數組
首先咱們看一下sync.WaitGroup
的結構:緩存
// A WaitGroup must not be copied after first use. type WaitGroup struct { noCopy noCopy // 64-bit value: high 32 bits are counter, low 32 bits are waiter count. // 64-bit atomic operations require 64-bit alignment, but 32-bit // compilers do not ensure it. So we allocate 12 bytes and then use // the aligned 8 bytes in them as state, and the other 4 as storage // for the sema. state1 [3]uint32 }
總共就有兩個字段,nocopy
是爲了保證該結構不會被進行拷貝,這是一種保護機制,會在後面進行介紹;state1
主要是存儲着狀態和信號量,這裏使用的8字節對齊處理的方式頗有意思,我先來一塊兒看看這種處理。安全
state1
狀態和信號量處理state1
這裏總共被分配了12
個字節,這裏被設計了三種狀態:
8
個字節做爲狀態,高32
位爲計數的數量,低32
位爲等待的goroutine
數量4
個字節做爲信號量存儲提供了(wg *WaitGroup) state() (statep *uint64, semap *uint32)
幫助咱們從state1
字段中取出他的狀態和信號量,爲何要這樣設計呢?
咱們在分析atomic
和Go看源碼必會知識之unsafe包有說到過,64位原子操做須要64位對齊,可是32位編譯器不能保證這一點,因此爲了保證waitGroup
在32
位平臺上使用的話,就必須保證在任什麼時候候,64位
操做不會報錯。因此也就不能分紅兩個字段來寫,考慮到字段順序不一樣、平臺不一樣,內存對齊也就不一樣。所以這裏採用動態識別當前咱們操做的64
位數究竟是不是在8
字節對齊的位置上面,咱們來分析一下state
方法:
// state returns pointers to the state and sema fields stored within wg.state1. func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2] } else { return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0] } }
當數組的首地址是處於一個8
字節對齊的位置上時,那麼就將這個數組的前8
個字節做爲64
位值使用表示狀態,後4
個字節做爲32
位值表示信號量(semaphore
)。同理若是首地址沒有處於8
字節對齊的位置上時,那麼就將前4
個字節做爲semaphore
,後8
個字節做爲64
位數值。畫個圖表示一下:
Add()
、Done()
方法sync.WaitGroup
提供了Add()
方法增長一個計數器,Done()
方法減掉一個計數,Done()
方法實現比較簡單,內部調用的Add()
方法實現的計數器減一操做,也就是增減邏輯都在Add()
方法中,因此咱們重點看一下Add()
是如何實現的:
func (wg *WaitGroup) Add(delta int) { // 獲取狀態(Goroutine Counter 和 Waiter Counter)和信號量 statep, semap := wg.state() if race.Enabled { _ = *statep // trigger nil deref early if delta < 0 { // Synchronize decrements with Wait. race.ReleaseMerge(unsafe.Pointer(wg)) } race.Disable() defer race.Enable() } // 原子操做,goroutine counter累加delta state := atomic.AddUint64(statep, uint64(delta)<<32) // 獲取當前goroutine counter的值(高32位) v := int32(state >> 32) // 獲取當前waiter counter的值(低32位) w := uint32(state) if race.Enabled && delta > 0 && v == int32(delta) { // The first increment must be synchronized with Wait. // Need to model this as a read, because there can be // several concurrent wg.counter transitions from 0. race.Read(unsafe.Pointer(semap)) } // Goroutine counter是不容許爲負數的,不然會發生panic if v < 0 { panic("sync: negative WaitGroup counter") } // 當wait的Goroutine不爲0時,累加後的counter值和delta相等,說明Add()和Wait()同時調用了,因此發生panic,由於正確的作法是先Add()後Wait(),也就是已經調用了wait()就不容許再添加任務了 if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 正常`Add()`方法後,`goroutine Counter`計數器大於0或者`waiter Counter`計數器等於0時,不須要釋放信號量 if v > 0 || w == 0 { return } // 能走到這裏說明當前Goroutine Counter計數器爲0,Waiter Counter計數器大於0, 到這裏數據也就是容許發生變更了,若是發生變更了,則出發panic if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 重置狀態,併發出信號量告訴wait全部任務已經完成 *statep = 0 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } }
上面的代碼有一部分是race
靜態檢測,下面的分析會省略這一部分,由於它並非本文的重點。
註釋我都添加到對應的代碼行上了,你是否都看懂了,沒看懂沒關係,由於Add()
是與Wait()
方法一塊使用的,因此有些邏輯與wait()
裏的邏輯是相互照應的,因此當咱們看完wait()
方法的實如今總結一下大家就明白了。
Wait()
方法sync.Wait()
方法會阻塞主Goroutine
直到WaitGroup
計數器變爲0。咱們一塊兒來看一下Wait()
方法的源碼:
// Wait blocks until the WaitGroup counter is zero. func (wg *WaitGroup) Wait() { // 獲取狀態(Goroutine Counter 和 Waiter Counter)和信號量 statep, semap := wg.state() if race.Enabled { _ = *statep // trigger nil deref early race.Disable() } for { // 使用原子操做讀取state,是爲了保證Add中的寫入操做已經完成 state := atomic.LoadUint64(statep) // 獲取當前goroutine counter的值(高32位) v := int32(state >> 32) // 獲取當前waiter counter的值(低32位) w := uint32(state) // 若是沒有任務,或者任務已經在調用`wait`方法前已經執行完成了,就不用阻塞了 if v == 0 { // Counter is 0, no need to wait. if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } // 使用CAS操做對`waiter Counter`計數器進行+1操做,外面有for循環保證這裏能夠進行重試操做 if atomic.CompareAndSwapUint64(statep, state, state+1) { if race.Enabled && w == 0 { // Wait must be synchronized with the first Add. // Need to model this is as a write to race with the read in Add. // As a consequence, can do the write only for the first waiter, // otherwise concurrent Waits will race with each other. race.Write(unsafe.Pointer(semap)) } // 在這裏獲取信號量,使線程進入睡眠狀態,與Add方法中最後的增長信號量相對應,也就是當最後一個任務調用Done方法 // 後會調用Add方法對goroutine counter的值減到0,就會走到最後的增長信號量 runtime_Semacquire(semap) // 在Add方法中增長信號量時已經將statep的值設爲0了,若是這裏不是0,說明在wait以後又調用了Add方法,使用時機不對,觸發panic if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } } }
分了源碼,咱們能夠總結以下:
Add
方法與wait
方法不能夠併發同時調用,Add
方法要在wait
方法以前調用.Add()
設置的值必須與實際等待的goroutine
個數一致,不然會panic
.wait
方法後,必需要在wait
方法返回之後才能再次從新使用waitGroup
,也就是Wait
沒有返回以前不要在調用Add
方法,不然會發生Panic
.Done
只是對Add
方法的簡單封裝,咱們能夠向 Add
方法傳入任意負數(須要保證計數器非負)快速將計數器歸零以喚醒等待的 Goroutine
.waitGroup
對象只能有一份,不能夠拷貝給其餘變量,不然會形成意想不到的Bug
.no copy
機制在前文看waitGroup
結構時,有一個nocopy
字段,爲何要有nocopy
呢?咱們先看這樣一個例子:
type User struct { Name string Info *Info } type Info struct { Age int Number int } func main() { u := User{ Name: "asong", Info: &Info{ Age: 10, Number: 24, }, } u1 := u u1.Name = "Golang夢工廠" u1.Info.Age = 30 fmt.Println(u.Info.Age,u.Name) fmt.Println(u1.Info.Age,u1.Name) } // 運行結果 30 asong 30 Golang夢工廠
結構體User
中有兩個字段Name
和Info
,Name
是String
類型,Info
是指向結構體Info
的指針類型,咱們首先聲明瞭一個u
變量,對他進行復制拷貝獲得變量u1
,在u1
中對兩個字段進行改變,能夠看到Info
字段發生了更改,而Name
就沒發生更改,這就引起了安全問題,若是結構體對象包含指針字段,當該對象被拷貝時,會使得兩個對象中的指針字段變得再也不安全。
Go
語言中提供了兩種copy
檢查,一種是在運行時進行檢查,一種是經過靜態檢查。不過運行檢查是比較影響程序的執行性能的,Go官方目前只提供了strings.Builder和sync.Cond的runtime拷貝檢查機制,對於其餘須要nocopy對象類型來講,使用go vet工具來作靜態編譯檢查。運行檢查的實現能夠經過比較所屬對象是否發生變動就能夠判斷,而靜態檢查是提供了一個nocopy
對象,只要是該對象或對象中存在nocopy
字段,他就實現了sync.Locker
接口, 它擁有Lock()和Unlock()方法,以後,能夠經過go vet功能,來檢查代碼中該對象是否有被copy。
在文章的最後總結一下使用waitGroup
易錯的知識點,防止你們再次犯錯。
waitGroup
中計數器的值是不能小於0的,源碼中咱們就能夠看到,一旦小於0就會引起panic。Add
方法與Wait
方法的順序,不可併發同時調用這兩個方法,不然就會引起panic,同時在調用了wait
方法在其沒有釋放前不要再次調用Add
方法,這樣也會引起panic
,waitGroup
是能夠複用的,可是須要保證其計數週期的完整性。WaitGroup
對象不是一個引用類型,經過函數傳值的時候須要使用地址,由於Go
語言只有值傳遞,傳遞WaitGroup
是值的話,就會致使會發生panic
,看這樣一個例子:func main() { wg := sync.WaitGroup{} wg.Add(1) doDeadLock(wg) wg.Wait() } func doDeadLock(wg sync.WaitGroup) { defer wg.Done() fmt.Println("do something") } //運行結果:panic: sync: negative WaitGroup counter
發生這個問題的緣由就是在doDeadLock()
方法中wg
是一個新對象,直接調用Done
方法,計數器就會出現負數,因此引起panic
,爲了安全起見,對於這種傳結構體的場景通常建議都傳指針就行了,基本能夠避免一些問題。
panic
,很重要的一點,也是很容易出錯的地方。最後給你們出一個思考題,下面這段代碼會不會發生panic
:
func main() { wg := sync.WaitGroup{} wg.Add(100) for i := 0; i < 100; i++ { go func() { defer wg.Done() fmt.Println(i) }() } wg.Wait() }
在最後,祝你們新年快樂,心想事成,萬事如意~~~
好啦,這篇文章就到這裏啦,素質三連(分享、點贊、在看)都是筆者持續創做更多優質內容的動力!
建立了一個Golang學習交流羣,歡迎各位大佬們踊躍入羣,咱們一塊兒學習交流。入羣方式:加我vx拉你入羣,或者公衆號獲取入羣二維碼
結尾給你們發一個小福利吧,最近我在看[微服務架構設計模式]這一本書,講的很好,本身也收集了一本PDF,有須要的小夥能夠到自行下載。獲取方式:關注公衆號:[Golang夢工廠],後臺回覆:[微服務],便可獲取。
我翻譯了一份GIN中文文檔,會按期進行維護,有須要的小夥伴後臺回覆[gin]便可下載。
翻譯了一份Machinery中文文檔,會按期進行維護,有須要的小夥伴們後臺回覆[machinery]便可獲取。
我是asong,一名普普統統的程序猿,讓咱們一塊兒慢慢變強吧。歡迎各位的關注,咱們下期見~~~
推薦往期文章: