https://juejin.cn/post/691837...html
最近在工做中碰到了 GC 的問題:項目中大量重複地建立許多對象,形成 GC 的工做量巨大,CPU 頻繁掉底。準備使用 sync.Pool
來緩存對象,減輕 GC 的消耗。爲了用起來更順暢,我特意研究了一番,造成此文。本文從使用到源碼解析,按部就班,一一道來。git
本文基於 Go 1.14
sync.Pool
是 sync 包下的一個組件,能夠做爲保存臨時取還對象的一個「池子」。我的以爲它的名字有必定的誤導性,由於 Pool 裏裝的對象能夠被無通知地被回收,可能 sync.Cache
是一個更合適的名字。github
對於不少須要重複分配、回收內存的地方,sync.Pool
是一個很好的選擇。頻繁地分配、回收內存會給 GC 帶來必定的負擔,嚴重的時候會引發 CPU 的毛刺,而 sync.Pool
能夠將暫時不用的對象緩存起來,待下次須要的時候直接使用,不用再次通過內存分配,複用對象的內存,減輕 GC 的壓力,提高系統的性能。golang
首先,sync.Pool
是協程安全的,這對於使用者來講是極其方便的。使用前,設置好對象的 New
函數,用於在 Pool
裏沒有緩存的對象時,建立一個。以後,在程序的任何地方、任什麼時候候僅經過 Get()
、Put()
方法就能夠取、還對象了。編程
下面是 2018 年的時候,《Go 夜讀》上關於 sync.Pool
的分享,關於適用場景:json
當多個 goroutine 都須要建立同⼀個對象的時候,若是 goroutine 數過多,致使對象的建立數⽬劇增,進⽽致使 GC 壓⼒增大。造成 「併發⼤-佔⽤內存⼤-GC 緩慢-處理併發能⼒下降-併發更⼤」這樣的惡性循環。數組
在這個時候,須要有⼀個對象池,每一個 goroutine 再也不⾃⼰單首創建對象,⽽是從對象池中獲取出⼀個對象(若是池中已經有的話)。緩存
所以關鍵思想就是對象的複用,避免重複建立、銷燬,下面咱們來看看如何使用。安全
首先來看一個簡單的例子:性能優化
package main import ( "fmt" "sync" ) var pool \*sync.Pool type Person struct { Name string } func initPool() { pool = &sync.Pool { New: func()interface{} { fmt.Println("Creating a new Person") return new(Person) }, } } func main() { initPool() p := pool.Get().(\*Person) fmt.Println("首次從 pool 裏獲取:", p) p.Name = "first" fmt.Printf("設置 p.Name = %s\\n", p.Name) pool.Put(p) fmt.Println("Pool 裏已有一個對象:&{first},調用 Get: ", pool.Get().(\*Person)) fmt.Println("Pool 沒有對象了,調用 Get: ", pool.Get().(\*Person)) }
運行結果:
Creating a new Person 首次從 pool 裏獲取:&{} 設置 p.Name = first Pool 裏已有一個對象:&{first},Get: &{first} Creating a new Person Pool 沒有對象了,Get: &{}
首先,須要初始化 Pool
,惟一須要的就是設置好 New
函數。當調用 Get 方法時,若是池子裏緩存了對象,就直接返回緩存的對象。若是沒有存貨,則調用 New 函數建立一個新的對象。
另外,咱們發現 Get 方法取出來的對象和上次 Put 進去的對象其實是同一個,Pool 沒有作任何「清空」的處理。但咱們不該當對此有任何假設,由於在實際的併發使用場景中,沒法保證這種順序,最好的作法是在 Put 前,將對象清空。
這部分主要看 fmt.Printf
如何使用:
func Printf(format string, a ...interface{}) (n int, err error) { return Fprintf(os.Stdout, format, a...) }
繼續看 Fprintf
:
func Fprintf(w io.Writer, format string, a ...interface{}) (n int, err error) { p := newPrinter() p.doPrintf(format, a) n, err = w.Write(p.buf) p.free() return }
Fprintf
函數的參數是一個 io.Writer
,Printf
傳的是 os.Stdout
,至關於直接輸出到標準輸出。這裏的 newPrinter
用的就是 Pool:
// newPrinter allocates a new pp struct or grabs a cached one. func newPrinter() \*pp { p := ppFree.Get().(\*pp) p.panicking = false p.erroring = false p.wrapErrs = false p.fmt.init(&p.buf) return p } var ppFree = sync.Pool{ New: func() interface{} { return new(pp) }, }
回到 Fprintf
函數,拿到 pp 指針後,會作一些 format 的操做,而且將 p.buf 裏面的內容寫入 w。最後,調用 free 函數,將 pp 指針歸還到 Pool 中:
// free saves used pp structs in ppFree; avoids an allocation per invocation. func (p \*pp) free() { if cap(p.buf) > 64<<10 { return } p.buf = p.buf\[:0\] p.arg = nil p.value = reflect.Value{} p.wrappedErr = nil ppFree.Put(p) }
歸還到 Pool 前將對象的一些字段清零,這樣,經過 Get 拿到緩存的對象時,就能夠安全地使用了。
經過 test 文件學習源碼是一個很好的途徑,由於它表明了「官方」的用法。更重要的是,測試用例會故意測試一些「坑」,學習這些坑,也會讓本身在使用的時候就能學會避免。
pool_test
文件裏共有 7 個測試,4 個 BechMark。
TestPool
和 TestPoolNew
比較簡單,主要是測試 Get/Put 的功能。咱們來看下 TestPoolNew
:
func TestPoolNew(t \*testing.T) { // disable GC so we can control when it happens. defer debug.SetGCPercent(debug.SetGCPercent(-1)) i := 0 p := Pool{ New: func() interface{} { i++ return i }, } if v := p.Get(); v != 1 { t.Fatalf("got %v; want 1", v) } if v := p.Get(); v != 2 { t.Fatalf("got %v; want 2", v) } // Make sure that the goroutine doesn't migrate to another P // between Put and Get calls. Runtime\_procPin() p.Put(42) if v := p.Get(); v != 42 { t.Fatalf("got %v; want 42", v) } Runtime\_procUnpin() if v := p.Get(); v != 3 { t.Fatalf("got %v; want 3", v) } }
首先設置了 GC=-1
,做用就是中止 GC。那爲啥要用 defer?函數都跑完了,還要 defer 幹啥。注意到,debug.SetGCPercent
這個函數被調用了兩次,並且這個函數返回的是上一次 GC 的值。所以,defer 在這裏的用途是還原到調用此函數以前的 GC 設置,也就是恢復現場。
接着,調置了 Pool 的 New 函數:直接返回一個 int,變且每次調用 New,都會自增 1。而後,連續調用了兩次 Get 函數,由於這個時候 Pool 裏沒有緩存的對象,所以每次都會調用 New 建立一個,因此第一次返回 1,第二次返回 2。
而後,調用 Runtime_procPin()
防止 goroutine 被強佔,目的是保護接下來的一次 Put 和 Get 操做,使得它們操做的對象都是同一個 P 的「池子」。而且,此次調用 Get 的時候並無調用 New,由於以前有一次 Put 的操做。
最後,再次調用 Get 操做,由於沒有「存貨」,所以仍是會再次調用 New 建立一個對象。
TestPoolGC
和 TestPoolRelease
則主要測試 GC 對 Pool 裏對象的影響。這裏用了一個函數,用於計數有多少對象會被 GC 回收:
runtime.SetFinalizer(v, func(vv \*string) { atomic.AddUint32(&fin, 1) })
當垃圾回收檢測到 v
是一個不可達的對象時,而且 v
又有一個關聯的 Finalizer
,就會另起一個 goroutine 調用設置的 finalizer 函數,也就是上面代碼裏的參數 func。這樣,就會讓對象 v 從新可達,從而在此次 GC 過程當中不被回收。以後,解綁對象 v 和它所關聯的 Finalizer
,當下次 GC 再次檢測到對象 v 不可達時,纔會被回收。
TestPoolStress
從名字看,主要是想測一下「壓力」,具體操做就是起了 10 個 goroutine 不斷地向 Pool 裏 Put 對象,而後又 Get 對象,看是否會出錯。
TestPoolDequeue
和 TestPoolChain
,都調用了 testPoolDequeue
,這是具體幹活的。它須要傳入一個 PoolDequeue
接口:
// poolDequeue testing. type PoolDequeue interface { PushHead(val interface{}) bool PopHead() (interface{}, bool) PopTail() (interface{}, bool) }
PoolDequeue
是一個雙端隊列,能夠從頭部入隊元素,從頭部和尾部出隊元素。調用函數時,前者傳入 NewPoolDequeue(16)
,後者傳入 NewPoolChain()
,底層其實都是 poolDequeue
這個結構體。具體來看 testPoolDequeue
作了什麼:
雙端隊列
總共起了 10 個 goroutine:1 個生產者,9 個消費者。生產者不斷地從隊列頭 pushHead 元素到雙端隊列裏去,而且每 push 10 次,就 popHead 一次;消費者則一直從隊列尾取元素。不管是從隊列頭仍是從隊列尾取元素,都會在 map 裏作標記,最後檢驗每一個元素是否是隻被取出過一次。
剩下的就是 Benchmark 測試了。第一個 BenchmarkPool
比較簡單,就是不停地 Put/Get,測試性能。
BenchmarkPoolSTW
函數會先關掉 GC,再向 pool 裏 put 10 個對象,而後強制觸發 GC,記錄 GC 的停頓時間,而且作一個排序,計算 P50 和 P95 的 STW 時間。這個函數能夠加入我的的代碼庫了:
func BenchmarkPoolSTW(b \*testing.B) { // Take control of GC. defer debug.SetGCPercent(debug.SetGCPercent(-1)) var mstats runtime.MemStats var pauses \[\]uint64 var p Pool for i := 0; i < b.N; i++ { // Put a large number of items into a pool. const N = 100000 var item interface{} = 42 for i := 0; i < N; i++ { p.Put(item) } // Do a GC. runtime.GC() // Record pause time. runtime.ReadMemStats(&mstats) pauses = append(pauses, mstats.PauseNs\[(mstats.NumGC+255)%256\]) } // Get pause time stats. sort.Slice(pauses, func(i, j int) bool { return pauses\[i\] < pauses\[j\] }) var total uint64 for \_, ns := range pauses { total += ns } // ns/op for this benchmark is average STW time. b.ReportMetric(float64(total)/float64(b.N), "ns/op") b.ReportMetric(float64(pauses\[len(pauses)\*95/100\]), "p95-ns/STW") b.ReportMetric(float64(pauses\[len(pauses)\*50/100\]), "p50-ns/STW") }
我在 mac 上跑了一下:
go test -v -run=none -bench=BenchmarkPoolSTW
獲得輸出:
goos: darwin goarch: amd64 pkg: sync BenchmarkPoolSTW-12 361 3708 ns/op 3583 p50-ns/STW 5008 p95-ns/STW PASS ok sync 1.481s
最後一個 BenchmarkPoolExpensiveNew
測試當 New 的代價很高時,Pool 的表現。也能夠加入我的的代碼庫。
標準庫中 encoding/json
也用到了 sync.Pool 來提高性能。著名的 gin
框架,對 context 取用也到了 sync.Pool
。
來看下 gin
如何使用 sync.Pool。設置 New 函數:
engine.pool.New = func() interface{} { return engine.allocateContext() } func (engine \*Engine) allocateContext() \*Context { return &Context{engine: engine, KeysMutex: &sync.RWMutex{}} }
使用:
// ServeHTTP conforms to the http.Handler interface. func (engine \*Engine) ServeHTTP(w http.ResponseWriter, req \*http.Request) { c := engine.pool.Get().(\*Context) c.writermem.reset(w) c.Request = req c.reset() engine.handleHTTPRequest(c) engine.pool.Put(c) }
先調用 Get 取出來緩存的對象,而後會作一些 reset 操做,再執行 handleHTTPRequest
,最後再 Put 回 Pool。
另外,Echo 框架也使⽤了 sync.Pool
來管理 context
,而且⼏乎達到了零堆內存分配:
It leverages sync pool to reuse memory and achieve zero dynamic memory allocation with no GC overhead.
首先來看 Pool 的結構體:
type Pool struct { noCopy noCopy // 每一個 P 的本地隊列,實際類型爲 \[P\]poolLocal local unsafe.Pointer // local fixed-size per-P pool, actual type is \[P\]poolLocal // \[P\]poolLocal的大小 localSize uintptr // size of the local array victim unsafe.Pointer // local from previous cycle victimSize uintptr // size of victims array // 自定義的對象建立回調函數,當 pool 中無可用對象時會調用此函數 New func() interface{} }
由於 Pool 不但願被複制,因此結構體裏有一個 noCopy 的字段,使用 go vet
工具能夠檢測到用戶代碼是否複製了 Pool。
noCopy
是 go1.7 開始引入的一個靜態檢查機制。它不只僅工做在運行時或標準庫,同時也對用戶代碼有效。用戶只需實現這樣的不消耗內存、僅用於靜態分析的結構,來保證一個對象在第一次使用後不會發生複製。
實現很是簡單:
// noCopy 用於嵌入一個結構體中來保證其第一次使用後不會被複制 // // 見 https://golang.org/issues/8005#issuecomment-190753527 type noCopy struct{} // Lock 是一個空操做用來給 \`go ve\` 的 -copylocks 靜態分析 func (\*noCopy) Lock() {} func (\*noCopy) Unlock() {}
local
字段存儲指向 [P]poolLocal
數組(嚴格來講,它是一個切片)的指針,localSize
則表示 local 數組的大小。訪問時,P 的 id 對應 [P]poolLocal
下標索引。經過這樣的設計,多個 goroutine 使用同一個 Pool 時,減小了競爭,提高了性能。
在一輪 GC 到來時,victim 和 victimSize 會分別「接管」 local 和 localSize。victim
的機制用於減小 GC 後冷啓動致使的性能抖動,讓分配對象更平滑。
Victim Cache 原本是計算機架構裏面的一個概念,是 CPU 硬件處理緩存的一種技術,
sync.Pool
引入的意圖在於下降 GC 壓力的同時提升命中率。
當 Pool 沒有緩存的對象時,調用 New
方法生成一個新的對象。
type poolLocal struct { poolLocalInternal // 將 poolLocal 補齊至兩個緩存行的倍數,防止 false sharing, // 每一個緩存行具備 64 bytes,即 512 bit // 目前咱們的處理器通常擁有 32 \* 1024 / 64 = 512 條緩存行 // 僞共享,僅佔位用,防止在 cache line 上分配多個 poolLocalInternal pad \[128 - unsafe.Sizeof(poolLocalInternal{})%128\]byte } // Local per-P Pool appendix. type poolLocalInternal struct { // P 的私有緩存區,使用時無須要加鎖 private interface{} // 公共緩存區。本地 P 能夠 pushHead/popHead;其餘 P 則只能 popTail shared poolChain }
字段 pad
主要是防止 false sharing
,董大的《什麼是 cpu cache》裏講得比較好:
現代 cpu 中,cache 都劃分紅以 cache line (cache block) 爲單位,在 x86\_64 體系下通常都是 64 字節,cache line 是操做的最小單元。
程序即便只想讀內存中的 1 個字節數據,也要同時把附近 63 節字加載到 cache 中,若是讀取超個 64 字節,那麼就要加載到多個 cache line 中。
簡單來講,若是沒有 pad 字段,那麼當須要訪問 0 號索引的 poolLocal 時,CPU 同時會把 0 號和 1 號索引同時加載到 cpu cache。在只修改 0 號索引的狀況下,會讓 1 號索引的 poolLocal 失效。這樣,當其餘線程想要讀取 1 號索引時,發生 cache miss,還得從新再加載,對性能有損。增長一個 pad
,補齊緩存行,讓相關的字段能獨立地加載到緩存行就不會出現 false sharding
了。
poolChain
是一個雙端隊列的實現:
type poolChain struct { // 只有生產者會 push to,不用加鎖 head \*poolChainElt // 讀寫須要原子控制。pop from tail \*poolChainElt } type poolChainElt struct { poolDequeue // next 被 producer 寫,consumer 讀。因此只會從 nil 變成 non-nil // prev 被 consumer 寫,producer 讀。因此只會從 non-nil 變成 nil next, prev \*poolChainElt } type poolDequeue struct { // The head index is stored in the most-significant bits so // that we can atomically add to it and the overflow is // harmless. // headTail 包含一個 32 位的 head 和一個 32 位的 tail 指針。這兩個值都和 len(vals)-1 取模過。 // tail 是隊列中最老的數據,head 指向下一個將要填充的 slot // slots 的有效範圍是 \[tail, head),由 consumers 持有。 headTail uint64 // vals 是一個存儲 interface{} 的環形隊列,它的 size 必須是 2 的冪 // 若是 slot 爲空,則 vals\[i\].typ 爲空;不然,非空。 // 一個 slot 在這時宣告無效:tail 不指向它了,vals\[i\].typ 爲 nil // 由 consumer 設置成 nil,由 producer 讀 vals \[\]eface }
poolDequeue
被實現爲單生產者、多消費者的固定大小的無鎖(atomic 實現) Ring 式隊列(底層存儲使用數組,使用兩個指針標記 head、tail)。生產者能夠從 head 插入、head 刪除,而消費者僅可從 tail 刪除。
headTail
指向隊列的頭和尾,經過位運算將 head 和 tail 存入 headTail 變量中。
咱們用一幅圖來完整地描述 Pool 結構體:
Pool 結構體
結合木白的技術私廚的《請問sync.Pool有什麼缺點?》裏的一張圖,對於雙端隊列的理解會更容易一些:
Pool 結構體
咱們看到 Pool 並無直接使用 poolDequeue,緣由是它的大小是固定的,而 Pool 的大小是沒有限制的。所以,在 poolDequeue 之上包裝了一下,變成了一個 poolChainElt
的雙向鏈表,能夠動態增加。
直接上源碼:
func (p \*Pool) Get() interface{} { // ...... l, pid := p.pin() x := l.private l.private = nil if x == nil { x, \_ = l.shared.popHead() if x == nil { x = p.getSlow(pid) } } runtime\_procUnpin() // ...... if x == nil && p.New != nil { x = p.New() } return x }
省略號的內容是 race
相關的,屬於閱讀源碼過程當中的一些噪音,暫時註釋掉。這樣,Get 的整個過程就很是清晰了:
p.pin()
函數將當前的 goroutine 和 P 綁定,禁止被搶佔,返回當前 P 對應的 poolLocal,以及 pid。runtime_procUnpin()
解除非搶佔。我用一張流程圖來展現整個過程:
Get 流程圖
總體流程梳理完了,咱們再來看一下其中的一些關鍵函數。
先來看 Pool.pin()
:
// src/sync/pool.go // 調用方必須在完成取值後調用 runtime\_procUnpin() 來取消搶佔。 func (p \*Pool) pin() (\*poolLocal, int) { pid := runtime\_procPin() s := atomic.LoadUintptr(&p.localSize) // load-acquire l := p.local // load-consume // 由於可能存在動態的 P(運行時調整 P 的個數) if uintptr(pid) < s { return indexLocal(l, pid), pid } return p.pinSlow() }
pin
的做用就是將當前 groutine 和 P 綁定在一塊兒,禁止搶佔。而且返回對應的 poolLocal 以及 P 的 id。
若是 G 被搶佔,則 G 的狀態從 running 變成 runnable,會被放回 P 的 localq 或 globaq,等待下一次調度。下次再執行時,就不必定是和如今的 P 相結合了。由於以後會用到 pid,若是被搶佔了,有可能接下來使用的 pid 與所綁定的 P 並不是同一個。
「綁定」的任務最終交給了 procPin
:
// src/runtime/proc.go func procPin() int { \_g\_ := getg() mp := \_g\_.m mp.locks++ return int(mp.p.ptr().id) }
實現的代碼很簡潔:將當前 goroutine 綁定的 m 上的一個鎖字段 locks 值加 1,即完成了「綁定」。關於 pin 的原理,能夠參考《golang的對象池sync.pool源碼解讀》,文章詳細分析了爲何執行 procPin
以後,不可搶佔,且 GC 不會清掃 Pool 裏的對象。
咱們再回到 p.pin()
,原子操做取出 p.localSize
和 p.local
,若是當前 pid
小於 p.localSize
,則直接取 poolLocal 數組中的 pid 索引處的元素。不然,說明 Pool 尚未建立 poolLocal,調用 p.pinSlow()
完成建立工做。
func (p \*Pool) pinSlow() (\*poolLocal, int) { // Retry under the mutex. // Can not lock the mutex while pinned. runtime\_procUnpin() allPoolsMu.Lock() defer allPoolsMu.Unlock() pid := runtime\_procPin() // poolCleanup won't be called while we are pinned. // 沒有使用原子操做,由於已經加了全局鎖了 s := p.localSize l := p.local // 由於 pinSlow 中途可能已經被其餘的線程調用,所以這時候須要再次對 pid 進行檢查。若是 pid 在 p.local 大小範圍內,則不用建立 poolLocal 切片,直接返回。 if uintptr(pid) < s { return indexLocal(l, pid), pid } if p.local == nil { allPools = append(allPools, p) } // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one. // 當前 P 的數量 size := runtime.GOMAXPROCS(0) local := make(\[\]poolLocal, size) // 舊的 local 會被回收 atomic.StorePointer(&p.local, unsafe.Pointer(&local\[0\])) // store-release atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release return &local\[pid\], pid }
由於要上一把大鎖 allPoolsMu
,因此函數名帶有 slow
。咱們知道,鎖粒度越大,競爭越多,天然就越「slow」。不過要想上鎖的話,得先解除「綁定」,鎖上以後,再執行「綁定」。緣由是鎖越大,被阻塞的機率就越大,若是還佔着 P,那就浪費資源。
在解除綁定後,pinSlow 可能被其餘的線程調用過了,p.local 可能會發生變化。所以這時候須要再次對 pid 進行檢查。若是 pid 在 p.localSize 大小範圍內,則不用再建立 poolLocal 切片,直接返回。
以後,根據 P 的個數,使用 make 建立切片,包含 runtime.GOMAXPROCS(0)
個 poolLocal,而且使用原子操做設置 p.local 和 p.localSize。
最後,返回 p.local 對應 pid 索引處的元素。
關於這把大鎖 allPoolsMu
,曹大在《幾個 Go 系統可能遇到的鎖問題》裏講了一個例子。第三方庫用了 sync.Pool
,內部有一個結構體 fasttemplate.Template
,包含 sync.Pool
字段。而 rd 在使用時,每一個請求都會新建這樣一個結構體。因而,處理每一個請求時,都會嘗試從一個空的 Pool 裏取緩存的對象,最後 goroutine 都阻塞在了這把大鎖上,由於都在嘗試執行:allPools = append(allPools, p)
,從而形成性能問題。
回到 Get 函數,再來看另外一個關鍵的函數:poolChain.popHead()
:
func (c \*poolChain) popHead() (interface{}, bool) { d := c.head for d != nil { if val, ok := d.popHead(); ok { return val, ok } // There may still be unconsumed elements in the // previous dequeue, so try backing up. d = loadPoolChainElt(&d.prev) } return nil, false }
popHead
函數只會被 producer 調用。首先拿到頭節點:c.head,若是頭節點不爲空的話,嘗試調用頭節點的 popHead 方法。注意這兩個 popHead 方法實際上並不相同,一個是 poolChain
的,一個是 poolDequeue
的,有疑惑的,不妨回頭再看一下 Pool 結構體的圖。咱們來看 poolDequeue.popHead()
:
// /usr/local/go/src/sync/poolqueue.go func (d \*poolDequeue) popHead() (interface{}, bool) { var slot \*eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // 判斷隊列是否爲空 if tail == head { // Queue is empty. return nil, false } // head 位置是隊頭的前一個位置,因此此處要先退一位。 // 在讀出 slot 的 value 以前就把 head 值減 1,取消對這個 slot 的控制 head-- ptrs2 := d.pack(head, tail) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { // We successfully took back slot. slot = &d.vals\[head&uint32(len(d.vals)-1)\] break } } // 取出 val val := \*(\*interface{})(unsafe.Pointer(slot)) if val == dequeueNil(nil) { val = nil } // 重置 slot,typ 和 val 均爲 nil // 這裏清空的方式與 popTail 不一樣,與 pushHead 沒有競爭關係,因此不用過小心 \*slot = eface{} return val, true }
此函數會刪掉而且返回 queue
的頭節點。但若是 queue
爲空的話,返回 false。這裏的 queue
存儲的實際上就是 Pool 裏緩存的對象。
整個函數的核心是一個無限循環,這是 Go 中經常使用的無鎖化編程形式。
首先調用 unpack
函數分離出 head 和 tail 指針,若是 head 和 tail 相等,即首尾相等,那麼這個隊列就是空的,直接就返回 nil,false
。
不然,將 head 指針後移一位,即 head 值減 1,而後調用 pack
打包 head 和 tail 指針。使用 atomic.CompareAndSwapUint64
比較 headTail 在這之間是否有變化,若是沒變化,至關於獲取到了這把鎖,那就更新 headTail 的值。而且把 vals 相應索引處的元素賦值給 slot。
由於 vals
長度實際是隻能是 2 的 n 次冪,所以 len(d.vals)-1
實際上獲得的值的低 n 位是全 1,它再與 head 相與,實際就是取 head 低 n 位的值。
獲得相應 slot 的元素後,通過類型轉換並判斷是不是 dequeueNil
,若是是,說明沒取到緩存的對象,返回 nil。
// /usr/local/go/src/sync/poolqueue.go // 由於使用 nil 表明空的 slots,所以使用 dequeueNil 表示 interface{}(nil) type dequeueNil \*struct{}
最後,返回 val 以前,將 slot 「歸零」:*slot = eface{}
。
回到 poolChain.popHead()
,調用 poolDequeue.popHead()
拿到緩存的對象後,直接返回。不然,將 d
從新指向 d.prev
,繼續嘗試獲取緩存的對象。
若是在 shared 裏沒有獲取到緩存對象,則繼續調用 Pool.getSlow()
,嘗試從其餘 P 的 poolLocal 偷取:
func (p \*Pool) getSlow(pid int) interface{} { // See the comment in pin regarding ordering of the loads. size := atomic.LoadUintptr(&p.localSize) // load-acquire locals := p.local // load-consume // Try to steal one element from other procs. // 從其餘 P 中竊取對象 for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i+1)%int(size)) if x, \_ := l.shared.popTail(); x != nil { return x } } // 嘗試從victim cache中取對象。這發生在嘗試從其餘 P 的 poolLocal 偷去失敗後, // 由於這樣可使 victim 中的對象更容易被回收。 size = atomic.LoadUintptr(&p.victimSize) if uintptr(pid) >= size { return nil } locals = p.victim l := indexLocal(locals, pid) if x := l.private; x != nil { l.private = nil return x } for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i)%int(size)) if x, \_ := l.shared.popTail(); x != nil { return x } } // 清空 victim cache。下次就不用再從這裏找了 atomic.StoreUintptr(&p.victimSize, 0) return nil }
從索引爲 pid+1 的 poolLocal 處開始,嘗試調用 shared.popTail()
獲取緩存對象。若是沒有拿到,則從 victim 裏找,和 poolLocal 的邏輯相似。
最後,實在沒找到,就把 victimSize 置 0,防止後來的「人」再到 victim 裏找。
在 Get 函數的最後,通過這一番操做仍是沒找到緩存的對象,就調用 New 函數建立一個新的對象。
最後,還剩一個 popTail 函數:
func (c \*poolChain) popTail() (interface{}, bool) { d := loadPoolChainElt(&c.tail) if d == nil { return nil, false } for { d2 := loadPoolChainElt(&d.next) if val, ok := d.popTail(); ok { return val, ok } if d2 == nil { // 雙向鏈表只有一個尾節點,如今爲空 return nil, false } // 雙向鏈表的尾節點裏的雙端隊列被「掏空」,因此繼續看下一個節點。 // 而且因爲尾節點已經被「掏空」,因此要甩掉它。這樣,下次 popHead 就不會再看它有沒有緩存對象了。 if atomic.CompareAndSwapPointer((\*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { // 甩掉尾節點 storePoolChainElt(&d2.prev, nil) } d = d2 } }
在 for
循環的一開始,就把 d.next 加載到了 d2。由於 d 可能會短暫爲空,但若是 d2 在 pop 或者 pop fails 以前就不爲空的話,說明 d 就會永久爲空了。在這種狀況下,能夠安全地將 d 這個結點「甩掉」。
最後,將 c.tail
更新爲 d2
,能夠防止下次 popTail
的時候查看一個空的 dequeue
;而將 d2.prev
設置爲 nil
,能夠防止下次 popHead
時查看一個空的 dequeue
。
咱們再看一下核心的 poolDequeue.popTail
:
// src/sync/poolqueue.go:147 func (d \*poolDequeue) popTail() (interface{}, bool) { var slot \*eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // 判斷隊列是否空 if tail == head { // Queue is empty. return nil, false } // 先搞定 head 和 tail 指針位置。若是搞定,那麼這個 slot 就歸屬咱們了 ptrs2 := d.pack(head, tail+1) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { // Success. slot = &d.vals\[tail&uint32(len(d.vals)-1)\] break } } // We now own slot. val := \*(\*interface{})(unsafe.Pointer(slot)) if val == dequeueNil(nil) { val = nil } slot.val = nil atomic.StorePointer(&slot.typ, nil) // At this point pushHead owns the slot. return val, true }
popTail
從隊列尾部移除一個元素,若是隊列爲空,返回 false。此函數可能同時被多個消費者
調用。
函數的核心是一個無限循環,又是一個無鎖編程。先解出 head,tail 指針值,若是二者相等,說明隊列爲空。
由於要從尾部移除一個元素,因此 tail 指針前進 1,而後使用原子操做設置 headTail。
最後,將要移除的 slot 的 val 和 typ 「歸零」:
slot.val = nil atomic.StorePointer(&slot.typ, nil)
// src/sync/pool.go // Put 將對象添加到 Pool func (p \*Pool) Put(x interface{}) { if x == nil { return } // …… l, \_ := p.pin() if l.private == nil { l.private = x x = nil } if x != nil { l.shared.pushHead(x) } runtime\_procUnpin() //…… }
一樣刪掉了 race 相關的函數,看起來清爽多了。整個 Put 的邏輯也很清晰:
pushHead
方法嘗試將其放入 shared 字段所維護的雙端隊列中。一樣用流程圖來展現整個過程:
Put 流程圖
咱們來看 pushHead
的源碼,比較清晰:
// src/sync/poolqueue.go func (c \*poolChain) pushHead(val interface{}) { d := c.head if d == nil { // poolDequeue 初始長度爲8 const initSize = 8 // Must be a power of 2 d = new(poolChainElt) d.vals = make(\[\]eface, initSize) c.head = d storePoolChainElt(&c.tail, d) } if d.pushHead(val) { return } // 前一個 poolDequeue 長度的 2 倍 newSize := len(d.vals) \* 2 if newSize >= dequeueLimit { // Can't make it any bigger. newSize = dequeueLimit } // 首尾相連,構成鏈表 d2 := &poolChainElt{prev: d} d2.vals = make(\[\]eface, newSize) c.head = d2 storePoolChainElt(&d.next, d2) d2.pushHead(val) }
若是 c.head
爲空,就要建立一個 poolChainElt,做爲首結點,固然也是尾節點。它管理的雙端隊列的長度,初始爲 8,放滿以後,再建立一個 poolChainElt 節點時,雙端隊列的長度就要翻倍。固然,有一個最大長度限制(2^30):
const dequeueBits = 32 const dequeueLimit = (1 << dequeueBits) / 4
調用 poolDequeue.pushHead
嘗試將對象放到 poolDeque 裏去:
// src/sync/poolqueue.go // 將 val 添加到雙端隊列頭部。若是隊列已滿,則返回 false。此函數只能被一個生產者調用 func (d \*poolDequeue) pushHead(val interface{}) bool { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { // 隊列滿了 return false } slot := &d.vals\[head&uint32(len(d.vals)-1)\] // 檢測這個 slot 是否被 popTail 釋放 typ := atomic.LoadPointer(&slot.typ) if typ != nil { // 另外一個 groutine 正在 popTail 這個 slot,說明隊列仍然是滿的 return false } // The head slot is free, so we own it. if val == nil { val = dequeueNil(nil) } // slot佔位,將val存入vals中 \*(\*interface{})(unsafe.Pointer(slot)) = val // head 增長 1 atomic.AddUint64(&d.headTail, 1<<dequeueBits) return true }
首先判斷隊列是否已滿:
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { // Queue is full. return false }
也就是將尾部指針加上 d.vals
的長度,再取低 31 位,看它是否和 head 相等。咱們知道,d.vals
的長度其實是固定的,所以若是隊列已滿,那麼 if 語句的兩邊就是相等的。若是隊列滿了,直接返回 false。
不然,隊列沒滿,經過 head 指針找到即將填充的 slot 位置:取 head 指針的低 31 位。
// Check if the head slot has been released by popTail. typ := atomic.LoadPointer(&slot.typ) if typ != nil { // Another goroutine is still cleaning up the tail, so // the queue is actually still full. // popTail 是先設置 val,再將 typ 設置爲 nil。設置完 typ 以後,popHead 才能夠操做這個 slot return false }
上面這一段用來判斷是否和 popTail 有衝突發生,若是有,則直接返回 false。
最後,將 val 賦值到 slot,並將 head 指針值加 1。
// slot佔位,將val存入vals中 \*(\*interface{})(unsafe.Pointer(slot)) = val
這裏的實現比較巧妙,slot 是 eface 類型,將 slot 轉爲 interface{} 類型,這樣 val 能以 interface{} 賦值給 slot 讓 slot.typ 和 slot.val 指向其內存塊,因而 slot.typ 和 slot.val 均不爲空。
最後咱們再來看一下 pack 和 unpack 函數,它們其實是一組綁定、解綁 head 和 tail 指針的兩個函數。
// src/sync/poolqueue.go const dequeueBits = 32 func (d \*poolDequeue) pack(head, tail uint32) uint64 { const mask = 1<<dequeueBits - 1 return (uint64(head) << dequeueBits) | uint64(tail&mask) }
mask
的低 31 位爲全 1,其餘位爲 0,它和 tail 相與,就是隻看 tail 的低 31 位。而 head 向左移 32 位以後,低 32 位爲全 0。最後把兩部分「或」起來,head 和 tail 就「綁定」在一塊兒了。
相應的解綁函數:
func (d \*poolDequeue) unpack(ptrs uint64) (head, tail uint32) { const mask = 1<<dequeueBits - 1 head = uint32((ptrs >> dequeueBits) & mask) tail = uint32(ptrs & mask) return }
取出 head 指針的方法就是將 ptrs 右移 32 位,再與 mask 相與,一樣只看 head 的低 31 位。而 tail 實際上更簡單,直接將 ptrs 與 mask 相與就能夠了。
對於 Pool 而言,並不能無限擴展,不然對象佔用內存太多了,會引發內存溢出。
幾乎全部的池技術中,都會在某個時刻清空或清除部分緩存對象,那麼在 Go 中什麼時候清理未使用的對象呢?
答案是 GC 發生時。
在 pool.go 文件的 init 函數裏,註冊了 GC 發生時,如何清理 Pool 的函數:
// src/sync/pool.go func init() { runtime\_registerPoolCleanup(poolCleanup) }
編譯器在背後作了一些動做:
// src/runtime/mgc.go // Hooks for other packages var poolcleanup func() // 利用編譯器標誌將 sync 包中的清理註冊到運行時 //go:linkname sync\_runtime\_registerPoolCleanup sync.runtime\_registerPoolCleanup func sync\_runtime\_registerPoolCleanup(f func()) { poolcleanup = f }
具體來看下:
func poolCleanup() { for \_, p := range oldPools { p.victim = nil p.victimSize = 0 } // Move primary cache to victim cache. for \_, p := range allPools { p.victim = p.local p.victimSize = p.localSize p.local = nil p.localSize = 0 } oldPools, allPools = allPools, nil }
poolCleanup
會在 STW 階段被調用。總體看起來,比較簡潔。主要是將 local 和 victim 做交換,這樣也就不致於讓 GC 把全部的 Pool 都清空了,有 victim 在「兜底」。
若是
sync.Pool
的獲取、釋放速度穩定,那麼就不會有新的池對象進行分配。若是獲取的速度降低了,那麼對象可能會在兩個GC
週期內被釋放,而不是之前的一個GC
週期。鳥窩的【Go 1.13中 sync.Pool 是如何優化的?】講了 1.13 中的優化。
參考資料【理解 Go 1.13 中 sync.Pool 的設計與實現】 手動模擬了一下調用 poolCleanup
函數先後 oldPools,allPools,p.vitcim 的變化過程,很精彩:
- 初始狀態下,oldPools 和 allPools 均爲 nil。
我根據這個流程畫了一張圖,能夠理解地更清晰一些:
poolCleanup 過程
須要指出的是,allPools
和 oldPools
都是切片,切片的元素是指向 Pool 的指針,Get/Put 操做不須要經過它們。在第 6 步,若是還有其餘 Pool 執行了 Put 操做,allPools
這時就會有多個元素。
在 Go 1.13 以前的實現中,poolCleanup
比較「簡單粗暴」:
func poolCleanup() { for i, p := range allPools { allPools\[i\] = nil for i := 0; i < int(p.localSize); i++ { l := indexLocal(p.local, i) l.private = nil for j := range l.shared { l.shared\[j\] = nil } l.shared = nil } p.local = nil p.localSize = 0 } allPools = \[\]\*Pool{} }
直接清空了全部 Pool 的 p.local
和 poolLocal.shared
。
經過二者的對比發現,新版的實現相比 Go 1.13 以前,GC 的粒度拉大了,因爲實際回收的時間線拉長,單位時間內 GC 的開銷減少。
由此基本明白 p.victim 的做用。它的定位是次級緩存,GC 時將對象放入其中,下一次 GC 來臨以前若是有 Get 調用則會從 p.victim 中取,直到再一次 GC 來臨時回收。
同時因爲從 p.victim 中取出對象使用完畢以後並未放回 p.victim 中,在必定程度也減少了下一次 GC 的開銷。原來 1 次 GC 的開銷被拉長到 2 次且會有必定程度的開銷減少,這就是 p.victim 引入的意圖。
【理解 Go 1.13 中 sync.Pool 的設計與實現】 這篇文章最後還總結了 sync.Pool
的設計理念,包括:無鎖、操做對象隔離、原子操做代替鎖、行爲隔離——鏈表、Victim Cache 下降 GC 開銷。寫得很是不錯,推薦閱讀。
另外,關於 sync.Pool
中鎖競爭優化的文章,推薦閱讀芮大神的【優化鎖競爭】。
本文先是介紹了 Pool 是什麼,有什麼做用,接着給出了 Pool 的用法以及在標準庫、一些第三方庫中的用法,還介紹了 pool\_test 中的一些測試用例。最後,詳細解讀了 sync.Pool
的源碼。
本文的結尾部分,再來詳細地總結一下關於 sync.Pool
的要點:
sync.Pool
是協程安全的,使用起來很是方便。設置好 New 函數後,調用 Get 獲取,調用 Put 歸還對象。gin
,Echo
等框架也都使用了 sync.Pool。procPin
將 G 和 P 綁定,防止 G 被搶佔。在綁按期間,GC 沒法清理緩存的對象。victim
機制前,sync.Pool 裏對象的最⼤緩存時間是一個 GC 週期,當 GC 開始時,沒有被引⽤的對象都會被清理掉;加入 victim
機制後,最大緩存時間爲兩個 GC 週期。sync.Pool
引入的意圖在於下降 GC 壓力的同時提升命中率。sync.Pool
的最底層使用切片加鏈表來實現雙端隊列,並將緩存的對象存儲在切片中。【歐神 源碼分析】https://changkun.us/archives/...
【Go 夜讀】https://reading.hidevops.io/r...
【夜讀第 14 期視頻】https://www.youtube.com/watch...
【源碼分析,僞共享】https://juejin.im/post/5d4087...
【golang的對象池sync.pool源碼解讀】https://zhuanlan.zhihu.com/p/...
【理解 Go 1.13 中 sync.Pool 的設計與實現】https://zhuanlan.zhihu.com/p/...
【優缺點,圖】http://cbsheng.github.io/post...
【xiaorui 優化鎖競爭】http://xiaorui.cc/archives/5878
【性能優化之路,自定義多種規格的緩存】https://blog.cyeam.com/golang...
【sync.Pool 有什麼缺點】https://mp.weixin.qq.com/s?\_\_biz=MzA4ODg0NDkzOA==&mid=2247487149&idx=1&sn=f38f2d72fd7112e19e97d5a2cd304430&source=41#wechat\_redirect
【1.12 和 1.13 的演變】https://github.com/watermelo/...\_pool\_understand.md
【董澤潤 演進】https://www.jianshu.com/p/2e0...
【noCopy】https://github.com/golang/go/...
【董澤潤 cpu cache】https://www.jianshu.com/p/dc4...
【gomemcache 例子】https://docs.kilvn.com/The-Go...
【鳥窩 1.13 優化】https://colobu.com/2019/10/08...
【A journey with go】https://medium.com/a-journey-...
【封裝了一個計數組件】https://www.akshaydeo.com/blo...
【僞共享】http://ifeve.com/falsesharing/
推薦閱讀
喜歡本文的朋友,歡迎關注「Go語言中文網」:
Go語言中文網啓用微信學習交流羣,歡迎加微信:274768166,投稿亦歡迎