golang sync.WaitGroup 源碼分析

最近在學習golang源碼,學習golang源碼是學習golang的很是好的途徑。golang

先來記錄一波sync包的學習。版本 go1.14.2 darwin/amd64併發

sync.WaitGroup

咱們通常使用sync.waitGroup 作併發控制,使用方式通常以下函數

func main() {
    wg := sync.WaitGroup{}

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            fmt.Println(index)
        }(i)
    }
    
    wg.Wait()
}

因此咱們先看下waitGroup 主要有是哪一個函數,Add, Done, Wait 函數,在看函數源碼前,咱們先看下WaitGroup 結構性能

type WaitGroup struct {
    noCopy noCopy          //禁止拷貝,若是有拷貝構建不會報錯,能夠用go vet 或 go tool vet 檢測是否有拷貝錯誤,
    state1 [3]uint32      //重要,存有 計數器,等待數,信號量的值
}

其中 state1 成員存放的值在64 位系統中以下:學習

<img src="/Users/guanjingyun/Library/Application Support/typora-user-images/image-20201226143123260.png" alt="image-20201226143123260" style="zoom:33%;" />ui

接下來看下Add方法atom

func (wg *WaitGroup) Add(delta int) {
  statep, semap := wg.state()   //獲取wg狀態,statep地址(高32位的計數器值和低32位的等待數量值), semap 信號量
    if race.Enabled {  //數據競態檢測,默認是false,開啓消耗cpu性能 ,先無論
        _ = *statep
        if delta < 0 { 
            race.ReleaseMerge(unsafe.Pointer(wg))
        }
        race.Disable()
        defer race.Enable()
    }
    state := atomic.AddUint64(statep, uint64(delta)<<32) //原子操做給計數器加上delta的值
    v := int32(state >> 32)     //高32位 計數器
    w := uint32(state)   //低32位 等待數
    if race.Enabled && delta > 0 && v == int32(delta) {  //數據競態檢測,先無論
        race.Read(unsafe.Pointer(semap))
    }
    if v < 0 {                                  //計數器 <0 
        panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {    //在add以前,已經調用過wait函數??(沒太看明白)
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 { // wait數 爲0 或 計數>0 直接返回
        return
    }

    if *statep != state {   // ? 理論上應該相等
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    
    *statep = 0
    for ; w != 0; w-- {  //計數器 爲0, 釋放全部wait 信號量,
        runtime_Semrelease(semap, false, 0)
    }
}

Done函數, 很簡單了:code

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

Wait函數, 把數據競態檢測 部分去了,爲代碼看起來簡潔協程

func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()  //獲取wg狀態,statep地址(高32位的計數器值和低32位的等待數量值), semap 信號量
    for {
        state := atomic.LoadUint64(statep) 
        v := int32(state >> 32)  //如上
        w := uint32(state)
        if v == 0 {  //計數爲0,不用wait
            return
        }

        if atomic.CompareAndSwapUint64(statep, state, state+1) { //等待數加1
            runtime_Semacquire(semap)    //獲取信號量
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}

runtime_Semacquire 和 runtime_Semrelease 獲取和釋放信號量,用於休眠和喚醒 協程,具體實現等下次深刻研究下rem

相關文章
相關標籤/搜索