圖解Go裏面的WaitGroup瞭解編程語言核心實現源碼

1. 基礎築基

sync.WaitGroup裏面的實現邏輯其實蠻簡單的,在看過以前的sync.Mutex和synx.RWMutex以後,閱讀起來應該很是簡單,而惟一有差別的其實就是sync.WaitGroup裏面的state1html

1.1 等待機制

image.png sync.WaitGroup主要用於等待一組goroutine退出,本質上其實就是一個計數器,咱們能夠經過Add指定咱們須要等待退出的goroutine的數量,而後經過Done來遞減,若是爲0,則能夠退出git

1.2 內存對齊

image.png 內存對齊是一個比較大的話題,其核心機制是編譯器根據結構體內部元素的size結合平臺和編譯器自身的規則來進行補位, 而在sync.WaitGroup裏面就有用到,也是我感受可能在WaitGroup全部實現的核心特性裏面最重要的一條了github

在WaitGroup裏面只有state1 [3]uint32這一個元素,經過類型咱們能夠計算uint32是4個字節,長度3的數組總長度12,其實以前這個地方是[12]byte, 切換uint32是go語言裏面爲了讓底層的編譯器保證按照4個字節對齊而作的切換數組

1.3 8字節

image.png 8字節即兩個4字節,也就是兩個uint32的長度,實際上也是一個uint64的長度,在sync.WaitGroup裏面經過uint64來進行等待數量的計數緩存

這裏有一個相對比較hack的問題,我翻閱過不少文章,都沒有找到能讓我徹底信服的答案,接下來就是我本身的臆測了bash

1.4 8字節的臆測

首先go語言須要兼容32位和64位平臺,可是在32位平臺上對64字節的uint操做可能不是原子的,好比在讀取一個字長度的時候,另一個字的數據頗有可能已經發生改變了(在32位操做系統上,字長是4,而uint64長度爲8), 因此在實際計數的時候,其實sync.WaitGroup也就使用了4個字節來進行ide

image.png 在cpu內有一個cache line的緩存,這個緩存一般是8個字節的長度,在intel的cpu中,會保證針對一個cache line的操做是原子,若是隻有8個字節頗有可能會出現上面的這種狀況,即垮了兩個cache line, 這樣不管是在原子操做仍是性能上可能都會有問題源碼分析

1.5 測試8字節指針

我這裏簡單構造了一個8字節的長度指針,來作演示,經過讀取底層數組的指針和偏移指針(state1數組的第2個元素即index=1)的地址,能夠驗證猜測即在通過編譯器進行內存分配對齊以後,若是當前元素的指針的地址不能爲8整除,則其第地址+4的地址,能夠被8整除(這裏感受更多的是在編譯器層才能看到真正的東西,而我對編譯器自己並不感興趣,因此我只須要一個證實,能夠驗證結果便可)性能

import (
	"unsafe"
)

type a struct {
	b byte
}

type w struct {
	state1 [3]uint32
}

func main() {
	b := a{}
	println(unsafe.Sizeof(b), uintptr(unsafe.Pointer(&b)), uintptr(unsafe.Pointer(&b))%8 == 0)
	wg := w{}
	println(unsafe.Sizeof(wg), uintptr(unsafe.Pointer(&wg.state1)), uintptr(unsafe.Pointer(&wg.state1))%8 == 0)
	println(unsafe.Sizeof(wg), uintptr(unsafe.Pointer(&wg.state1[1])), uintptr(unsafe.Pointer(&wg.state1[1]))%8 == 0)
}

輸出結果測試

1 824633919343 false
12 824633919356 false
12 824633919360 true

1.6 分段計數

image.png 在sync.WaitGroup中對上面的提到的8字節的uint64也是分段計數,即高位記錄須要等待 Done的數量,而低位記錄當前正在Wait等待結束的計數

2. 源碼速讀

