Go 1.13版本中有幾個比較大的修改,其中之一是sync.Pool
修改了部分實現,減少某些極端狀況下的性能開銷。文中內容來源於筆者讀完sync.Pool源代碼的思考和總結,內容以Go 1.13中的實現爲準,少許內容涉及到Go 1.13以前,若有誤區請讀者多多指教。編程
在本文內容開始以前須要理解幾個在Go runtime中的概念,以便於更好的理解sync.Pool
中一些實現。數組
Go中調度器是GMP模型,簡單理解G就是goroutine;M能夠類比內核線程,是執行G的地方;P是調度G以及爲G的執行準備所需資源。通常狀況下,P的數量CPU的可用核心數,也可由runtime.GOMAXPROCS
指定。本文的重點並不是goroutine調度器,在此不作詳細解釋,感興趣能夠翻閱延伸閱讀的文章。 緩存
Go有這樣的調度規則:某個G不能一直佔用M,在某個時刻的時候,runtime(參見sysmon
)會判斷當前M是否能夠被搶佔,即M上正在執行的G讓出。P在合理的時刻將G調度到合理的M上執行,在runtime裏面,每一個P維護一個本地存放待執行G的隊列localq,同時還存在一個全局的待執行G的隊列globalq;調度就是P從localq或globalq中取出G到對應的M上執行,所謂搶佔,runtime將G搶佔移出運行狀態,拷貝G的執行棧放入待執行隊列中,多是某個P的localq,也多是globalq,等待下一次調度,所以當被搶佔的G重回待執行隊列時有可能此時的P與前一次運行的P並不是同一個。 數據結構
所謂禁止搶佔,即當前執行G不容許被搶佔調度,直到禁止搶佔標記解除。Go runtime實現了G的禁止搶佔與解除禁止搶佔。架構
func runtime_procPin() int
禁止搶佔,標記當前G在M上不會被搶佔,並返回當前所在P的ID。併發
func runtime_procUnpin()
解除G的禁止搶佔狀態,以後G可被搶佔。app
type poolDequeue struct { headTail uint64 vals []eface } type eface struct { typ, val unsafe.Pointer }
poolDequeue
被實現爲單生產者多消費者的固定大小的無鎖Ring式隊列。生產者能夠從head插入head刪除,而消費者僅可從tail刪除。headTail
指向了隊列的頭和尾,經過位運算將head和tail位置存入headTail
變量中。編程語言
func (d *poolDequeue) pushHead(val interface{}) bool { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // Ring式隊列,頭尾相等則隊列已滿 if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { return false } slot := &d.vals[head&uint32(len(d.vals)-1)] // 原子操做拿到slot.typ typ := atomic.LoadPointer(&slot.typ) if typ != nil { return false } if val == nil { val = dequeueNil(nil) } // slot佔位,將val存入vals中 *(*interface{})(unsafe.Pointer(slot)) = val // 更改隊列指向頭 atomic.AddUint64(&d.headTail, 1<<dequeueBits) return true }
slot
,根據slot.typ
判斷當前slot是否已被存放數據,注意這裏使用了atomic.LoadPointer
取代鎖操做。val
賦值給slot
,這裏實現的比較巧妙,slot
是eface
類型,將slot
轉爲interface{}
類型,這樣val
能以interface{}
賦值給slot
讓slot.typ
和slot.val
指向其內存塊,這樣slot.typ
和slot.val
均不爲空,這也就是第2點條件判斷的來由。插入成功後head加1,指向隊頭的前一個空位,因爲插入刪除都涉及到對headTail
的修改,此處使用原子操做取代鎖。func (d *poolDequeue) popHead() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // 判斷隊列是否爲空 if tail == head { return nil, false } // head位置是隊頭的前一個位置,因此此處要先退一位 head-- ptrs2 := d.pack(head, tail) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { slot = &d.vals[head&uint32(len(d.vals)-1)] break } } // 取出val val := *(*interface{})(unsafe.Pointer(slot)) if val == dequeueNil(nil) { val = nil } // 重置slot,typ和val均爲nil *slot = eface{} return val, true }
popHead
的代碼比較簡單,流程上無非就是判斷隊列是否空,拿出slot
轉換爲interface{}
而後重置slot
。popHead
與pushHead
有一點須要注意,在popHead
中,是先修改了headTail
,而後再取slot
,而在pushHead
中是先插入數據,而後再修改headTail
,至於爲何這裏先留一個疑問,後面將會詳細解釋。函數
func (d *poolDequeue) popTail() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // 判斷隊列是否空 if tail == head { return nil, false } ptrs2 := d.pack(head, tail+1) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { slot = &d.vals[tail&uint32(len(d.vals)-1)] break } } // 取出val val := *(*interface{})(unsafe.Pointer(slot)) if val == dequeueNil(nil) { val = nil } // 重置slot.typ slot.val = nil atomic.StorePointer(&slot.typ, nil) return val, true }
popTail
的代碼相似popHead
,只是刪除對象從隊首變成隊尾,注意結尾部分代碼,使用了atomic.StorePointer
取代鎖操做。性能
poolDequeue
被實現爲Ring式隊列,而poolChain
則是基於poolDequeue
實現爲雙向鏈表。
type poolChain struct { head *poolChainElt tail *poolChainElt } type poolChainElt struct { poolDequeue next, prev *poolChainElt }
同理,poolChain
也實現了pushHead
,popHead
和popTail
。
func (c *poolChain) pushHead(val interface{}) { d := c.head if d == nil { // poolDequeue初始長度爲8 const initSize = 8 d = new(poolChainElt) d.vals = make([]eface, initSize) c.head = d storePoolChainElt(&c.tail, d) } if d.pushHead(val) { return } // 前一個poolDequeue長度的2倍 newSize := len(d.vals) * 2 if newSize >= dequeueLimit { newSize = dequeueLimit } // 首尾相連,構成鏈表 d2 := &poolChainElt{prev: d} d2.vals = make([]eface, newSize) c.head = d2 storePoolChainElt(&d.next, d2) d2.pushHead(val) }
到這裏大概就明白了,poolDequeue
是在poolChain
的pushHead
中建立的,且每次建立的長度都是前一個poolDequeue
長度的2倍,初始長度爲8。
func (c *poolChain) popHead() (interface{}, bool) { d := c.head for d != nil { if val, ok := d.popHead(); ok { return val, ok } d = loadPoolChainElt(&d.prev) } return nil, false }
popHead
以隊首向隊尾爲方向遍歷鏈表,對每一個poolDequeue
執行popHead
嘗試取出存放的對象。
func (c *poolChain) popTail() (interface{}, bool) { d := loadPoolChainElt(&c.tail) if d == nil { return nil, false } for { d2 := loadPoolChainElt(&d.next) if val, ok := d.popTail(); ok { return val, ok } if d2 == nil { return nil, false } if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { storePoolChainElt(&d2.prev, nil) } d = d2 } }
popTail
以隊尾向隊隊首爲方向遍歷鏈表,對每一個poolDequeue
執行popTail
嘗試從尾部取出存放的對象。
宏觀上來看,poolChain
的結構以下圖:
poolChain
爲sync.Pool的底層數據結構,接下來一覽Pool的實現。
type Pool struct { // ... // 每一個P的本地隊列,實際類型爲[P]poolLocal local unsafe.Pointer // [P]poolLocal的大小,<= P localSize uintptr victim unsafe.Pointer victimSize uintptr // 自定義的對象建立回調,當pool中無可用對象時會調用此函數 New func() interface{} } type poolLocalInternal struct { // 每一個P的私有共享,使用時無需加鎖 private interface{} // 對象列表,本地P能夠pushHead/popHead,其餘P僅能popTail shared poolChain } type poolLocal struct { poolLocalInternal // 僞共享,僅佔位用,防止在cache line上分配多個poolLocalInternal pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte }
接下來看看Pool的具體實現。
func (p *Pool) pin() (*poolLocal, int) { pid := runtime_procPin() s := atomic.LoadUintptr(&p.localSize) l := p.local if uintptr(pid) < s { return indexLocal(l, pid), pid } return p.pinSlow() }
pin
首先標記了當前G禁止搶佔,在runtime_procUnpin
以前,當前G和P不會被搶佔。此處之因此標記禁止搶佔是由於下文中有使用到P ID,若是被搶佔了,有可能接下里使用的P ID與所綁定的P並不是同一個。
在得到P ID以後,當P ID小於p.local
數組長度時在p.local
數組裏找到P對應的poolLocal
對象,不然進入pinSlow
函數建立新的poolLocal
。
func (p *Pool) pinSlow() (*poolLocal, int) { runtime_procUnpin() allPoolsMu.Lock() defer allPoolsMu.Unlock() pid := runtime_procPin() s := p.localSize l := p.local if uintptr(pid) < s { return indexLocal(l, pid), pid } if p.local == nil { allPools = append(allPools, p) } // 當前P的數量 size := runtime.GOMAXPROCS(0) local := make([]poolLocal, size) atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) atomic.StoreUintptr(&p.localSize, uintptr(size)) return &local[pid], pid }
pinSlow
的實現比較簡單,即當前P ID在Pool中沒有對應的poolLocal
對象時,則建立一個新的poolLocal
對象,舊的poolLocal
將會進入GC。仔細觀察pinSlow
函數發現先執行了runtime_procUnpin
,隨後有執行了runtime_procPin
,後者的目的在於獲取最新的P ID,這裏的意圖有點難以理解,在仔細閱讀和理解以後,發現目的在於儘可能減小[]poolLocal
的建立次數,由於pinSlow
以前和pinSlow
裏面可能會由於解除禁止搶佔而致使綁定的P不一致,萬一新綁定的P裏存在可用的poolLocal
呢。
func (p *Pool) Get() interface{} { // ... l, pid := p.pin() x := l.private l.private = nil if x == nil { x, _ = l.shared.popHead() if x == nil { x = p.getSlow(pid) } } // 解除禁止搶佔 runtime_procUnpin() // ... if x == nil && p.New != nil { x = p.New() } return x }
poolLocal
和P ID。poolLocal.private
不爲空,則表示可複用此對象;若爲空,則在poolLocal.shared
隊列中獲取對象。poolLocal.shared
無可用對象,則進入getSlow
獲取對象。nil
。func (p *Pool) getSlow(pid int) interface{} { size := atomic.LoadUintptr(&p.localSize) locals := p.local // 從其餘P中竊取對象 for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i+1)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // 嘗試從victim cache中取對象 size = atomic.LoadUintptr(&p.victimSize) if uintptr(pid) >= size { return nil } locals = p.victim l := indexLocal(locals, pid) if x := l.private; x != nil { l.private = nil return x } for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // 清空victim cache atomic.StoreUintptr(&p.victimSize, 0) return nil }
進入getSlow
的前提是當前P本地無可用對象,因而轉頭去其餘P的裏竊取對象,getSlow
就是作這件事情。
Pool.local
中的其餘P,確認其餘P的shared
裏是否有可用對象,若是有,則從鏈尾取出。縱觀整個Get過程會發現,從當前P的poolLocal
中取對象時使用的時popHead
,而從其餘P的poolLocal
中竊取對象時使用的時popTail
,再回到上文中對poolChain
的定義,能夠知道,當前P對本地poolLocal是生產者,對其餘P的poolLocal而言是消費者。
再次回到poolDequeue
和poolChain
上。咱們知道某一時刻P只會調度一個G,那麼對於生產者而言,調用pushHead
和popHead
並不須要加鎖,由於當前P操做的是本地poolLocal
;當消費者是其餘P,在進行popTail
操做時,則會對pushHead
以及popHead
造成競爭關係,對這種問題,poolDequeue
的實現直指要害。
首先注意eface
這個結構,若插入成功eface
下的兩個字段會指向要緩存對象的內存地址,在pushHead
中使用了原子操做判斷typ
字段是否爲nil
,存在這樣一種可能性:pushHead
所取到的slot
正在popTail
裏準備重置,這種狀況下pushHead
會直接返回失敗。
回到競爭問題上,pushHead
的流程能夠簡化爲先取slot
,再判斷是否可插入最後修改headTail
,而popTail
的流程能夠簡化爲先修改headTail
再取slot
而後重置slot
,pushHead
修改head位置,popTail
修改tail位置,因此對於headTail
字段使用原子操做避免便可讀寫衝突。
疑問是爲什麼popTail
中須要先修改headTail
呢,由於存在其餘P都會到當前P上竊取對象,當多個P都調用本地P的popTail
時,競爭現象就會更加明顯,因此此時應儘早修改headTail
,一旦某個P竊取到了其餘P就沒法再竊取此對象。
func (p *Pool) Put(x interface{}) { if x == nil { return } // ... l, _ := p.pin() if l.private == nil { l.private = x x = nil } if x != nil { l.shared.pushHead(x) } runtime_procUnpin() // ... }
Put
的實現比較簡單,優先將對象存入private
,若private
已存在則放入shared鏈表中,pin
中會標記禁止搶佔,所以須要在pin
結束以及Put邏輯結束後取消禁止搶佔。
Victim Cache本是計算機架構裏面的一個概念,是CPU硬件處理緩存的一種技術,sync.Pool
引入的意圖在於下降GC壓力同時提升命中率,本文並不須要詳解Victim Cache的原理,分析sync.Pool
便可明白其意圖。對於Pool而言有一點須要明白,這個Pool並不是是無限制擴展的,不然會引發內存溢出。幾乎全部的池技術中,都會在某個時刻清空或清除部分緩存對象,那麼在Go中什麼時候清理未使用的對象呢?
在Pool.Get函數中,取不到對象時會嘗試從p.victim
中取,用完後放回當前P的本地隊列,而p.vcitim
是什麼被建立的呢?是在poolCleanup
函數中,該函數會在GC時被調用到,在init
函數裏註冊。
func poolCleanup() { // 1 for _, p := range oldPools { p.victim = nil p.victimSize = 0 } // 2 for _, p := range allPools { p.victim = p.local p.victimSize = p.localSize p.local = nil p.localSize = 0 } oldPools, allPools = allPools, nil }
poolCleanup
會在STW階段被調用,函數實現雖然看起來簡單,但其意圖較爲複雜,那麼該如何解釋呢?
嘗試模擬一下實際狀況:
oldPools
和allPools
均爲nil
p.local
爲nil
,將會在pinSlow
中建立p.local
,而後將p
放入allPools
,此時allPools
長度爲1,oldPools
爲nil
allPools
中全部p.local
將值賦值給victim
並置爲nil
,最後allPools
爲nil
,oldPools
長度爲1p.local
爲nil
,此時會從p.victim
裏面嘗試取對象p.local
爲nil
,從新建立p.local
,並將對象放回,此時allPools
長度爲1,oldPools
長度爲1oldPools
中全部p.victim
置nil
,前一次的cache在本次GC時被回收,allPools
全部p.local
將值賦值給victim
並置爲nil
,最後allPools
爲nil,oldPools
長度爲1再來看看Go 1.13之前poolCleanup
的實現:
func poolCleanup() { for i, p := range allPools { allPools[i] = nil for i := 0; i < int(p.localSize); i++ { l := indexLocal(p.local, i) l.private = nil for j := range l.shared { l.shared[j] = nil } l.shared = nil } p.local = nil p.localSize = 0 } allPools = []*Pool{} }
Go 1.13之前poolCleanup
的實現簡單粗暴,每次GC STW階段遍歷allPools
,清空p.local
、poolLocal.shared
。
經過二者的對比發現,新版的實現相比Go 1.13以前,GC的粒度拉大了,因爲實際回收的時間線拉長,單位時間內GC的開銷減少。
由此基本明白p.victim
的做用,它的定位是次級緩存,GC時將對象放入其中,下一次GC來臨以前若是有Get調用則會從p.victim
中取,直到再一次GC來臨作回收,同時因爲從p.victim
中取出對象使用完畢以後並未放回p.victim
中,在必定程度也減少了下一次GC的開銷。原來1次GC的開銷被拉長到2次且會有必定程度的開銷減少,這就是p.victim
引入的意圖。
關於Victim Cache更多的信息能夠在延伸閱讀中找到。
無鎖編程是不少編程語言裏逃離不了的話題。sync.Pool
的無鎖是在poolDequeue
和poolChain
層面實現的。
縱觀整個sync.Pool
的實現,明確了生產者(本地P)訪問head,消費者(其餘P)訪問tail,從P的角度切入操做方向,實現了目標操做對象層面的「解耦」,大部分時候二者的操做互不影響。圖文示意以下:
poolDequeue
對一些關鍵變量採用了CAS操做,好比poolDequeue.headTail
,既可完整保證併發又能下降相比鎖而言的開銷。
這點與「操做對象隔離」是相輔相成的,一旦設計目標爲儘可能減小對同一對象的操做鎖,就須要對行爲進行隔離,鏈表能很好的知足這個設計目標:特定的P訪問特定的位置。從整個過程來看,鏈表是減小鎖的高效數據結構。
GC的開銷已經足夠足夠小了,但仍不可避免。對於sync.Pool
而言,避免極端狀況GC的開銷也是重點之一,因此Go 1.13的sync.Pool
引入了Victim Cache機制,有效拉長真正回收的時間線,從而減少單次GC的開銷。