最近在工做中碰到了 GC 的問題:項目中大量重複地建立許多對象,形成 GC 的工做量巨大,CPU 頻繁掉底。準備使用 sync.Pool
來緩存對象,減輕 GC 的消耗。爲了用起來更順暢,我特意研究了一番,造成此文。本文從使用到源碼解析,按部就班,一一道來。html
本文基於 Go 1.14
是 sync 包下的一個組件,能夠做爲保存臨時取還對象的一個「池子」。我的以爲它的名字有必定的誤導性,由於 Pool 裏裝的對象能夠被無通知地被回收,可能 sync.Cache
是一個很好的選擇。頻繁地分配、回收內存會給 GC 帶來必定的負擔,嚴重的時候會引發 CPU 的毛刺,而 sync.Pool
能夠將暫時不用的對象緩存起來,待下次須要的時候直接使用,不用再次通過內存分配,複用對象的內存,減輕 GC 的壓力,提高系統的性能。github
是協程安全的,這對於使用者來講是極其方便的。使用前,設置好對象的 New
函數,用於在 Pool
裏沒有緩存的對象時,建立一個。以後,在程序的任何地方、任什麼時候候僅經過 Get()
下面是 2018 年的時候,《Go 夜讀》上關於 sync.Pool
當多個 goroutine 都須要建立同⼀個對象的時候,若是 goroutine 數過多,致使對象的建立數⽬劇增,進⽽致使 GC 壓⼒增大。造成 「併發⼤-佔⽤內存⼤-GC 緩慢-處理併發能⼒下降-併發更⼤」這樣的惡性循環。在這個時候,須要有⼀個對象池,每一個 goroutine 再也不⾃⼰單首創建對象,⽽是從對象池中獲取出⼀個對象(若是池中已經有的話)。json
package main import ( "fmt" "sync" ) var pool *sync.Pool type Person struct { Name string } func initPool() { pool = &sync.Pool { New: func()interface{} { fmt.Println("Creating a new Person") return new(Person) }, } } func main() { initPool() p := pool.Get().(*Person) fmt.Println("首次從 pool 裏獲取:", p) p.Name = "first" fmt.Printf("設置 p.Name = %s\n", p.Name) pool.Put(p) fmt.Println("Pool 裏已有一個對象:&{first},調用 Get: ", pool.Get().(*Person)) fmt.Println("Pool 沒有對象了,調用 Get: ", pool.Get().(*Person)) }
Creating a new Person 首次從 pool 裏獲取: &{} 設置 p.Name = first Pool 裏已有一個對象:&{first},Get: &{first} Creating a new Person Pool 沒有對象了,Get: &{}
首先,須要初始化 Pool
,惟一須要的就是設置好 New
函數。當調用 Get 方法時,若是池子裏緩存了對象,就直接返回緩存的對象。若是沒有存貨,則調用 New 函數建立一個新的對象。性能優化
另外,咱們發現 Get 方法取出來的對象和上次 Put 進去的對象其實是同一個,Pool 沒有作任何「清空」的處理。但咱們不該當對此有任何假設,由於在實際的併發使用場景中,沒法保證這種順序,最好的作法是在 Put 前,將對象清空。
這部分主要看 fmt.Printf
func Printf(format string, a ...interface{}) (n int, err error) { return Fprintf(os.Stdout, format, a...) }
繼續看 Fprintf
func Fprintf(w io.Writer, format string, a ...interface{}) (n int, err error) { p := newPrinter() p.doPrintf(format, a) n, err = w.Write(p.buf) p.free() return }
函數的參數是一個 io.Writer
傳的是 os.Stdout
,至關於直接輸出到標準輸出。這裏的 newPrinter
用的就是 Pool:
// newPrinter allocates a new pp struct or grabs a cached one. func newPrinter() *pp { p := ppFree.Get().(*pp) p.panicking = false p.erroring = false p.wrapErrs = false p.fmt.init(&p.buf) return p } var ppFree = sync.Pool{ New: func() interface{} { return new(pp) }, }
回到 Fprintf
函數,拿到 pp 指針後,會作一些 format 的操做,而且將 p.buf 裏面的內容寫入 w。最後,調用 free 函數,將 pp 指針歸還到 Pool 中:
// free saves used pp structs in ppFree; avoids an allocation per invocation. func (p *pp) free() { if cap(p.buf) > 64<<10 { return } p.buf = p.buf[:0] p.arg = nil p.value = reflect.Value{} p.wrappedErr = nil ppFree.Put(p) }
歸還到 Pool 前將對象的一些字段清零,這樣,經過 Get 拿到緩存的對象時,就能夠安全地使用了。
經過 test 文件學習源碼是一個很好的途徑,由於它表明了「官方」的用法。更重要的是,測試用例會故意測試一些「坑」,學習這些坑,也會讓本身在使用的時候就能學會避免。
文件裏共有 7 個測試,4 個 BechMark。
和 TestPoolNew
比較簡單,主要是測試 Get/Put 的功能。咱們來看下 TestPoolNew
func TestPoolNew(t *testing.T) { // disable GC so we can control when it happens. defer debug.SetGCPercent(debug.SetGCPercent(-1)) i := 0 p := Pool{ New: func() interface{} { i++ return i }, } if v := p.Get(); v != 1 { t.Fatalf("got %v; want 1", v) } if v := p.Get(); v != 2 { t.Fatalf("got %v; want 2", v) } // Make sure that the goroutine doesn't migrate to another P // between Put and Get calls. Runtime_procPin() p.Put(42) if v := p.Get(); v != 42 { t.Fatalf("got %v; want 42", v) } Runtime_procUnpin() if v := p.Get(); v != 3 { t.Fatalf("got %v; want 3", v) } }
首先設置了 GC=-1
,做用就是中止 GC。那爲啥要用 defer?函數都跑完了,還要 defer 幹啥。注意到,debug.SetGCPercent
這個函數被調用了兩次,並且這個函數返回的是上一次 GC 的值。所以,defer 在這裏的用途是還原到調用此函數以前的 GC 設置,也就是恢復現場。
接着,調置了 Pool 的 New 函數:直接返回一個 int,變且每次調用 New,都會自增 1。而後,連續調用了兩次 Get 函數,由於這個時候 Pool 裏沒有緩存的對象,所以每次都會調用 New 建立一個,因此第一次返回 1,第二次返回 2。
而後,調用 Runtime_procPin()
防止 goroutine 被強佔,目的是保護接下來的一次 Put 和 Get 操做,使得它們操做的對象都是同一個 P 的「池子」。而且,此次調用 Get 的時候並無調用 New,由於以前有一次 Put 的操做。
最後,再次調用 Get 操做,由於沒有「存貨」,所以仍是會再次調用 New 建立一個對象。
和 TestPoolRelease
則主要測試 GC 對 Pool 裏對象的影響。這裏用了一個函數,用於計數有多少對象會被 GC 回收:
runtime.SetFinalizer(v, func(vv *string) { atomic.AddUint32(&fin, 1) })
當垃圾回收檢測到 v
是一個不可達的對象時,而且 v
又有一個關聯的 Finalizer
,就會另起一個 goroutine 調用設置的 finalizer 函數,也就是上面代碼裏的參數 func。這樣,就會讓對象 v 從新可達,從而在此次 GC 過程當中不被回收。以後,解綁對象 v 和它所關聯的 Finalizer
,當下次 GC 再次檢測到對象 v 不可達時,纔會被回收。
從名字看,主要是想測一下「壓力」,具體操做就是起了 10 個 goroutine 不斷地向 Pool 裏 Put 對象,而後又 Get 對象,看是否會出錯。
和 TestPoolChain
,都調用了 testPoolDequeue
,這是具體幹活的。它須要傳入一個 PoolDequeue
// poolDequeue testing. type PoolDequeue interface { PushHead(val interface{}) bool PopHead() (interface{}, bool) PopTail() (interface{}, bool) }
是一個雙端隊列,能夠從頭部入隊元素,從頭部和尾部出隊元素。調用函數時,前者傳入 NewPoolDequeue(16)
,後者傳入 NewPoolChain()
,底層其實都是 poolDequeue
這個結構體。具體來看 testPoolDequeue
總共起了 10 個 goroutine:1 個生產者,9 個消費者。生產者不斷地從隊列頭 pushHead 元素到雙端隊列裏去,而且每 push 10 次,就 popHead 一次;消費者則一直從隊列尾取元素。不管是從隊列頭仍是從隊列尾取元素,都會在 map 裏作標記,最後檢驗每一個元素是否是隻被取出過一次。
剩下的就是 Benchmark 測試了。第一個 BenchmarkPool
比較簡單,就是不停地 Put/Get,測試性能。
函數會先關掉 GC,再向 pool 裏 put 10 個對象,而後強制觸發 GC,記錄 GC 的停頓時間,而且作一個排序,計算 P50 和 P95 的 STW 時間。這個函數能夠加入我的的代碼庫了:
func BenchmarkPoolSTW(b *testing.B) { // Take control of GC. defer debug.SetGCPercent(debug.SetGCPercent(-1)) var mstats runtime.MemStats var pauses []uint64 var p Pool for i := 0; i < b.N; i++ { // Put a large number of items into a pool. const N = 100000 var item interface{} = 42 for i := 0; i < N; i++ { p.Put(item) } // Do a GC. runtime.GC() // Record pause time. runtime.ReadMemStats(&mstats) pauses = append(pauses, mstats.PauseNs[(mstats.NumGC+255)%256]) } // Get pause time stats. sort.Slice(pauses, func(i, j int) bool { return pauses[i] < pauses[j] }) var total uint64 for _, ns := range pauses { total += ns } // ns/op for this benchmark is average STW time. b.ReportMetric(float64(total)/float64(b.N), "ns/op") b.ReportMetric(float64(pauses[len(pauses)*95/100]), "p95-ns/STW") b.ReportMetric(float64(pauses[len(pauses)*50/100]), "p50-ns/STW") }
我在 mac 上跑了一下:
go test -v -run=none -bench=BenchmarkPoolSTW
goos: darwin goarch: amd64 pkg: sync BenchmarkPoolSTW-12 361 3708 ns/op 3583 p50-ns/STW 5008 p95-ns/STW PASS ok sync 1.481s
最後一個 BenchmarkPoolExpensiveNew
測試當 New 的代價很高時,Pool 的表現。也能夠加入我的的代碼庫。
標準庫中 encoding/json
也用到了 sync.Pool 來提高性能。著名的 gin
框架,對 context 取用也到了 sync.Pool
來看下 gin
如何使用 sync.Pool。設置 New 函數:
engine.pool.New = func() interface{} { return engine.allocateContext() } func (engine *Engine) allocateContext() *Context { return &Context{engine: engine, KeysMutex: &sync.RWMutex{}} }
// ServeHTTP conforms to the http.Handler interface. func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) { c := engine.pool.Get().(*Context) c.writermem.reset(w) c.Request = req c.reset() engine.handleHTTPRequest(c) engine.pool.Put(c) }
先調用 Get 取出來緩存的對象,而後會作一些 reset 操做,再執行 handleHTTPRequest
,最後再 Put 回 Pool。
另外,Echo 框架也使⽤了 sync.Pool
來管理 context
It leverages sync pool to reuse memory and achieve zero dynamic memory allocation with no GC overhead.
首先來看 Pool 的結構體:
type Pool struct { noCopy noCopy // 每一個 P 的本地隊列,實際類型爲 [P]poolLocal local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal // [P]poolLocal的大小 localSize uintptr // size of the local array victim unsafe.Pointer // local from previous cycle victimSize uintptr // size of victims array // 自定義的對象建立回調函數,當 pool 中無可用對象時會調用此函數 New func() interface{} }
由於 Pool 不但願被複制,因此結構體裏有一個 noCopy 的字段,使用 go vet
工具能夠檢測到用戶代碼是否複製了 Pool。
是 go1.7 開始引入的一個靜態檢查機制。它不只僅工做在運行時或標準庫,同時也對用戶代碼有效。用戶只需實現這樣的不消耗內存、僅用於靜態分析的結構,來保證一個對象在第一次使用後不會發生複製。
// noCopy 用於嵌入一個結構體中來保證其第一次使用後不會被複制 // // 見 https://golang.org/issues/8005#issuecomment-190753527 type noCopy struct{} // Lock 是一個空操做用來給 `go ve` 的 -copylocks 靜態分析 func (*noCopy) Lock() {} func (*noCopy) Unlock() {}
字段存儲指向 [P]poolLocal
則表示 local 數組的大小。訪問時,P 的 id 對應 [P]poolLocal
下標索引。經過這樣的設計,多個 goroutine 使用同一個 Pool 時,減小了競爭,提高了性能。
在一輪 GC 到來時,victim 和 victimSize 會分別「接管」 local 和 localSize。victim
的機制用於減小 GC 後冷啓動致使的性能抖動,讓分配對象更平滑。
Victim Cache 原本是計算機架構裏面的一個概念,是 CPU 硬件處理緩存的一種技術,
引入的意圖在於下降 GC 壓力的同時提升命中率。
當 Pool 沒有緩存的對象時,調用 New
type poolLocal struct { poolLocalInternal // 將 poolLocal 補齊至兩個緩存行的倍數,防止 false sharing, // 每一個緩存行具備 64 bytes,即 512 bit // 目前咱們的處理器通常擁有 32 * 1024 / 64 = 512 條緩存行 // 僞共享,僅佔位用,防止在 cache line 上分配多個 poolLocalInternal pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte } // Local per-P Pool appendix. type poolLocalInternal struct { // P 的私有緩存區,使用時無須要加鎖 private interface{} // 公共緩存區。本地 P 能夠 pushHead/popHead;其餘 P 則只能 popTail shared poolChain }
字段 pad
主要是防止 false sharing
,董大的《什麼是 cpu cache》裏講得比較好:
現代 cpu 中,cache 都劃分紅以 cache line (cache block) 爲單位,在 x86_64 體系下通常都是 64 字節,cache line 是操做的最小單元。程序即便只想讀內存中的 1 個字節數據,也要同時把附近 63 節字加載到 cache 中,若是讀取超個 64 字節,那麼就要加載到多個 cache line 中。
簡單來講,若是沒有 pad 字段,那麼當須要訪問 0 號索引的 poolLocal 時,CPU 同時會把 0 號和 1 號索引同時加載到 cpu cache。在只修改 0 號索引的狀況下,會讓 1 號索引的 poolLocal 失效。這樣,當其餘線程想要讀取 1 號索引時,發生 cache miss,還得從新再加載,對性能有損。增長一個 pad
,補齊緩存行,讓相關的字段能獨立地加載到緩存行就不會出現 false sharding
type poolChain struct { // 只有生產者會 push to,不用加鎖 head *poolChainElt // 讀寫須要原子控制。 pop from tail *poolChainElt } type poolChainElt struct { poolDequeue // next 被 producer 寫,consumer 讀。因此只會從 nil 變成 non-nil // prev 被 consumer 寫,producer 讀。因此只會從 non-nil 變成 nil next, prev *poolChainElt } type poolDequeue struct { // The head index is stored in the most-significant bits so // that we can atomically add to it and the overflow is // harmless. // headTail 包含一個 32 位的 head 和一個 32 位的 tail 指針。這兩個值都和 len(vals)-1 取模過。 // tail 是隊列中最老的數據,head 指向下一個將要填充的 slot // slots 的有效範圍是 [tail, head),由 consumers 持有。 headTail uint64 // vals 是一個存儲 interface{} 的環形隊列,它的 size 必須是 2 的冪 // 若是 slot 爲空,則 vals[i].typ 爲空;不然,非空。 // 一個 slot 在這時宣告無效:tail 不指向它了,vals[i].typ 爲 nil // 由 consumer 設置成 nil,由 producer 讀 vals []eface }
被實現爲單生產者、多消費者的固定大小的無鎖(atomic 實現) Ring 式隊列(底層存儲使用數組,使用兩個指針標記 head、tail)。生產者能夠從 head 插入、head 刪除,而消費者僅可從 tail 刪除。
指向隊列的頭和尾,經過位運算將 head 和 tail 存入 headTail 變量中。
咱們用一幅圖來完整地描述 Pool 結構體:
咱們看到 Pool 並無直接使用 poolDequeue,緣由是它的大小是固定的,而 Pool 的大小是沒有限制的。所以,在 poolDequeue 之上包裝了一下,變成了一個 poolChainElt
func (p *Pool) Get() interface{} { // ...... l, pid := p.pin() x := l.private l.private = nil if x == nil { x, _ = l.shared.popHead() if x == nil { x = p.getSlow(pid) } } runtime_procUnpin() // ...... if x == nil && p.New != nil { x = p.New() } return x }
省略號的內容是 race
相關的,屬於閱讀源碼過程當中的一些噪音,暫時註釋掉。這樣,Get 的整個過程就很是清晰了:
函數將當前的 goroutine 和 P 綁定,禁止被搶佔,返回當前 P 對應的 poolLocal,以及 pid。runtime_procUnpin()
先來看 Pool.pin()
// src/sync/pool.go // 調用方必須在完成取值後調用 runtime_procUnpin() 來取消搶佔。 func (p *Pool) pin() (*poolLocal, int) { pid := runtime_procPin() s := atomic.LoadUintptr(&p.localSize) // load-acquire l := p.local // load-consume // 由於可能存在動態的 P(運行時調整 P 的個數) if uintptr(pid) < s { return indexLocal(l, pid), pid } return p.pinSlow() }
的做用就是將當前 groutine 和 P 綁定在一塊兒,禁止搶佔。而且返回對應的 poolLocal 以及 P 的 id。
若是 G 被搶佔,則 G 的狀態從 running 變成 runnable,會被放回 P 的 localq 或 globaq,等待下一次調度。下次再執行時,就不必定是和如今的 P 相結合了。由於以後會用到 pid,若是被搶佔了,有可能接下來使用的 pid 與所綁定的 P 並不是同一個。
「綁定」的任務最終交給了 procPin
// src/runtime/proc.go func procPin() int { _g_ := getg() mp := _g_.m mp.locks++ return int(mp.p.ptr().id) }
實現的代碼很簡潔:將當前 goroutine 綁定的 m 上的一個鎖字段 locks 值加 1,即完成了「綁定」。關於 pin 的原理,能夠參考《golang的對象池sync.pool源碼解讀》,文章詳細分析了爲何執行 procPin
以後,不可搶佔,且 GC 不會清掃 Pool 裏的對象。
咱們再回到 p.pin()
,原子操做取出 p.localSize
和 p.local
,若是當前 pid
小於 p.localSize
,則直接取 poolLocal 數組中的 pid 索引處的元素。不然,說明 Pool 尚未建立 poolLocal,調用 p.pinSlow()
func (p *Pool) pinSlow() (*poolLocal, int) { // Retry under the mutex. // Can not lock the mutex while pinned. runtime_procUnpin() allPoolsMu.Lock() defer allPoolsMu.Unlock() pid := runtime_procPin() // poolCleanup won't be called while we are pinned. // 沒有使用原子操做,由於已經加了全局鎖了 s := p.localSize l := p.local // 由於 pinSlow 中途可能已經被其餘的線程調用,所以這時候須要再次對 pid 進行檢查。 若是 pid 在 p.local 大小範圍內,則不用建立 poolLocal 切片,直接返回。 if uintptr(pid) < s { return indexLocal(l, pid), pid } if p.local == nil { allPools = append(allPools, p) } // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one. // 當前 P 的數量 size := runtime.GOMAXPROCS(0) local := make([]poolLocal, size) // 舊的 local 會被回收 atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release return &local[pid], pid }
由於要上一把大鎖 allPoolsMu
,因此函數名帶有 slow
。咱們知道,鎖粒度越大,競爭越多,天然就越「slow」。不過要想上鎖的話,得先解除「綁定」,鎖上以後,再執行「綁定」。緣由是鎖越大,被阻塞的機率就越大,若是還佔着 P,那就浪費資源。
在解除綁定後,pinSlow 可能被其餘的線程調用過了,p.local 可能會發生變化。所以這時候須要再次對 pid 進行檢查。若是 pid 在 p.localSize 大小範圍內,則不用再建立 poolLocal 切片,直接返回。
以後,根據 P 的個數,使用 make 建立切片,包含 runtime.GOMAXPROCS(0)
個 poolLocal,而且使用原子操做設置 p.local 和 p.localSize。
最後,返回 p.local 對應 pid 索引處的元素。
關於這把大鎖 allPoolsMu
,曹大在《幾個 Go 系統可能遇到的鎖問題》裏講了一個例子。第三方庫用了 sync.Pool
,內部有一個結構體 fasttemplate.Template
,包含 sync.Pool
字段。而 rd 在使用時,每一個請求都會新建這樣一個結構體。因而,處理每一個請求時,都會嘗試從一個空的 Pool 裏取緩存的對象,最後 goroutine 都阻塞在了這把大鎖上,由於都在嘗試執行:allPools = append(allPools, p)
回到 Get 函數,再來看另外一個關鍵的函數:poolChain.popHead()
func (c *poolChain) popHead() (interface{}, bool) { d := c.head for d != nil { if val, ok := d.popHead(); ok { return val, ok } // There may still be unconsumed elements in the // previous dequeue, so try backing up. d = loadPoolChainElt(&d.prev) } return nil, false }
函數只會被 producer 調用。首先拿到頭節點:c.head,若是頭節點不爲空的話,嘗試調用頭節點的 popHead 方法。注意這兩個 popHead 方法實際上並不相同,一個是 poolChain
的,一個是 poolDequeue
的,有疑惑的,不妨回頭再看一下 Pool 結構體的圖。咱們來看 poolDequeue.popHead()
// /usr/local/go/src/sync/poolqueue.go func (d *poolDequeue) popHead() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // 判斷隊列是否爲空 if tail == head { // Queue is empty. return nil, false } // head 位置是隊頭的前一個位置,因此此處要先退一位。 // 在讀出 slot 的 value 以前就把 head 值減 1,取消對這個 slot 的控制 head-- ptrs2 := d.pack(head, tail) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { // We successfully took back slot. slot = &d.vals[head&uint32(len(d.vals)-1)] break } } // 取出 val val := *(*interface{})(unsafe.Pointer(slot)) if val == dequeueNil(nil) { val = nil } // 重置 slot,typ 和 val 均爲 nil // 這裏清空的方式與 popTail 不一樣,與 pushHead 沒有競爭關係,因此不用過小心 *slot = eface{} return val, true }
此函數會刪掉而且返回 queue
的頭節點。但若是 queue
爲空的話,返回 false。這裏的 queue
存儲的實際上就是 Pool 裏緩存的對象。
整個函數的核心是一個無限循環,這是 Go 中經常使用的無鎖化編程形式。
首先調用 unpack
函數分離出 head 和 tail 指針,若是 head 和 tail 相等,即首尾相等,那麼這個隊列就是空的,直接就返回 nil,false
不然,將 head 指針後移一位,即 head 值減 1,而後調用 pack
打包 head 和 tail 指針。使用 atomic.CompareAndSwapUint64
比較 headTail 在這之間是否有變化,若是沒變化,至關於獲取到了這把鎖,那就更新 headTail 的值。而且把 vals 相應索引處的元素賦值給 slot。
由於 vals
長度實際是隻能是 2 的 n 次冪,所以 len(d.vals)-1
實際上獲得的值的低 n 位是全 1,它再與 head 相與,實際就是取 head 低 n 位的值。
獲得相應 slot 的元素後,通過類型轉換並判斷是不是 dequeueNil
,若是是,說明沒取到緩存的對象,返回 nil。
// /usr/local/go/src/sync/poolqueue.go // 由於使用 nil 表明空的 slots,所以使用 dequeueNil 表示 interface{}(nil) type dequeueNil *struct{}
最後,返回 val 以前,將 slot 「歸零」:*slot = eface{}
回到 poolChain.popHead()
,調用 poolDequeue.popHead()
拿到緩存的對象後,直接返回。不然,將 d
從新指向 d.prev
若是在 shared 裏沒有獲取到緩存對象,則繼續調用 Pool.getSlow()
,嘗試從其餘 P 的 poolLocal 偷取:
func (p *Pool) getSlow(pid int) interface{} { // See the comment in pin regarding ordering of the loads. size := atomic.LoadUintptr(&p.localSize) // load-acquire locals := p.local // load-consume // Try to steal one element from other procs. // 從其餘 P 中竊取對象 for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i+1)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // 嘗試從victim cache中取對象。這發生在嘗試從其餘 P 的 poolLocal 偷去失敗後, // 由於這樣可使 victim 中的對象更容易被回收。 size = atomic.LoadUintptr(&p.victimSize) if uintptr(pid) >= size { return nil } locals = p.victim l := indexLocal(locals, pid) if x := l.private; x != nil { l.private = nil return x } for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // 清空 victim cache。下次就不用再從這裏找了 atomic.StoreUintptr(&p.victimSize, 0) return nil }
從索引爲 pid+1 的 poolLocal 處開始,嘗試調用 shared.popTail()
獲取緩存對象。若是沒有拿到,則從 victim 裏找,和 poolLocal 的邏輯相似。
最後,實在沒找到,就把 victimSize 置 0,防止後來的「人」再到 victim 裏找。
在 Get 函數的最後,通過這一番操做仍是沒找到緩存的對象,就調用 New 函數建立一個新的對象。
最後,還剩一個 popTail 函數:
func (c *poolChain) popTail() (interface{}, bool) { d := loadPoolChainElt(&c.tail) if d == nil { return nil, false } for { d2 := loadPoolChainElt(&d.next) if val, ok := d.popTail(); ok { return val, ok } if d2 == nil { // 雙向鏈表只有一個尾節點,如今爲空 return nil, false } // 雙向鏈表的尾節點裏的雙端隊列被「掏空」,因此繼續看下一個節點。 // 而且因爲尾節點已經被「掏空」,因此要甩掉它。這樣,下次 popHead 就不會再看它有沒有緩存對象了。 if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { // 甩掉尾節點 storePoolChainElt(&d2.prev, nil) } d = d2 } }
在 for
循環的一開始,就把 d.next 加載到了 d2。由於 d 可能會短暫爲空,但若是 d2 在 pop 或者 pop fails 以前就不爲空的話,說明 d 就會永久爲空了。在這種狀況下,能夠安全地將 d 這個結點「甩掉」。
最後,將 c.tail
更新爲 d2
,能夠防止下次 popTail
的時候查看一個空的 dequeue
;而將 d2.prev
設置爲 nil
,能夠防止下次 popHead
時查看一個空的 dequeue
咱們再看一下核心的 poolDequeue.popTail
// src/sync/poolqueue.go:147 func (d *poolDequeue) popTail() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // 判斷隊列是否空 if tail == head { // Queue is empty. return nil, false } // 先搞定 head 和 tail 指針位置。若是搞定,那麼這個 slot 就歸屬咱們了 ptrs2 := d.pack(head, tail+1) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { // Success. slot = &d.vals[tail&uint32(len(d.vals)-1)] break } } // We now own slot. val := *(*interface{})(unsafe.Pointer(slot)) if val == dequeueNil(nil) { val = nil } slot.val = nil atomic.StorePointer(&slot.typ, nil) // At this point pushHead owns the slot. return val, true }
從隊列尾部移除一個元素,若是隊列爲空,返回 false。此函數可能同時被多個消費者
函數的核心是一個無限循環,又是一個無鎖編程。先解出 head,tail 指針值,若是二者相等,說明隊列爲空。
由於要從尾部移除一個元素,因此 tail 指針前進 1,而後使用原子操做設置 headTail。
最後,將要移除的 slot 的 val 和 typ 「歸零」:
slot.val = nil atomic.StorePointer(&slot.typ, nil)
// src/sync/pool.go // Put 將對象添加到 Pool func (p *Pool) Put(x interface{}) { if x == nil { return } // …… l, _ := p.pin() if l.private == nil { l.private = x x = nil } if x != nil { l.shared.pushHead(x) } runtime_procUnpin() //…… }
一樣刪掉了 race 相關的函數,看起來清爽多了。整個 Put 的邏輯也很清晰:
方法嘗試將其放入 shared 字段所維護的雙端隊列中。一樣用流程圖來展現整個過程:
咱們來看 pushHead
// src/sync/poolqueue.go func (c *poolChain) pushHead(val interface{}) { d := c.head if d == nil { // poolDequeue 初始長度爲8 const initSize = 8 // Must be a power of 2 d = new(poolChainElt) d.vals = make([]eface, initSize) c.head = d storePoolChainElt(&c.tail, d) } if d.pushHead(val) { return } // 前一個 poolDequeue 長度的 2 倍 newSize := len(d.vals) * 2 if newSize >= dequeueLimit { // Can't make it any bigger. newSize = dequeueLimit } // 首尾相連,構成鏈表 d2 := &poolChainElt{prev: d} d2.vals = make([]eface, newSize) c.head = d2 storePoolChainElt(&d.next, d2) d2.pushHead(val) }
若是 c.head
爲空,就要建立一個 poolChainElt,做爲首結點,固然也是尾節點。它管理的雙端隊列的長度,初始爲 8,放滿以後,再建立一個 poolChainElt 節點時,雙端隊列的長度就要翻倍。固然,有一個最大長度限制(2^30):
const dequeueBits = 32 const dequeueLimit = (1 << dequeueBits) / 4
調用 poolDequeue.pushHead
嘗試將對象放到 poolDeque 裏去:
// src/sync/poolqueue.go // 將 val 添加到雙端隊列頭部。若是隊列已滿,則返回 false。此函數只能被一個生產者調用 func (d *poolDequeue) pushHead(val interface{}) bool { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { // 隊列滿了 return false } slot := &d.vals[head&uint32(len(d.vals)-1)] // 檢測這個 slot 是否被 popTail 釋放 typ := atomic.LoadPointer(&slot.typ) if typ != nil { // 另外一個 groutine 正在 popTail 這個 slot,說明隊列仍然是滿的 return false } // The head slot is free, so we own it. if val == nil { val = dequeueNil(nil) } // slot佔位,將val存入vals中 *(*interface{})(unsafe.Pointer(slot)) = val // head 增長 1 atomic.AddUint64(&d.headTail, 1<<dequeueBits) return true }
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { // Queue is full. return false }
也就是將尾部指針加上 d.vals
的長度,再取低 31 位,看它是否和 head 相等。咱們知道,d.vals
的長度其實是固定的,所以若是隊列已滿,那麼 if 語句的兩邊就是相等的。若是隊列滿了,直接返回 false。
不然,隊列沒滿,經過 head 指針找到即將填充的 slot 位置:取 head 指針的低 31 位。
// Check if the head slot has been released by popTail. typ := atomic.LoadPointer(&slot.typ) if typ != nil { // Another goroutine is still cleaning up the tail, so // the queue is actually still full. // popTail 是先設置 val,再將 typ 設置爲 nil。設置完 typ 以後,popHead 才能夠操做這個 slot return false }
上面這一段用來判斷是否和 popTail 有衝突發生,若是有,則直接返回 false。
最後,將 val 賦值到 slot,並將 head 指針值加 1。
// slot佔位,將val存入vals中 *(*interface{})(unsafe.Pointer(slot)) = val
這裏的實現比較巧妙,slot 是 eface 類型,將 slot 轉爲 interface{} 類型,這樣 val 能以 interface{} 賦值給 slot 讓 slot.typ 和 slot.val 指向其內存塊,因而 slot.typ 和 slot.val 均不爲空。
最後咱們再來看一下 pack 和 unpack 函數,它們其實是一組綁定、解綁 head 和 tail 指針的兩個函數。
// src/sync/poolqueue.go const dequeueBits = 32 func (d *poolDequeue) pack(head, tail uint32) uint64 { const mask = 1<<dequeueBits - 1 return (uint64(head) << dequeueBits) | uint64(tail&mask) }
的低 31 位爲全 1,其餘位爲 0,它和 tail 相與,就是隻看 tail 的低 31 位。而 head 向左移 32 位以後,低 32 位爲全 0。最後把兩部分「或」起來,head 和 tail 就「綁定」在一塊兒了。
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) { const mask = 1<<dequeueBits - 1 head = uint32((ptrs >> dequeueBits) & mask) tail = uint32(ptrs & mask) return }
取出 head 指針的方法就是將 ptrs 右移 32 位,再與 mask 相與,一樣只看 head 的低 31 位。而 tail 實際上更簡單,直接將 ptrs 與 mask 相與就能夠了。
對於 Pool 而言,並不能無限擴展,不然對象佔用內存太多了,會引發內存溢出。
幾乎全部的池技術中,都會在某個時刻清空或清除部分緩存對象,那麼在 Go 中什麼時候清理未使用的對象呢?
答案是 GC 發生時。
在 pool.go 文件的 init 函數裏,註冊了 GC 發生時,如何清理 Pool 的函數:
// src/sync/pool.go func init() { runtime_registerPoolCleanup(poolCleanup) }
// src/runtime/mgc.go // Hooks for other packages var poolcleanup func() // 利用編譯器標誌將 sync 包中的清理註冊到運行時 //go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup func sync_runtime_registerPoolCleanup(f func()) { poolcleanup = f }
func poolCleanup() { for _, p := range oldPools { p.victim = nil p.victimSize = 0 } // Move primary cache to victim cache. for _, p := range allPools { p.victim = p.local p.victimSize = p.localSize p.local = nil p.localSize = 0 } oldPools, allPools = allPools, nil }
會在 STW 階段被調用。總體看起來,比較簡潔。主要是將 local 和 victim 做交換,這樣也就不致於讓 GC 把全部的 Pool 都清空了,有 victim 在「兜底」。
週期。鳥窩的【Go 1.13中 sync.Pool 是如何優化的?】講了 1.13 中的優化。
參考資料【理解 Go 1.13 中 sync.Pool 的設計與實現】 手動模擬了一下調用 poolCleanup
函數先後 oldPools,allPools,p.vitcim 的變化過程,很精彩:
- 初始狀態下,oldPools 和 allPools 均爲 nil。
和 oldPools
都是切片,切片的元素是指向 Pool 的指針,Get/Put 操做不須要經過它們。在第 6 步,若是還有其餘 Pool 執行了 Put 操做,allPools
在 Go 1.13 以前的實現中,poolCleanup
func poolCleanup() { for i, p := range allPools { allPools[i] = nil for i := 0; i < int(p.localSize); i++ { l := indexLocal(p.local, i) l.private = nil for j := range l.shared { l.shared[j] = nil } l.shared = nil } p.local = nil p.localSize = 0 } allPools = []*Pool{} }
直接清空了全部 Pool 的 p.local
和 poolLocal.shared
經過二者的對比發現,新版的實現相比 Go 1.13 以前,GC 的粒度拉大了,因爲實際回收的時間線拉長,單位時間內 GC 的開銷減少。由此基本明白 p.victim 的做用。它的定位是次級緩存,GC 時將對象放入其中,下一次 GC 來臨以前若是有 Get 調用則會從 p.victim 中取,直到再一次 GC 來臨時回收。
同時因爲從 p.victim 中取出對象使用完畢以後並未放回 p.victim 中,在必定程度也減少了下一次 GC 的開銷。原來 1 次 GC 的開銷被拉長到 2 次且會有必定程度的開銷減少,這就是 p.victim 引入的意圖。
【理解 Go 1.13 中 sync.Pool 的設計與實現】 這篇文章最後還總結了 sync.Pool
的設計理念,包括:無鎖、操做對象隔離、原子操做代替鎖、行爲隔離——鏈表、Victim Cache 下降 GC 開銷。寫得很是不錯,推薦閱讀。
另外,關於 sync.Pool
本文先是介紹了 Pool 是什麼,有什麼做用,接着給出了 Pool 的用法以及在標準庫、一些第三方庫中的用法,還介紹了 pool_test 中的一些測試用例。最後,詳細解讀了 sync.Pool
本文的結尾部分,再來詳細地總結一下關於 sync.Pool
是協程安全的,使用起來很是方便。設置好 New 函數後,調用 Get 獲取,調用 Put 歸還對象。gin
等框架也都使用了 sync.Pool。procPin
將 G 和 P 綁定,防止 G 被搶佔。在綁按期間,GC 沒法清理緩存的對象。victim
機制前,sync.Pool 裏對象的最⼤緩存時間是一個 GC 週期,當 GC 開始時,沒有被引⽤的對象都會被清理掉;加入 victim
機制後,最大緩存時間爲兩個 GC 週期。sync.Pool
引入的意圖在於下降 GC 壓力的同時提升命中率。sync.Pool
