go語言sync庫中的WaitGroup是用於等待一個協程或者一組攜程。使用Add函數增長計數器,使用Done函數減小計數器。當使用Wait函數等待計數器歸零以後則喚醒主攜程。須要注意的是:函數
type WaitGroup struct { noCopy noCopy // 位值:高32位是計數器,低32位是goroution等待計數。 state1 [12]byte // 信號量,用於喚醒goroution sema uint32 } func (wg *WaitGroup) state() *uint64 { if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { return (*uint64)(unsafe.Pointer(&wg.state1)) } else { return (*uint64)(unsafe.Pointer(&wg.state1[4])) } }
func (wg *WaitGroup) Add(delta int) { // 獲取狀態碼 statep := wg.state() if race.Enabled { _ = *statep // trigger nil deref early if delta < 0 { // Synchronize decrements with Wait. race.ReleaseMerge(unsafe.Pointer(wg)) } race.Disable() defer race.Enable() } // 把傳入的delta用原子操做加入到statep, state := atomic.AddUint64(statep, uint64(delta)<<32) // 獲取計數器數值 v := int32(state >> 32) // 獲取等待數量 w := uint32(state) if race.Enabled && delta > 0 && v == int32(delta) { // The first increment must be synchronized with Wait. // Need to model this as a read, because there can be // several concurrent wg.counter transitions from 0. race.Read(unsafe.Pointer(&wg.sema)) } // 計數器小於0 報錯 if v < 0 { panic("sync: negative WaitGroup counter") } if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 若是等待爲0或者計數器大於0 意味着沒有等待或者還有讀鎖 不須要喚醒goroutine則返回 add操做完畢 if v > 0 || w == 0 { return } if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // // 喚醒全部等待的線程 for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false) } } // Done函數 調用了Add函數傳入-1 至關於鎖的數量減1 func (wg *WaitGroup) Done() { wg.Add(-1) } func (wg *WaitGroup) Wait() { // 獲取waitGroup的狀態碼 statep := wg.state() if race.Enabled { _ = *statep // trigger nil deref early race.Disable() } // 循環 for { // 調用load獲取狀態 state := atomic.LoadUint64(statep) // 獲取計數器數值 v := int32(state >> 32) // 獲取等待數量 w := uint32(state) if v == 0 { // Counter is 0, no need to wait. if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } // 添加等待數量 若是cas失敗則從新獲取狀態 避免計數有錯 if atomic.CompareAndSwapUint64(statep, state, state+1) { if race.Enabled && w == 0 { // Wait must be synchronized with the first Add. // Need to model this is as a write to race with the read in Add. // As a consequence, can do the write only for the first waiter, // otherwise concurrent Waits will race with each other. race.Write(unsafe.Pointer(&wg.sema)) } // 阻塞goroutine 等待喚醒 runtime_Semacquire(&wg.sema) if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } } }