go中waitGroup源碼解讀

waitGroup源碼刨銑

前言

學習下waitGroup的實現html

本文是在go version go1.13.15 darwin/amd64上進行的golang

WaitGroup實現

看一個小demo編程

func waitGroup() {
	var wg sync.WaitGroup

	wg.Add(4)
	go func() {
		defer wg.Done()
		fmt.Println(1)
	}()

	go func() {
		defer wg.Done()
		fmt.Println(2)
	}()

	go func() {
		defer wg.Done()
		fmt.Println(3)
	}()

	go func() {
		defer wg.Done()
		fmt.Println(4)
	}()

	wg.Wait()
	fmt.Println("1 2 3 4 end")
}

一、啓動goroutine前將計數器經過Add(4)將計數器設置爲待啓動的goroutine個數。數組

二、啓動goroutine後,使用Wait()方法阻塞主協程,等待計數器變爲0。併發

三、每一個goroutine執行結束經過Done()方法將計數器減1。函數

四、計數器變爲0後,阻塞的goroutine被喚醒。佈局

看下具體的實現性能

// WaitGroup 不能被copy
type WaitGroup struct {
	noCopy noCopy

	state1 [3]uint32
}

noCopy

意思就是不讓copy,是如何實現的呢?學習

Go中沒有原生的禁止拷貝的方式,因此若是有的結構體,你但願使用者沒法拷貝,只能指針傳遞保證全局惟一的話,能夠這麼幹,定義 一個結構體叫 noCopy,實現以下的接口,而後嵌入到你想要禁止拷貝的結構體中,這樣go vet就能檢測出來。測試

// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

測試下

type noCopy struct{}

func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

type Person struct {
	noCopy noCopy
	name   string
}

// go中的函數傳參都是值拷貝
func test(person Person) {
	fmt.Println(person)
}

func main() {
	var person Person
	test(person)
}

go vet main.go

$ go vet main.go
# command-line-arguments
./main.go:18:18: test passes lock by value: command-line-arguments.Person contains command-line-arguments.noCopy
./main.go:19:14: call of fmt.Println copies lock value: command-line-arguments.Person contains command-line-arguments.noCopy
./main.go:24:7: call of test copies lock value: command-line-arguments.Person contains command-line-arguments.noCopy

使用vet檢測到了不能copy的錯誤

state1

// 64 位值: 高 32 位用於計數,低 32 位用於等待計數
	// 64 位的原子操做要求 64 位對齊,但 32 位編譯器沒法保證這個要求
	// 所以分配 12 字節而後將他們對齊,其中 8 字節做爲狀態,其餘 4 字節用於存儲原語
	state1 [3]uint32

這點是wait_group很巧妙的一點,大神寫代碼的思路就是驚奇

這個設計很奇妙,經過內存對齊來處理wait_group中的waiter數、計數值、信號量。什麼是內存對齊可參考什麼是內存對齊,go中內存對齊分析

來分析下state1是如何內存對齊來處理幾個計數值的存儲

計算機爲了加快內存的訪問速度,會對內存進行對齊處理,CPU把內存看成一塊一塊的,塊的大小能夠是二、四、八、16字節大小,所以CPU讀取內存是一塊一塊讀取的。

合理的內存對齊能夠提升內存讀寫的性能,而且便於實現變量操做的原子性。

在不一樣平臺上的編譯器都有本身默認的 「對齊係數」,可經過預編譯命令#pragma pack(n)進行變動,n就是代指 「對齊係數」。

通常來說,咱們經常使用的平臺的係數以下:

  • 32 位:4

  • 64 位:8

state1這塊就兼容了兩種平臺的對齊係數

對於64未系統來說。內存訪問的步長是8。也就是cpu一次訪問8位偏移量的內存空間。當時對於32未的系統,內存的對齊係數是4,也就是訪問的步長是4個偏移量。

因此爲了兼容這兩種模式,這裏採用了uint32結構的數組,保證在不一樣類型的機器上都是12個字節,一個uint32是4字節。這樣對於32位的4步長訪問是沒有問題了,64位的好像也沒有解決,8步長的訪問會一次讀入兩個uint32的長度。

