內存對齊是一個比較大的話題,其核心機制是編譯器根據結構體內部元素的size結合平臺和編譯器自身的規則來進行補位, 而在sync.WaitGroup裏面就有用到,也是我感受可能在WaitGroup全部實現的核心特性裏面最重要的一條了緩存
在WaitGroup裏面只有state1 [3]uint32這一個元素,經過類型咱們能夠計算uint32是4個字節,長度3的數組總長度12,其實以前這個地方是[12]byte, 切換uint32是go語言裏面爲了讓底層的編譯器保證按照4個字節對齊而作的切換bash
首先go語言須要兼容32位和64位平臺,可是在32位平臺上對64字節的uint操做可能不是原子的,好比在讀取一個字長度的時候,另一個字的數據頗有可能已經發生改變了(在32位操做系統上,字長是4,而uint64長度爲8), 因此在實際計數的時候,其實sync.WaitGroup也就使用了4個字節來進行性能
在cpu內有一個cache line的緩存,這個緩存一般是8個字節的長度,在intel的cpu中,會保證針對一個cache line的操做是原子,若是隻有8個字節頗有可能會出現上面的這種狀況,即垮了兩個cache line, 這樣不管是在原子操做仍是性能上可能都會有問題測試
import ( "unsafe" ) type a struct { b byte } type w struct { state1 [3]uint32 } func main() { b := a{} println(unsafe.Sizeof(b), uintptr(unsafe.Pointer(&b)), uintptr(unsafe.Pointer(&b))%8 == 0) wg := w{} println(unsafe.Sizeof(wg), uintptr(unsafe.Pointer(&wg.state1)), uintptr(unsafe.Pointer(&wg.state1))%8 == 0) println(unsafe.Sizeof(wg), uintptr(unsafe.Pointer(&wg.state1[1])), uintptr(unsafe.Pointer(&wg.state1[1]))%8 == 0) }
1 824633919343 false 12 824633919356 false 12 824633919360 true
在sync.WaitGroup中對上面的提到的8字節的uint64也是分段計數,即高位記錄須要等待 Done的數量,而低位記錄當前正在Wait等待結束的計數
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2] } else { return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0] } }
func (wg *WaitGroup) Add(delta int) { // 獲取當前計數 statep, semap := wg.state() if race.Enabled { _ = *statep // trigger nil deref early if delta < 0 { // Synchronize decrements with Wait. race.ReleaseMerge(unsafe.Pointer(wg)) } race.Disable() defer race.Enable() } // 使用高32位進行counter計數 state := atomic.AddUint64(statep, uint64(delta)<<32) v := int32(state >> 32) // 獲取當前須要等待done的數量 w := uint32(state) // 獲取低32位即waiter等待計數 if race.Enabled && delta > 0 && v == int32(delta) { // The first increment must be synchronized with Wait. // Need to model this as a read, because there can be // several concurrent wg.counter transitions from 0. race.Read(unsafe.Pointer(semap)) } if v < 0 { panic("sync: negative WaitGroup counter") } if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 若是當前v>0,則表示還須要繼續未完成的goroutine進行Done操做 // 若是w ==0,則表示當前並無goroutine在wait等待結束 // 以上兩種狀況直接返回便可 if v > 0 || w == 0 { return } // 當waiters > 0 的時候,而且當前v==0,這個時候若是檢查發現state狀態先後發生改變,則 // 證實當前有人修改過,則刪除 // 若是走到這個地方則證實通過以前的操做後,當前的v==0,w!=0,就證實以前一輪的Done已經所有完成,如今須要喚醒全部在wait的goroutine // 此時若是發現當前的*statep值又發生了改變,則證實有有人進行了Add操做 // 也就是這裏的WaitGroup濫用 if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 將當前state的狀態設置爲0,就能夠進行下次的重用了 *statep = 0 for ; w != 0; w-- { // 釋放全部排隊的waiter runtime_Semrelease(semap, false) } }
func (wg *WaitGroup) Done() { // 減去一個-1 wg.Add(-1) }
func (wg *WaitGroup) Wait() { statep, semap := wg.state() if race.Enabled { _ = *statep // trigger nil deref early race.Disable() } for { // 獲取state的狀態 state := atomic.LoadUint64(statep) v := int32(state >> 32) // 獲取高32位的count w := uint32(state) // 獲取當前正在Wait的數量 if v == 0 { // 若是當前v ==0就直接return, 表示當前不須要等待 // Counter is 0, no need to wait. if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } // 進行低位的waiter計數統計 if atomic.CompareAndSwapUint64(statep, state, state+1) { if race.Enabled && w == 0 { // Wait must be synchronized with the first Add. // Need to model this is as a write to race with the read in Add. // As a consequence, can do the write only for the first waiter, // otherwise concurrent Waits will race with each other. race.Write(unsafe.Pointer(semap)) } // 若是成功則進行排隊休眠等待喚醒 runtime_Semacquire(semap) // 若是喚醒後發現state的狀態不爲0,則證實在喚醒的過程當中WaitGroup又被重用,則panic if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } } }