A Brief Analysis of Golang sync.Pool

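sync.Pool is a buffer for storing temporary objects, but it is not a reliable one: in the implementation analyzed below, the pool is cleared at the start of every GC. So if a slice is stored only in the Pool and is not referenced anywhere else, it will be collected as garbage.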

Concept

A Pool is a set of temporary objects that may be individually saved and retrieved.

Any item stored in the Pool may be removed automatically at any time without notification. If the Pool holds the only reference when this happens, the item might be deallocated.

A Pool is safe for use by multiple goroutines simultaneously.

Pool's purpose is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector. That is, it makes it easy to build efficient, thread-safe free lists. However, it is not suitable for all free lists.

An appropriate use of a Pool is to manage a group of temporary items silently shared among and potentially reused by concurrent independent clients of a package. Pool provides a way to amortize allocation overhead across many clients.

An example of good use of a Pool is in the fmt package, which maintains a dynamically-sized store of temporary output buffers. The store scales under load (when many goroutines are actively printing) and shrinks when quiescent.
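The fmt use case can be made concrete with a minimal sketch of the same pattern: a Pool of *bytes.Buffer values with New set, where each caller Gets a buffer, resets it (it may still hold data from a previous user), and Puts it back when done. The names bufPool and formatGreeting are hypothetical; this is not fmt's actual code.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool caches *bytes.Buffer values for reuse, in the spirit of the
// temporary output buffers kept by the fmt package (hypothetical name).
var bufPool = sync.Pool{
	// New is called by Get when the pool has nothing to hand out.
	New: func() interface{} { return new(bytes.Buffer) },
}

// formatGreeting builds a string using a pooled buffer.
func formatGreeting(name string) string {
	buf := bufPool.Get().(*bytes.Buffer)
	// A reused buffer may still contain old data, so always reset it.
	buf.Reset()
	defer bufPool.Put(buf) // hand the buffer back for later reuse

	fmt.Fprintf(buf, "hello, %s", name)
	// String copies the bytes, so reusing buf afterwards is safe.
	return buf.String()
}

func main() {
	fmt.Println(formatGreeting("gopher")) // hello, gopher
	fmt.Println(formatGreeting("pool"))   // hello, pool
}
```

Storing pointers rather than bytes.Buffer values avoids an extra allocation when the value is converted to interface{} on Put.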

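Structure

[Figure: the structure of sync.Pool]

The figure above shows how sync.Pool is organized: a Pool points to an array of poolLocal entries, one per P, and each poolLocal embeds a poolLocalInternal that holds a private field and a mutex-protected shared slice. Two questions may arise here: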

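  1. When we instantiate a sync.Pool, why is an array of poolLocal created, and how is it decided which element of that array my data is stored in?
  2. poolLocalInternal has two members, private and shared. Why make this distinction?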

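Source Code Analysis

The code below is the sync.Pool implementation used before Go 1.13 (Go 1.13 replaced the mutex-protected shared slice with a lock-free poolChain and added a victim cache).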

Put

Put()

func (p *Pool) Put(x interface{}) {
    if x == nil {
        return
    }
    // race detection; ignore this part for now
    if race.Enabled {
        if fastrand()%4 == 0 {
            // Randomly drop x on floor.
            return
        }
        race.ReleaseMerge(poolRaceAddr(x))
        race.Disable()
    }
    
    // Pin to the current P and get that P's poolLocal (analyzed below)
    l := p.pin()
    // If the private field is empty, store x there first
    if l.private == nil {
        l.private = x
        x = nil
    }
    runtime_procUnpin()
    // If private was already occupied, append x to shared; other Ps can access shared, so it must be locked
    if x != nil {
        l.Lock()
        l.shared = append(l.shared, x)
        l.Unlock()
    }
    if race.Enabled {
        race.Enable()
    }
}

pin()

func (p *Pool) pin() *poolLocal {
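    // Pin the goroutine to its current P and get the P's id (pid); pid lies in [0, GOMAXPROCS)
    pid := runtime_procPin()
    // Number of poolLocal entries in the local array
    s := atomic.LoadUintptr(&p.localSize) // load-acquire
    l := p.local                          // load-consume
    // If pid is smaller than the length of the local array, return the corresponding poolLocal
    if uintptr(pid) < s {
        return indexLocal(l, pid)
    }
    // Otherwise (array not allocated yet, or GOMAXPROCS has grown), take the slow path, discussed below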
    return p.pinSlow()
}

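runtime_procPin() pins the current goroutine to the P it is running on (it cannot be preempted until runtime_procUnpin() is called) and returns that P's id. Its implementation lives in the runtime and is not analyzed here; what matters is that the returned value ranges over [0, GOMAXPROCS), i.e. its range is determined by runtime.GOMAXPROCS(0). For background, see the article "Golang 的 協程調度機制 與 GOMAXPROCS 性能調優" (Golang's goroutine scheduling mechanism and GOMAXPROCS performance tuning).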

pinSlow()

func (p *Pool) pinSlow() *poolLocal {
    // Retry under the mutex.
    // Can not lock the mutex while pinned.
    runtime_procUnpin()
    allPoolsMu.Lock()
    defer allPoolsMu.Unlock()
    pid := runtime_procPin()
    // poolCleanup won't be called while we are pinned.
    // Check again whether local has an entry for pid, since another goroutine may have allocated it in the meantime
    s := p.localSize
    l := p.local
    if uintptr(pid) < s {
        return indexLocal(l, pid)
    }
    // If local is nil, this Pool is newly created; register it in the allPools slice
    if p.local == nil {
        allPools = append(allPools, p)
    }
    // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
    // Re-read GOMAXPROCS and size the poolLocal array accordingly
    size := runtime.GOMAXPROCS(0)
    local := make([]poolLocal, size)
    atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
    atomic.StoreUintptr(&p.localSize, uintptr(size))         // store-release
    // Return the poolLocal of the current P
    return &local[pid]
}

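Put logic

In summary, the basic logic of Put is:

  • Get the id of the P the current goroutine is running on (pid)
  • Use the pid to find the corresponding poolLocal and, inside it, the poolLocalInternal
  • Store the item in poolLocalInternal's private field first; if private is already occupied, append it to the shared slice (under the lock)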
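The three steps above can be sketched with the P id passed in explicitly, since runtime_procPin is not accessible outside the runtime. putSketch is a hypothetical name. Note that the sketch's unlocked access to private is only safe because the real code pins the goroutine to its P; callers of this sketch must not use the same pid concurrently.

```go
package main

import (
	"fmt"
	"sync"
)

type poolLocal struct {
	private interface{}   // owned by one P, accessed without a lock
	mu      sync.Mutex    // guards shared
	shared  []interface{} // visible to other Ps
}

// putSketch mirrors Put with an explicit pid instead of runtime_procPin.
func putSketch(local []poolLocal, pid int, x interface{}) {
	if x == nil {
		return // Put ignores nil
	}
	l := &local[pid]
	// 1. Prefer the private slot.
	if l.private == nil {
		l.private = x
		return
	}
	// 2. Otherwise append to shared under the lock.
	l.mu.Lock()
	l.shared = append(l.shared, x)
	l.mu.Unlock()
}

func main() {
	local := make([]poolLocal, 2)
	putSketch(local, 0, "a")
	putSketch(local, 0, "b")
	putSketch(local, 0, "c")
	fmt.Println(local[0].private, local[0].shared) // a [b c]
}
```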

Get

Get()

func (p *Pool) Get() interface{} {
    if race.Enabled {
        race.Disable()
    }
    
    // Get the poolLocal of the current P
    l := p.pin()
    
    // Take the value of private, then set private to nil: if it held an object, that object is handed out and the slot must be cleared; if it was already nil, clearing it again is harmless
    x := l.private
    l.private = nil
    runtime_procUnpin()
    // If private was empty, look in shared
    if x == nil {
        l.Lock()
        last := len(l.shared) - 1
        if last >= 0 {
            x = l.shared[last]
            l.shared = l.shared[:last]
        }
        l.Unlock()
        // If this P's poolLocal has nothing, call getSlow() to take an item from another P's shared slice (analyzed below)
        if x == nil {
            x = p.getSlow()
        }
    }
    if race.Enabled {
        race.Enable()
        if x != nil {
            race.Acquire(poolRaceAddr(x))
        }
    }
    // If nothing was found in private, in shared, or in other Ps' shared slices, and a New function is set, call New
    if x == nil && p.New != nil {
        x = p.New()
    }
    return x
}
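The lookup order in Get (private first, then the most recently added item in this P's shared slice, then New) can be sketched with an explicit pid. getSketch is a hypothetical name; stealing from other Ps (getSlow) is left out here, and newFn plays the role of Pool.New.

```go
package main

import (
	"fmt"
	"sync"
)

type poolLocal struct {
	private interface{}
	mu      sync.Mutex // guards shared
	shared  []interface{}
}

// getSketch mirrors Get with an explicit pid; getSlow is omitted.
func getSketch(local []poolLocal, pid int, newFn func() interface{}) interface{} {
	l := &local[pid]
	// 1. Take private and clear it (clearing a nil field is harmless).
	x := l.private
	l.private = nil
	if x == nil {
		// 2. Pop the most recently added item from this P's shared slice.
		l.mu.Lock()
		if last := len(l.shared) - 1; last >= 0 {
			x = l.shared[last]
			l.shared = l.shared[:last]
		}
		l.mu.Unlock()
	}
	// 3. Fall back to New if nothing was found and New is set.
	if x == nil && newFn != nil {
		x = newFn()
	}
	return x
}

func main() {
	local := make([]poolLocal, 1)
	local[0].private = "p"
	local[0].shared = []interface{}{"s1", "s2"}
	newFn := func() interface{} { return "new" }
	// Prints p, then s2, then s1, then new.
	for i := 0; i < 4; i++ {
		fmt.Println(getSketch(local, 0, newFn))
	}
}
```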

getSlow()

func (p *Pool) getSlow() (x interface{}) {
    // See the comment in pin regarding ordering of the loads.
    // Get the size of the local array
    size := atomic.LoadUintptr(&p.localSize) // load-acquire
    local := p.local                         // load-consume
    // Try to steal one element from other procs.
    pid := runtime_procPin()
    runtime_procUnpin()
    // Walk the poolLocal entries of the other Ps and take an item from their shared slices; stop as soon as one is found
    for i := 0; i < int(size); i++ {
        l := indexLocal(local, (pid+i+1)%int(size))
        l.Lock()
        last := len(l.shared) - 1
        if last >= 0 {
            x = l.shared[last]
            l.shared = l.shared[:last]
            l.Unlock()
            break
        }
        l.Unlock()
    }
    return x
}

The logic above shows that data in shared can be retrieved by other Ps, while data in private cannot. That is why accessing shared requires a lock.

poolCleanup

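This function, provided by the sync package, clears all Pools. The official implementation is rather blunt: it simply throws away everything.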

func poolCleanup() {
    // This function is called with the world stopped, at the beginning of a garbage collection.
    // It must not allocate and probably should not call any runtime functions.
    // Defensively zero out everything, 2 reasons:
    // 1. To prevent false retention of whole Pools.
    // 2. If GC happens while a goroutine works with l.shared in Put/Get,
    //    it will retain whole Pool. So next cycle memory consumption would be doubled.
    // Iterate over all registered Pools
    for i, p := range allPools {
        allPools[i] = nil
        // Iterate over each Pool's poolLocal entries and clear their data
        for i := 0; i < int(p.localSize); i++ {
            l := indexLocal(p.local, i)
            l.private = nil
            for j := range l.shared {
                l.shared[j] = nil
            }
            l.shared = nil
        }
        p.local = nil
        p.localSize = 0
    }
    // Reset allPools
    allPools = []*Pool{}
}

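poolCleanup is called at the beginning of every garbage collection, with the world stopped, which explains the following sentence in the official documentation: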

Any item stored in the Pool may be removed automatically at any time without
notification. If the Pool holds the only reference when this happens, the
item might be deallocated.

In other words, if an object is referenced only by the Pool, you should expect it to be reclaimed by the GC.
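The effect of poolCleanup can be sketched on a hypothetical mini pool with its own registry: every registered pool loses all cached items and the registry is emptied. The names miniPool, allPools, and cleanup are illustrative. The real function runs with the world stopped, so it takes no locks; this sketch simply assumes nothing else is using the pools while it runs.

```go
package main

import (
	"fmt"
	"sync"
)

type poolLocal struct {
	private interface{}
	mu      sync.Mutex // guards shared (unused in this sketch)
	shared  []interface{}
}

type miniPool struct {
	local []poolLocal
}

// allPools is a hypothetical registry, like the one in the sync package.
var allPools []*miniPool

// cleanup mirrors poolCleanup: drop every cached item of every pool.
func cleanup() {
	for i, p := range allPools {
		allPools[i] = nil
		for j := range p.local {
			l := &p.local[j]
			l.private = nil
			// Clear the elements so that, even if something still holds
			// the backing array, the cached objects are not retained.
			for k := range l.shared {
				l.shared[k] = nil
			}
			l.shared = nil
		}
		p.local = nil
	}
	allPools = []*miniPool{}
}

func main() {
	p := &miniPool{local: make([]poolLocal, 2)}
	allPools = append(allPools, p)
	p.local[0].private = "x"
	p.local[1].shared = []interface{}{"y"}

	cleanup()
	fmt.Println(p.local == nil, len(allPools)) // true 0
}
```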

Answering the Questions

Here is a brief analysis of the two questions raised above.

When we instantiate a sync.Pool, why is an array of poolLocal created, and how is it decided which element of that array my data is stored in?

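The poolLocal entries are distinguished by P id. At run time the current pid is obtained through runtime_procPin() and used as the index into the local array to locate the corresponding poolLocal. Because a P runs only one goroutine at a time, and the goroutine stays pinned while it touches private, the private data is safe without a lock.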

poolLocalInternal has two members, private and shared. Why make this distinction?

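private belongs exclusively to a single P, so it can be accessed without locking; shared can also be reached by other Ps (through getSlow), so it must be protected by the mutex.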