因此,下面的讀取也進行了操做,將兩個uint32的內存放到一個uint64中返回,這樣就同時解決了32位和64位訪問步長的問題。

因此,64位系統和32位系統,state1counter,waiter,semaphore的內存佈局是不同的。

state [0] state [1] state [2]
64位 waiter counter semaphore
32位 semaphore waiter counter

counter位於高地址位,waiter位於地址位

waitgroup

下面是state的代碼

func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	// 判斷是不是64位
	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
	} else {
		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
	}
}

對於count和wait在高低地址位的體現,在add中的代碼可體現

// // 將 delta 加到 statep 的前 32 位上,即加到計數器上
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	v := int32(state >> 32) // count
	w := uint32(state)  // wait

經過補碼的移位來看下分析下

statep               0000 0000 0000 0000 0000 0000 0000 0001 0000 0000 0000 0000 0000 0000 0000 0000  

int64(1)             0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001

int64(1)<<32:       0000 0000 0000 0000 0000 0000 0000 0001 0000 0000 0000 0000 0000 0000 0000 0000  

statep+int64(1)<<32  

state:              0000 0000 0000 0000 0000 0000 0000 0001 0000 0000 0000 0000 0000 0000 0000 0000  

int32(state >> 32)   0000 0000 0000 0000 0000 0000 0000 0001

uint32(state):      0000 0000 0000 0000 0000 0000 0000 0000

再來看下-1操做

statep               0000 0000 0000 0000 0000 0000 0000 0001 0000 0000 0000 0000 0000 0000 0000 0000  

int64(-1)            1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111

int64(-11)<<32:     1111 1111 1111 1111 1111 1111 1111 1111 0000 0000 0000 0000 0000 0000 0000 0000  

statep+int64(-11)<<32  

state:              0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000  

int32(state >> 32)   0000 0000 0000 0000 0000 0000 0000 0000

uint32(state):      0000 0000 0000 0000 0000 0000 0000 0000

關於補碼的操做可參考原碼, 反碼, 補碼 詳解

信號量(semaphore)

信號量是Unix系統提供的一種保護共享資源的機制,用於防止多個線程同時訪問某個資源。

可簡單理解爲信號量爲一個數值:

  • 當信號量>0時,表示資源可用,獲取信號量時系統自動將信號量減1;

  • 當信號量==0時,表示資源暫不可用,獲取信號量時,當前線程會進入睡眠,當信號量爲正時被喚醒。

WaitGroup中的實現就用到了這個,在下面的代碼實現就能看到

Add(Done)

// Add將增量(可能爲負)添加到WaitGroup計數器中。
// 若是計數器爲零,則釋放等待時阻塞的全部goroutine。
// 若是計數器變爲負數,請添加恐慌。
//
// 請注意,當計數器爲 0 時發生的帶有正的 delta 的調用必須在 Wait 以前。
// 當計數器大於 0 時,帶有負 delta 的調用或帶有正 delta 調用可能在任什麼時候候發生。
// 一般,這意味着對Add的調用應在語句以前執行建立要等待的goroutine或其餘事件。
// 若是將WaitGroup重用於等待幾個獨立的事件集,新的Add調用必須在全部先前的Wait調用返回以後發生。
func (wg *WaitGroup) Add(delta int) {
	// 獲取counter,waiter,以及semaphore對應的指針
	statep, semap := wg.state()
	...
	// 將 delta 加到 statep 的前 32 位上,即加到計數器上
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	// 高地址位counter
	v := int32(state >> 32)
	// 低地址爲waiter
	w := uint32(state)
	...
	// 計數器不容許爲負數
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	// wait不等於0說明已經執行了Wait,此時不允許Add
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// 計數器的值大於或者沒有waiter在等待,直接返回
	if v > 0 || w == 0 {
		return
	}
	// 運行到這裏只有一種狀況 v == 0 && w != 0

	// 這時 Goroutine 已經將計數器清零,且等待器大於零(併發調用致使)
	// 這時不容許出現併發使用致使的狀態突變,不然就應該 panic
	// - Add 不能與 Wait 併發調用
	// - Wait 在計數器已經歸零的狀況下,不能再繼續增長等待器了
	// 仍然檢查來保證 WaitGroup 不會被濫用

	// 這一點很重要,這段代碼同時也保證了這是最後的一個須要等待阻塞的goroutine
	// 而後在下面經過runtime_Semrelease,喚醒被信號量semap阻塞的waiter
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// 結束後將等待器清零
	*statep = 0
	for ; w != 0; w-- {
		// 釋放信號量,經過runtime_Semacquire喚醒被阻塞的waiter
		runtime_Semrelease(semap, false, 0)
	}
}

