針對Golang 1.9的sync.WaitGroup進行分析,與Golang 1.10基本同樣除了將panic
改成了throw
以外其餘的都同樣。
源代碼位置:sync\waitgroup.go
。golang
type WaitGroup struct { noCopy noCopy // noCopy能夠嵌入到結構中,在第一次使用後不可複製,使用go vet做爲檢測使用 // 位值:高32位是計數器,低32位是goroution等待計數。 // 64位的原子操做須要64位的對齊,可是32位。編譯器不能確保它,因此分配了12個byte對齊的8個byte做爲狀態。 state1 [12]byte // byte=uint8範圍:0~255,只取前8個元素。轉爲2進制:0000 0000,0000 0000... ...0000 0000 sema uint32 // 信號量,用於喚醒goroution }
不知道你們是否和我同樣,不管是使用Java的CountDownLatch仍是Golang的WaitGroup,都會疑問,能夠裝下多個線程|協程等待呢?看了源碼後能夠回答了,能夠裝下算法
1111 1111 1111 ... 1111 \________32___________/
2^32個辣麼多!因此不須要擔憂單機狀況下會被撐爆了。數組
如下代碼已經去掉了與核心代碼無關的race代碼。併發
添加或者減小等待goroutine的數量。函數
添加的delta,多是負的,到WaitGroup計數器。ui
func (wg *WaitGroup) Add(delta int) { // 獲取到wg.state1數組中元素組成的二進制對應的十進制的值 statep := wg.state() // 高32位是計數器 state := atomic.AddUint64(statep, uint64(delta)<<32) // 獲取計數器 v := int32(state >> 32) w := uint32(state) // 計數器爲負數,報panic if v < 0 { panic("sync: negative WaitGroup counter") } // 添加與等待併發調用,報panic if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 計數器添加成功 if v > 0 || w == 0 { return } // 當等待計數器> 0時,而goroutine設置爲0。 // 此時不可能有同時發生的狀態突變: // - 增長不能與等待同時發生, // - 若是計數器counter == 0,再也不增長等待計數器 if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // Reset waiters count to 0. *statep = 0 for ; w != 0; w-- { // 目的是做爲一個簡單的wakeup原語,以供同步使用。true爲喚醒排在等待隊列的第一個goroutine runtime_Semrelease(&wg.sema, false) } }
// unsafe.Pointer其實就是相似C的void *,在golang中是用於各類指針相互轉換的橋樑。 // uintptr是golang的內置類型,是能存儲指針的整型,uintptr的底層類型是int,它和unsafe.Pointer可相互轉換。 // uintptr和unsafe.Pointer的區別就是:unsafe.Pointer只是單純的通用指針類型,用於轉換不一樣類型指針,它不能夠參與指針運算; // 而uintptr是用於指針運算的,GC 不把 uintptr 當指針,也就是說 uintptr 沒法持有對象,uintptr類型的目標會被回收。 // state()函數能夠獲取到wg.state1數組中元素組成的二進制對應的十進制的值 func (wg *WaitGroup) state() *uint64 { if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { return (*uint64)(unsafe.Pointer(&wg.state1)) } else { return (*uint64)(unsafe.Pointer(&wg.state1[4])) } }
至關於Add(-1)。atom
func (wg *WaitGroup) Done() { // 計數器減一 wg.Add(-1) }
執行阻塞,直到全部的WaitGroup數量變成0。線程
func (wg *WaitGroup) Wait() { // 獲取到wg.state1數組中元素組成的二進制對應的十進制的值 statep := wg.state() // cas算法 for { state := atomic.LoadUint64(statep) // 高32位是計數器 v := int32(state >> 32) w := uint32(state) // 計數器爲0,結束等待 if v == 0 { // Counter is 0, no need to wait. return } // 增長等待goroution計數,對低32位加1,不須要移位 if atomic.CompareAndSwapUint64(statep, state, state+1) { // 目的是做爲一個簡單的sleep原語,以供同步使用 runtime_Semacquire(&wg.sema) if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } return } } }