image.png 1.核心原理就是經過以前說的64位的uint64來進行計數,採用高位記錄須要Done的數量,低位記錄Wait的數量 2.若是發現當前count>0則Wait的goroutine會進行排隊 3.任務完成後的goroutine則進行Done操做,直到count==0,則完成,就喚醒全部由於wait操做睡眠的goroutine

2.1 計數與信號量

image.png 就像基礎部分說的那樣,針對12字節的[3]uint32會根據當前指針的地址來進行計算,肯定採用哪一個分段進行計數和作爲信號量等待,詳細的說明上面已經提過,這裏只是根據採起的分段,而後將對應的分段轉換爲*uint64的指針和一個uint32的指針就能夠了

func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
        return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
    } else {
        return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
    }
}

2.2 添加等待計數

func (wg *WaitGroup) Add(delta int) {
    // 獲取當前計數
    statep, semap := wg.state()
    if race.Enabled {
        _ = *statep // trigger nil deref early
        if delta < 0 {
            // Synchronize decrements with Wait.
            race.ReleaseMerge(unsafe.Pointer(wg))
        }
        race.Disable()
        defer race.Enable()
    }
    // 使用高32位進行counter計數
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32) // 獲取當前須要等待done的數量
    w := uint32(state) // 獲取低32位即waiter等待計數
    if race.Enabled && delta > 0 && v == int32(delta) {
        // The first increment must be synchronized with Wait.
        // Need to model this as a read, because there can be
        // several concurrent wg.counter transitions from 0.
        race.Read(unsafe.Pointer(semap))
    }
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // 若是當前v>0,則表示還須要繼續未完成的goroutine進行Done操做
    // 若是w ==0,則表示當前並無goroutine在wait等待結束
    // 以上兩種狀況直接返回便可
    if v > 0 || w == 0 {
        return
    }
    // 當waiters > 0 的時候,而且當前v==0,這個時候若是檢查發現state狀態先後發生改變,則
    // 證實當前有人修改過,則刪除
    // 若是走到這個地方則證實通過以前的操做後,當前的v==0,w!=0,就證實以前一輪的Done已經所有完成,如今須要喚醒全部在wait的goroutine
    // 此時若是發現當前的*statep值又發生了改變,則證實有有人進行了Add操做
    // 也就是這裏的WaitGroup濫用
    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // 將當前state的狀態設置爲0,就能夠進行下次的重用了
    *statep = 0
    for ; w != 0; w-- {
        // 釋放全部排隊的waiter
        runtime_Semrelease(semap, false)
    }
}

2.2 Done完成一個等待事件

func (wg *WaitGroup) Done() {
    // 減去一個-1
    wg.Add(-1)
}

2.3 等待全部操做完成

func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    if race.Enabled {
        _ = *statep // trigger nil deref early
        race.Disable()
    }
    for {
        // 獲取state的狀態
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32) // 獲取高32位的count
        w := uint32(state) // 獲取當前正在Wait的數量
        if v == 0 { // 若是當前v ==0就直接return, 表示當前不須要等待
            // Counter is 0, no need to wait.
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
        // 進行低位的waiter計數統計
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            if race.Enabled && w == 0 {
                // Wait must be synchronized with the first Add.
                // Need to model this is as a write to race with the read in Add.
                // As a consequence, can do the write only for the first waiter,
                // otherwise concurrent Waits will race with each other.
                race.Write(unsafe.Pointer(semap))
            }
            // 若是成功則進行排隊休眠等待喚醒
            runtime_Semacquire(semap)
            // 若是喚醒後發現state的狀態不爲0,則證實在喚醒的過程當中WaitGroup又被重用,則panic
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
    }
}

參考文章

關於cpu cache line大小 原子操做

關注公告號閱讀更多源碼分析文章21天大棚

更多文章關注 www.sreguide.com


本篇文章由一文多發平臺ArtiPub自動發佈
相關文章
相關標籤/搜索