轉載請聲明出處哦~,本篇文章發佈於luozhiyun的博客:https://www.luozhiyun.comgit
本文使用的go的源碼時14.4github
總所周知Go 是一個自動垃圾回收的編程語言,採用三色併發標記算法標記對象並回收。若是你想使用 Go 開發一個高性能的應用程序的話,就必須考慮垃圾回收給性能帶來的影響。由於Go 在垃圾回收的時候會有一個STW(stop-the-world,程序暫停)的時間,而且若是對象太多,作標記也須要時間。golang
因此若是採用對象池來建立對象,增長對象的重複利用率,使用的時候就沒必要在堆上從新建立對象能夠節省開銷。算法
在Go中,sync.Pool提供了對象池的功能。它對外提供了三個方法:New、Get 和 Put。下面用一個簡短的例子來講明一下Pool使用:數據庫
var pool *sync.Pool type Person struct { Name string } func init() { pool = &sync.Pool{ New: func() interface{}{ fmt.Println("creating a new person") return new(Person) }, } } func main() { person := pool.Get().(*Person) fmt.Println("Get Pool Object:", person) person.Name = "first" pool.Put(person) fmt.Println("Get Pool Object:",pool.Get().(*Person)) fmt.Println("Get Pool Object:",pool.Get().(*Person)) }
結果:編程
creating a new person Get Pool Object: &{} Get Pool Object: &{first} creating a new person Get Pool Object: &{}
這裏我用了init方法初始化了一個pool,而後get了三次,put了一次到pool中,若是pool中沒有對象,那麼會調用New函數建立一個新的對象,不然會重put進去的對象中獲取。數組
type Pool struct { noCopy noCopy local unsafe.Pointer localSize uintptr victim unsafe.Pointer victimSize uintptr New func() interface{} }
Pool結構體裏面noCopy表明這個結構體是禁止拷貝的,它能夠在咱們使用 go vet
工具的時候生效;緩存
local是一個poolLocal數組的指針,localSize表明這個數組的大小;一樣victim也是一個poolLocal數組的指針,每次垃圾回收的時候,Pool 會把 victim 中的對象移除,而後把 local 的數據給 victim;local和victim的邏輯咱們下面會詳細介紹到。服務器
New函數是在建立pool的時候設置的,當pool沒有緩存對象的時候,會調用New方法生成一個新的對象。數據結構
下面咱們對照着pool的結構圖往下講,避免找不到北:
type poolLocal struct { poolLocalInternal pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte }
local字段存儲的是一個poolLocal數組的指針,poolLocal數組大小是goroutine中P的數量,訪問時,P的id對應poolLocal數組下標索引,因此Pool的最大個數runtime.GOMAXPROCS(0)。
經過這樣的設計,每一個P都有了本身的本地空間,多個 goroutine 使用同一個 Pool 時,減小了競爭,提高了性能。若是對goroutine的P、G、M有疑惑的同窗不妨看看這篇文章:The Go scheduler。
poolLocal裏面有一個pad數組用來佔位用,防止在 cache line 上分配多個 poolLocalInternal從而形成false sharing,有關於false sharing能夠看看這篇文章:
What’s false sharing and how to solve it ,文中對於false sharing的定義:
That’s what false sharing is: one core update a variable would force other cores to update cache either.
type poolLocalInternal struct { private interface{} // Can be used only by the respective P. shared poolChain // Local P can pushHead/popHead; any P can popTail. }
poolLocalInternal包含兩個字段private和shared。
private表明緩存的一個元素,只能由相應的一個 P 存取。由於一個 P 同時只能執行一個 goroutine,因此不會有併發的問題;
shared則能夠由任意的 P 訪問,可是隻有本地的 P 才能 pushHead/popHead,其它 P 能夠 popTail。
type poolChain struct { head *poolChainElt tail *poolChainElt } type poolChainElt struct { poolDequeue next, prev *poolChainElt } type poolDequeue struct { headTail uint64 vals []eface }
poolChain是一個雙端隊列,裏面的head和tail分別指向隊列頭尾;poolDequeue裏面存放真正的數據,是一個單生產者、多消費者的固定大小的無鎖的環狀隊列,headTail是環狀隊列的首位位置的指針,能夠經過位運算解析出首尾的位置,生產者能夠從 head 插入、head 刪除,而消費者僅可從 tail 刪除。
這個雙端隊列的模型大概是這個樣子:
poolDequeue裏面的環狀隊列大小是固定的,後面分析源碼咱們會看到,當環狀隊列滿了的時候會建立一個size是原來兩倍大小的環狀隊列。你們這張圖好好體會一下,會反覆用到。
func (p *Pool) Get() interface{} { ... //1.把當前goroutine綁定在當前的P上 l, pid := p.pin() //2.優先從local的private中獲取 x := l.private l.private = nil if x == nil { //3,private沒有,那麼從shared的頭部獲取 x, _ = l.shared.popHead() //4. 若是都沒有,那麼去別的local上去偷一個 if x == nil { x = p.getSlow(pid) } } //解除搶佔 runtime_procUnpin() ... //5. 若是沒有獲取到,嘗試使用New函數生成一個新的 if x == nil && p.New != nil { x = p.New() } return x }
這一段代碼首先會將當前goroutine綁定在當前的P上返回對應的local,而後嘗試從local的private中獲取,而後須要把private字段置空,由於已經拿到了想要的對象;
private中獲取不到,那麼就去shared的頭部獲取;
shared也沒有,那麼嘗試遍歷全部的 local,嘗試從它們的 shared 彈出一個元素;
最後若是仍是沒有,那麼就直接調用預先設置好的 New 函數,建立一個出來。
func (p *Pool) pin() (*poolLocal, int) { pid := runtime_procPin() s := atomic.LoadUintptr(&p.localSize) // load-acquire l := p.local // load-consume if uintptr(pid) < s { return indexLocal(l, pid), pid } return p.pinSlow() }
pin方法裏面首先會調用runtime_procPin方法會先獲取當前goroutine,而後綁定到對應的M上,而後返回M目前綁定的P的id,由於這個pid後面會用到,防止在使用途中P被搶佔,具體的細節能夠看這篇:https://zhuanlan.zhihu.com/p/99710992。
接下來會使用原子操做取出localSize,若是當前pid大於localSize,那麼就表示Pool還沒建立對應的poolLocal,那麼調用pinSlow進行建立工做,不然調用indexLocal取出pid對應的poolLocal返回。
func indexLocal(l unsafe.Pointer, i int) *poolLocal { lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{})) return (*poolLocal)(lp) }
indexLocal裏面是使用了地址操做,傳入的i是數組的index值,因此須要獲取poolLocal{}的size作一下地址的位移操做,而後再轉成轉成poolLocal地址返回。
func (p *Pool) pinSlow() (*poolLocal, int) { // 解除pin runtime_procUnpin() // 加上全局鎖 allPoolsMu.Lock() defer allPoolsMu.Unlock() // pin住 pid := runtime_procPin() s := p.localSize l := p.local // 從新對pid進行檢查 if uintptr(pid) < s { return indexLocal(l, pid), pid } // 初始化local前會將pool放入到allPools數組中 if p.local == nil { allPools = append(allPools, p) } // 當前P的數量 size := runtime.GOMAXPROCS(0) local := make([]poolLocal, size) atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) atomic.StoreUintptr(&p.localSize, uintptr(size)) return &local[pid], pid }
由於allPoolsMu是一個全局Mutex鎖,所以上鎖會比較慢可能被阻塞,因此上鎖前調用runtime_procUnpin方法解除pin的操做;
在解除綁定後,pinSlow 可能被其餘的線程調用過了,p.local 可能會發生變化。所以這時候須要再次對 pid 進行檢查。
最後初始化local,並使用原子操做對local和localSize設值,返回當前P對應的local。
到這裏pin方法終於講完了。畫一個簡單的圖描述一下這整個流程:
下面咱們再回到Get方法中往下走,代碼我再貼一遍,以便閱讀:
func (p *Pool) Get() interface{} { ... //2.優先從local的private中獲取 x := l.private l.private = nil if x == nil { //3,private沒有,那麼從shared的頭部獲取 x, _ = l.shared.popHead() //4. 若是都沒有,那麼去別的local上去偷一個 if x == nil { x = p.getSlow(pid) } } ... return x }
若是private中沒有值,那麼會調用shared的popHead方法獲取值。
func (c *poolChain) popHead() (interface{}, bool) { // 這裏頭部是一個poolChainElt d := c.head // 遍歷poolChain鏈表 for d != nil { // 從poolChainElt的環狀列表中獲取值 if val, ok := d.popHead(); ok { return val, ok } // load poolChain下一個對象 d = loadPoolChainElt(&d.prev) } return nil, false }
popHead方法裏面會獲取到poolChain的頭結點,不記得poolChain數據結構的同窗建議往上面翻一下再回來。
接着有個for循環會挨個從poolChain的頭結點往下遍歷,直到獲取對象返回。
func (d *poolDequeue) popHead() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) // headTail的高32位爲head,低32位爲tail head, tail := d.unpack(ptrs) // 首尾相等,那麼這個隊列就是空的 if tail == head { return nil, false } // 這裏須要head--以後再獲取slot head-- ptrs2 := d.pack(head, tail) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { slot = &d.vals[head&uint32(len(d.vals)-1)] break } } val := *(*interface{})(unsafe.Pointer(slot)) // 說明沒取到緩存的對象,返回 nil if val == dequeueNil(nil) { val = nil } // 重置slot *slot = eface{} return val, true }
poolDequeue的popHead方法首先會獲取到headTail的值,而後調用unpack解包,headTail是一個64位的值,高32位表示head,低32位表示tail。
判斷head和tail是否相等,相等那麼這個隊列就是空的;
若是隊列不是空的,那麼將head減一以後再使用,由於head當前指的位置是空值,表示下一個新對象存放的位置;
CAS從新設值新的headTail,成功以後獲取slot,這裏由於vals大小是2的n 次冪,所以len(d.vals)-1)
以後低n位全是1,和head取與以後能夠獲取到head的低n位的值;
若是slot所對應的對象是dequeueNil,那麼表示是空值,直接返回,不然將slot指針對應位置的值置空,返回val。
若是shared的popHead方法也沒獲取到值,那麼就須要調用getSlow方法獲取了。
func (p *Pool) getSlow(pid int) interface{} { size := atomic.LoadUintptr(&p.localSize) // load-acquire locals := p.local // load-consume // 遍歷locals列表,從其餘的local的shared列表尾部獲取對象 for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i+1)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } size = atomic.LoadUintptr(&p.victimSize) if uintptr(pid) >= size { return nil } locals = p.victim l := indexLocal(locals, pid) // victim的private不爲空則返回 if x := l.private; x != nil { l.private = nil return x } // 遍歷victim對應的locals列表,從其餘的local的shared列表尾部獲取對象 for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // 獲取不到,將victimSize置爲0 atomic.StoreUintptr(&p.victimSize, 0) return nil }
getSlow方法會遍歷locals列表,這裏須要注意的是,遍歷是從索引爲 pid+1 的 poolLocal 處開始,嘗試調用shared的popTail方法獲取對象;若是沒有拿到,則從 victim 裏找。若是都沒找到,那麼就將victimSize置爲0,下次就不找victim了。
func (c *poolChain) popTail() (interface{}, bool) { d := loadPoolChainElt(&c.tail) // 若是最後一個節點是空的,那麼直接返回 if d == nil { return nil, false } for { // 這裏獲取的是next節點,與通常的雙向鏈表是相反的 d2 := loadPoolChainElt(&d.next) // 獲取尾部對象 if val, ok := d.popTail(); ok { return val, ok } if d2 == nil { return nil, false } // 由於d已經沒有數據了,因此重置tail爲d2,並刪除d2的上一個節點 if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { storePoolChainElt(&d2.prev, nil) } d = d2 } }
func (d *poolDequeue) popTail() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) // 和pophead同樣,將headTail解包 head, tail := d.unpack(ptrs) // 首位相等,表示列表中沒有數據,返回 if tail == head { return nil, false } ptrs2 := d.pack(head, tail+1) // CAS重置tail位置 if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { // 獲取tail位置對象 slot = &d.vals[tail&uint32(len(d.vals)-1)] break } } val := *(*interface{})(unsafe.Pointer(slot)) // 判斷對象是否是爲空 if val == dequeueNil(nil) { val = nil } // 將slot置空 slot.val = nil atomic.StorePointer(&slot.typ, nil) return val, true }
若是看懂了popHead,那麼這個popTail方法是和它很是的相近的。
popTail簡單來講也是從隊列尾部移除一個元素,若是隊列爲空,返回 false。可是須要注意的是,這個popTail可能會被多個消費者調用,因此須要循環CAS獲取對象;在poolDequeue環狀列表中tail是有數據的,沒必要像popHead中head--
。
最後,須要將slot置空。
你們能夠再對照一下圖回顧一下代碼:
func (p *Pool) Put(x interface{}) { if x == nil { return } ... l, _ := p.pin() if l.private == nil { l.private = x x = nil } if x != nil { l.shared.pushHead(x) } runtime_procUnpin() ... }
看完了Get方法,看Put方法就容易多了。一樣Put方法首先會去Pin住當前goroutine和P,而後嘗試將 x 賦值給 private 字段。若是private不爲空,那麼就調用pushHead將其放入到shared隊列中。
func (c *poolChain) pushHead(val interface{}) { d := c.head // 頭節點沒有初始化,那麼設值一下 if d == nil { const initSize = 8 // Must be a power of 2 d = new(poolChainElt) d.vals = make([]eface, initSize) c.head = d storePoolChainElt(&c.tail, d) } // 將對象加入到環狀隊列中 if d.pushHead(val) { return } newSize := len(d.vals) * 2 // 這裏作了限制,單個環狀隊列不能超過2的30次方大小 if newSize >= dequeueLimit { newSize = dequeueLimit } // 初始化新的環狀列表,大小是d的兩倍 d2 := &poolChainElt{prev: d} d2.vals = make([]eface, newSize) c.head = d2 storePoolChainElt(&d.next, d2) // push到新的隊列中 d2.pushHead(val) }
若是頭節點爲空,那麼須要建立一個新的poolChainElt對象做爲頭節點,大小爲8;而後調用pushHead放入到環狀隊列中;
若是放置失敗,那麼建立一個 poolChainElt 節點,而且雙端隊列的長度翻倍,固然長度也不能超過dequeueLimit,即2的30次方;
而後將新的節點d2和d互相綁定一下,並將d2設值爲頭節點,將傳入的對象push到d2中;
func (d *poolDequeue) pushHead(val interface{}) bool { ptrs := atomic.LoadUint64(&d.headTail) // 解包headTail head, tail := d.unpack(ptrs) // 判斷隊列是否已滿 if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { return false } // 找到head的槽位 slot := &d.vals[head&uint32(len(d.vals)-1)] // 檢查slot是否和popTail有衝突 typ := atomic.LoadPointer(&slot.typ) if typ != nil { return false } if val == nil { val = dequeueNil(nil) } // 將 val 賦值到 slot,並將 head 指針值加 1 *(*interface{})(unsafe.Pointer(slot)) = val atomic.AddUint64(&d.headTail, 1<<dequeueBits) return true }
首先經過位運算判斷隊列是否已滿,也就是將尾部指針加上 len(d.vals)
,由於head指向的是將要被填充的位置,因此head和tail位置是相隔len(d.vals)
,而後再取低 31 位,看它是否和 head 相等。若是隊列滿了,直接返回 false;
而後找到找到head的槽位slot,並判斷typ是否爲空,由於popTail 是先設置 val,再將 typ 設置爲 nil,因此若是有衝突,那麼直接返回;
最後設值slot,並將head加1返回;
在pool.go文件的 init 函數裏,註冊了 GC 發生時,如何清理 Pool 的函數:
func init() { runtime_registerPoolCleanup(poolCleanup) } func poolCleanup() { for _, p := range oldPools { p.victim = nil p.victimSize = 0 } for _, p := range allPools { p.victim = p.local p.victimSize = p.localSize p.local = nil p.localSize = 0 } oldPools, allPools = allPools, nil }
poolCleanup
會在 STW 階段被調用。主要是將 local 和 victim 做交換,那麼不至於GC 把全部的 Pool 都清空了,而是須要兩個 GC
週期纔會被釋放。若是 sync.Pool
的獲取、釋放速度穩定,那麼就不會有新的池對象進行分配。
Pool這個概念在後臺優化中是一個很是重要的手段,好比說在使用Http的時候會使用Http鏈接池,使用數據庫的時候,也會用到數據庫鏈接池。這些經過對象重用和預先分配能夠減小服務器的壓力。
當咱們在後期的項目開發中,若是發現GC耗時很高,有大量臨時對象時不妨能夠考慮使用Pool。
例如發現現系統中的 goroutine 數量很是多,因爲一個goroutine初始棧是2048字節,因此一個服務器上運行數十萬的goroutine 也是很是耗時的;這時候就能夠考慮使用Worker Pool 來減小 goroutine 的使用。