梳理下流程

一、首先獲取存儲在state1中對應的幾個變量的指針;

二、counter存儲在高位,增長的時候須要左移32位;

三、counter的數量不能小於0,小於0拋出panic;

四、一樣也會判斷,已經的執行wait以後,不能在增長counter;

五、(這點很重要,我本身看了很久才明白)計數器的值大於或者沒有waiter在等待,直接返回

// 計數器的值大於或者沒有waiter在等待,直接返回
	if v > 0 || w == 0 {
		return
	}

由於waiter的值只會被執行一次+1操做,因此這段代碼保證了只有在v == 0 && w != 0,也就是最後一個Done()操做的時候,走到下面的代碼,釋放信號量,喚醒被信號量阻塞的Wait(),結束整個WaitGroup

Done()也是調用了Add

func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

Wait

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
	// 獲取counter,waiter,以及semaphore對應的指針
	statep, semap := wg.state()
	...
	for {
		// 獲取對應的counter和waiter數量
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)
		w := uint32(state)
		// Counter爲0,不須要等待
		if v == 0 {
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// 原子(cas)增長waiter的數量(只會被+1操做一次)
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			...
			// 這塊用到了,咱們上文講的那個信號量
			// 等待被runtime_Semrelease釋放的信號量喚醒
			// 若是 *semap > 0 則會減 1,等於0則被阻塞
			runtime_Semacquire(semap)

			// 在這種狀況下,若是 *statep 不等於 0 ,則說明使用失誤,直接 panic
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			...
			return
		}
	}
}

梳理下流程

一、首先獲取存儲在state1中對應的幾個變量的指針;

二、一個for循環,來阻塞等待全部的goroutine退出;

三、若是counter爲0,不須要等待,直接退出便可;

四、原子(cas)增長waiter的數量(只會被+1操做一次);

五、整個Wait()會被runtime_Semacquire阻塞,直到等到退出的信號量;

六、Done()會在最後一次的時候經過runtime_Semrelease發出取消阻塞的信號,而後被runtime_Semacquire阻塞的Wait()就能夠退出了;

七、整個WaitGroup執行成功。

waitgroup

總結

代碼中我感到設計比較巧妙的有兩個部分:

一、state1的處理,保證內存對齊,設置高低位內存來存儲不一樣的值,同時32位和64位平臺的處理方式還不一樣;

二、信號量的阻塞退出,這塊最後一個Done退出的時候,纔會觸發阻塞信號量,退出Wait(),而後結束整個waitGroup。再此以前,當Wait()在成功將waiter變量+1操做以後,就會被runtime_Semacquire阻塞,直到最後一個Done,信號的發出。

對於WaitGroup的使用

一、計數器的值不能爲負數,多是Add(-1)觸發的,也多是Done()觸發的,不然會panic;

二、Add數量的添加,要發生在Wait()以前;

三、WaitGroup是能夠重用的,可是須要等上一批的goroutine 都調用Wait完畢後才能繼續重用WaitGroup

參考

【《Go專家編程》Go WaitGroup實現原理】https://my.oschina.net/renhc/blog/2249061
【Go中由WaitGroup引起對內存對齊思考】https://cloud.tencent.com/developer/article/1776930
【Golang 之 WaitGroup 源碼解析】https://www.linkinstar.wiki/2020/03/15/golang/source-code/sync-waitgroup-source-code/
【sync.WaitGroup】https://golang.design/under-the-hood/zh-cn/part1basic/ch05sync/waitgroup/

相關文章
相關標籤/搜索