sync.WaitGroup裏面的實現邏輯其實蠻簡單的,在看過以前的sync.Mutex和synx.RWMutex以後,閱讀起來應該很是簡單,而惟一有差別的其實就是sync.WaitGroup裏面的state1html
sync.WaitGroup主要用於等待一組goroutine退出,本質上其實就是一個計數器,咱們能夠經過Add指定咱們須要等待退出的goroutine的數量,而後經過Done來遞減,若是爲0,則能夠退出git
內存對齊是一個比較大的話題,其核心機制是編譯器根據結構體內部元素的size結合平臺和編譯器自身的規則來進行補位, 而在sync.WaitGroup裏面就有用到,也是我感受可能在WaitGroup全部實現的核心特性裏面最重要的一條了github
在WaitGroup裏面只有state1 [3]uint32這一個元素,經過類型咱們能夠計算uint32是4個字節,長度3的數組總長度12,其實以前這個地方是[12]byte, 切換uint32是go語言裏面爲了讓底層的編譯器保證按照4個字節對齊而作的切換數組
8字節即兩個4字節,也就是兩個uint32的長度,實際上也是一個uint64的長度,在sync.WaitGroup裏面經過uint64來進行等待數量的計數緩存
這裏有一個相對比較hack的問題,我翻閱過不少文章,都沒有找到能讓我徹底信服的答案,接下來就是我本身的臆測了bash
首先go語言須要兼容32位和64位平臺,可是在32位平臺上對64字節的uint操做可能不是原子的,好比在讀取一個字長度的時候,另一個字的數據頗有可能已經發生改變了(在32位操做系統上,字長是4,而uint64長度爲8), 因此在實際計數的時候,其實sync.WaitGroup也就使用了4個字節來進行ide
在cpu內有一個cache line的緩存,這個緩存一般是8個字節的長度,在intel的cpu中,會保證針對一個cache line的操做是原子,若是隻有8個字節頗有可能會出現上面的這種狀況,即垮了兩個cache line, 這樣不管是在原子操做仍是性能上可能都會有問題源碼分析
我這裏簡單構造了一個8字節的長度指針,來作演示,經過讀取底層數組的指針和偏移指針(state1數組的第2個元素即index=1)的地址,能夠驗證猜測即在通過編譯器進行內存分配對齊以後,若是當前元素的指針的地址不能爲8整除,則其第地址+4的地址,能夠被8整除(這裏感受更多的是在編譯器層才能看到真正的東西,而我對編譯器自己並不感興趣,因此我只須要一個證實,能夠驗證結果便可)性能
import ( "unsafe" ) type a struct { b byte } type w struct { state1 [3]uint32 } func main() { b := a{} println(unsafe.Sizeof(b), uintptr(unsafe.Pointer(&b)), uintptr(unsafe.Pointer(&b))%8 == 0) wg := w{} println(unsafe.Sizeof(wg), uintptr(unsafe.Pointer(&wg.state1)), uintptr(unsafe.Pointer(&wg.state1))%8 == 0) println(unsafe.Sizeof(wg), uintptr(unsafe.Pointer(&wg.state1[1])), uintptr(unsafe.Pointer(&wg.state1[1]))%8 == 0) }
輸出結果測試
1 824633919343 false 12 824633919356 false 12 824633919360 true
在sync.WaitGroup中對上面的提到的8字節的uint64也是分段計數,即高位記錄須要等待 Done的數量,而低位記錄當前正在Wait等待結束的計數
1.核心原理就是經過以前說的64位的uint64來進行計數,採用高位記錄須要Done的數量,低位記錄Wait的數量 2.若是發現當前count>0則Wait的goroutine會進行排隊 3.任務完成後的goroutine則進行Done操做,直到count==0,則完成,就喚醒全部由於wait操做睡眠的goroutine
就像基礎部分說的那樣,針對12字節的[3]uint32會根據當前指針的地址來進行計算,肯定採用哪一個分段進行計數和作爲信號量等待,詳細的說明上面已經提過,這裏只是根據採起的分段,而後將對應的分段轉換爲*uint64的指針和一個uint32的指針就能夠了
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2] } else { return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0] } }
func (wg *WaitGroup) Add(delta int) { // 獲取當前計數 statep, semap := wg.state() if race.Enabled { _ = *statep // trigger nil deref early if delta < 0 { // Synchronize decrements with Wait. race.ReleaseMerge(unsafe.Pointer(wg)) } race.Disable() defer race.Enable() } // 使用高32位進行counter計數 state := atomic.AddUint64(statep, uint64(delta)<<32) v := int32(state >> 32) // 獲取當前須要等待done的數量 w := uint32(state) // 獲取低32位即waiter等待計數 if race.Enabled && delta > 0 && v == int32(delta) { // The first increment must be synchronized with Wait. // Need to model this as a read, because there can be // several concurrent wg.counter transitions from 0. race.Read(unsafe.Pointer(semap)) } if v < 0 { panic("sync: negative WaitGroup counter") } if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 若是當前v>0,則表示還須要繼續未完成的goroutine進行Done操做 // 若是w ==0,則表示當前並無goroutine在wait等待結束 // 以上兩種狀況直接返回便可 if v > 0 || w == 0 { return } // 當waiters > 0 的時候,而且當前v==0,這個時候若是檢查發現state狀態先後發生改變,則 // 證實當前有人修改過,則刪除 // 若是走到這個地方則證實通過以前的操做後,當前的v==0,w!=0,就證實以前一輪的Done已經所有完成,如今須要喚醒全部在wait的goroutine // 此時若是發現當前的*statep值又發生了改變,則證實有有人進行了Add操做 // 也就是這裏的WaitGroup濫用 if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 將當前state的狀態設置爲0,就能夠進行下次的重用了 *statep = 0 for ; w != 0; w-- { // 釋放全部排隊的waiter runtime_Semrelease(semap, false) } }
func (wg *WaitGroup) Done() { // 減去一個-1 wg.Add(-1) }
func (wg *WaitGroup) Wait() { statep, semap := wg.state() if race.Enabled { _ = *statep // trigger nil deref early race.Disable() } for { // 獲取state的狀態 state := atomic.LoadUint64(statep) v := int32(state >> 32) // 獲取高32位的count w := uint32(state) // 獲取當前正在Wait的數量 if v == 0 { // 若是當前v ==0就直接return, 表示當前不須要等待 // Counter is 0, no need to wait. if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } // 進行低位的waiter計數統計 if atomic.CompareAndSwapUint64(statep, state, state+1) { if race.Enabled && w == 0 { // Wait must be synchronized with the first Add. // Need to model this is as a write to race with the read in Add. // As a consequence, can do the write only for the first waiter, // otherwise concurrent Waits will race with each other. race.Write(unsafe.Pointer(semap)) } // 若是成功則進行排隊休眠等待喚醒 runtime_Semacquire(semap) // 若是喚醒後發現state的狀態不爲0,則證實在喚醒的過程當中WaitGroup又被重用,則panic if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } } }
關注公告號閱讀更多源碼分析文章
更多文章關注 www.sreguide.com