理解Go 1.13中sync.Pool的設計與實現

Go 1.13版本中有幾個比較大的修改,其中之一是sync.Pool修改了部分實現,減少某些極端狀況下的性能開銷。文中內容來源於筆者讀完sync.Pool源代碼的思考和總結,內容以Go 1.13中的實現爲準,少許內容涉及到Go 1.13以前,若有誤區請讀者多多指教。編程

概念

在本文內容開始以前須要理解幾個在Go runtime中的概念,以便於更好的理解sync.Pool中一些實現。數組

goroutine搶佔

Go中調度器是GMP模型,簡單理解G就是goroutine;M能夠類比內核線程,是執行G的地方;P是調度G以及爲G的執行準備所需資源。通常狀況下,P的數量CPU的可用核心數,也可由runtime.GOMAXPROCS指定。本文的重點並不是goroutine調度器,在此不作詳細解釋,感興趣能夠翻閱延伸閱讀的文章。 緩存

Go有這樣的調度規則:某個G不能一直佔用M,在某個時刻的時候,runtime(參見sysmon)會判斷當前M是否能夠被搶佔,即M上正在執行的G讓出。P在合理的時刻將G調度到合理的M上執行,在runtime裏面,每一個P維護一個本地存放待執行G的隊列localq,同時還存在一個全局的待執行G的隊列globalq;調度就是P從localq或globalq中取出G到對應的M上執行,所謂搶佔,runtime將G搶佔移出運行狀態,拷貝G的執行棧放入待執行隊列中,多是某個P的localq,也多是globalq,等待下一次調度,所以當被搶佔的G重回待執行隊列時有可能此時的P與前一次運行的P並不是同一個。 數據結構

所謂禁止搶佔,即當前執行G不容許被搶佔調度,直到禁止搶佔標記解除。Go runtime實現了G的禁止搶佔與解除禁止搶佔。架構

func runtime_procPin() int
禁止搶佔,標記當前G在M上不會被搶佔,並返回當前所在P的ID。併發

func runtime_procUnpin()
解除G的禁止搶佔狀態,以後G可被搶佔。app

數據結構

poolDequeue

type poolDequeue struct {
    headTail uint64
    vals []eface
}

type eface struct {
    typ, val unsafe.Pointer
}

poolDequeue被實現爲單生產者多消費者的固定大小的無鎖Ring式隊列。生產者能夠從head插入head刪除,而消費者僅可從tail刪除。headTail指向了隊列的頭和尾,經過位運算將head和tail位置存入headTail變量中。編程語言

func (d *poolDequeue) pushHead(val interface{}) bool {
    ptrs := atomic.LoadUint64(&d.headTail)
    head, tail := d.unpack(ptrs)
    // Ring式隊列,頭尾相等則隊列已滿
    if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
        return false
    }
    slot := &d.vals[head&uint32(len(d.vals)-1)]
    // 原子操做拿到slot.typ
    typ := atomic.LoadPointer(&slot.typ)
    if typ != nil {
        return false
    }

    if val == nil {
        val = dequeueNil(nil)
    }
    // slot佔位,將val存入vals中
    *(*interface{})(unsafe.Pointer(slot)) = val
    // 更改隊列指向頭
    atomic.AddUint64(&d.headTail, 1<<dequeueBits)
    return true
}
  1. 在頭入時,先判斷隊列是否已滿,判斷條件:head == tail
  2. 從隊列中取到head位置的slot,根據slot.typ判斷當前slot是否已被存放數據,注意這裏使用了atomic.LoadPointer取代鎖操做。
  3. val賦值給slot,這裏實現的比較巧妙,sloteface類型,將slot轉爲interface{}類型,這樣val能以interface{}賦值給slotslot.typslot.val指向其內存塊,這樣slot.typslot.val均不爲空,這也就是第2點條件判斷的來由。插入成功後head加1,指向隊頭的前一個空位,因爲插入刪除都涉及到對headTail的修改,此處使用原子操做取代鎖。
func (d *poolDequeue) popHead() (interface{}, bool) {
    var slot *eface
    for {
        ptrs := atomic.LoadUint64(&d.headTail)
        head, tail := d.unpack(ptrs)
        // 判斷隊列是否爲空
        if tail == head {
            return nil, false
        }

        // head位置是隊頭的前一個位置,因此此處要先退一位
        head--
        ptrs2 := d.pack(head, tail)
        if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
            slot = &d.vals[head&uint32(len(d.vals)-1)]
            break
        }
    }

    // 取出val
    val := *(*interface{})(unsafe.Pointer(slot))
    if val == dequeueNil(nil) {
        val = nil
    }

    // 重置slot,typ和val均爲nil
    *slot = eface{}
    return val, true
}

popHead的代碼比較簡單,流程上無非就是判斷隊列是否空,拿出slot轉換爲interface{}而後重置slotpopHeadpushHead有一點須要注意,在popHead中,是先修改了headTail,而後再取slot,而在pushHead中是先插入數據,而後再修改headTail,至於爲何這裏先留一個疑問,後面將會詳細解釋。函數

func (d *poolDequeue) popTail() (interface{}, bool) {
    var slot *eface
    for {
        ptrs := atomic.LoadUint64(&d.headTail)
        head, tail := d.unpack(ptrs)
        // 判斷隊列是否空
        if tail == head {
            return nil, false
        }

        ptrs2 := d.pack(head, tail+1)
        if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
            slot = &d.vals[tail&uint32(len(d.vals)-1)]
            break
        }
    }
    // 取出val
    val := *(*interface{})(unsafe.Pointer(slot))
    if val == dequeueNil(nil) {
        val = nil
    }

    // 重置slot.typ
    slot.val = nil
    atomic.StorePointer(&slot.typ, nil)
    return val, true
}

popTail的代碼相似popHead,只是刪除對象從隊首變成隊尾,注意結尾部分代碼,使用了atomic.StorePointer取代鎖操做。性能

poolChain

poolDequeue被實現爲Ring式隊列,而poolChain則是基於poolDequeue實現爲雙向鏈表。

type poolChain struct {
    head *poolChainElt
    tail *poolChainElt
}

type poolChainElt struct {
    poolDequeue
    next, prev *poolChainElt
}

同理,poolChain也實現了pushHeadpopHeadpopTail

func (c *poolChain) pushHead(val interface{}) {
    d := c.head
    if d == nil {
        // poolDequeue初始長度爲8
        const initSize = 8 
        d = new(poolChainElt)
        d.vals = make([]eface, initSize)
        c.head = d
        storePoolChainElt(&c.tail, d)
    }

    if d.pushHead(val) {
        return
    }
    // 前一個poolDequeue長度的2倍
    newSize := len(d.vals) * 2
    if newSize >= dequeueLimit {
        newSize = dequeueLimit
    }
    // 首尾相連,構成鏈表
    d2 := &poolChainElt{prev: d}
    d2.vals = make([]eface, newSize)
    c.head = d2
    storePoolChainElt(&d.next, d2)
    d2.pushHead(val)
}

到這裏大概就明白了,poolDequeue是在poolChainpushHead中建立的,且每次建立的長度都是前一個poolDequeue長度的2倍,初始長度爲8。

func (c *poolChain) popHead() (interface{}, bool) {
    d := c.head
    for d != nil {
        if val, ok := d.popHead(); ok {
            return val, ok
        }

        d = loadPoolChainElt(&d.prev)
    }
    return nil, false
}

popHead以隊首向隊尾爲方向遍歷鏈表,對每一個poolDequeue執行popHead嘗試取出存放的對象。

func (c *poolChain) popTail() (interface{}, bool) {
    d := loadPoolChainElt(&c.tail)
    if d == nil {
        return nil, false
    }

    for {
        d2 := loadPoolChainElt(&d.next)
        if val, ok := d.popTail(); ok {
            return val, ok
        }

        if d2 == nil {
            return nil, false
        }

        if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
            storePoolChainElt(&d2.prev, nil)
        }
        d = d2
    }
}

popTail以隊尾向隊隊首爲方向遍歷鏈表,對每一個poolDequeue執行popTail嘗試從尾部取出存放的對象。

宏觀上來看,poolChain的結構以下圖:
pool.png

Pool的源代碼實現

poolChain爲sync.Pool的底層數據結構,接下來一覽Pool的實現。

type Pool struct {
    // ...
    // 每一個P的本地隊列,實際類型爲[P]poolLocal
    local     unsafe.Pointer
    // [P]poolLocal的大小,<= P
    localSize uintptr        

    victim     unsafe.Pointer
    victimSize uintptr        

    // 自定義的對象建立回調,當pool中無可用對象時會調用此函數
    New func() interface{}
}
type poolLocalInternal struct {
    // 每一個P的私有共享,使用時無需加鎖
    private interface{}
    // 對象列表,本地P能夠pushHead/popHead,其餘P僅能popTail
    shared  poolChain
}

type poolLocal struct {
    poolLocalInternal
    // 僞共享,僅佔位用,防止在cache line上分配多個poolLocalInternal
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

接下來看看Pool的具體實現。

Pool.Get

func (p *Pool) pin() (*poolLocal, int) {
    pid := runtime_procPin()
    s := atomic.LoadUintptr(&p.localSize)
    l := p.local                          
    if uintptr(pid) < s {
        return indexLocal(l, pid), pid
    }
    return p.pinSlow()
}

pin首先標記了當前G禁止搶佔,在runtime_procUnpin以前,當前G和P不會被搶佔。此處之因此標記禁止搶佔是由於下文中有使用到P ID,若是被搶佔了,有可能接下里使用的P ID與所綁定的P並不是同一個。

在得到P ID以後,當P ID小於p.local數組長度時在p.local數組裏找到P對應的poolLocal對象,不然進入pinSlow函數建立新的poolLocal

func (p *Pool) pinSlow() (*poolLocal, int) {
    runtime_procUnpin()
    allPoolsMu.Lock()
    defer allPoolsMu.Unlock()
    pid := runtime_procPin()

    s := p.localSize
    l := p.local
    if uintptr(pid) < s {
        return indexLocal(l, pid), pid
    }
    if p.local == nil {
        allPools = append(allPools, p)
    }
    // 當前P的數量
    size := runtime.GOMAXPROCS(0)
    local := make([]poolLocal, size)
    atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) 
    atomic.StoreUintptr(&p.localSize, uintptr(size))         
    return &local[pid], pid
}

pinSlow的實現比較簡單,即當前P ID在Pool中沒有對應的poolLocal對象時,則建立一個新的poolLocal對象,舊的poolLocal將會進入GC。仔細觀察pinSlow函數發現先執行了runtime_procUnpin,隨後有執行了runtime_procPin,後者的目的在於獲取最新的P ID,這裏的意圖有點難以理解,在仔細閱讀和理解以後,發現目的在於儘可能減小[]poolLocal的建立次數,由於pinSlow以前和pinSlow裏面可能會由於解除禁止搶佔而致使綁定的P不一致,萬一新綁定的P裏存在可用的poolLocal呢。

func (p *Pool) Get() interface{} {
    // ...
    l, pid := p.pin()
    x := l.private
    l.private = nil
    if x == nil {
        x, _ = l.shared.popHead()
        if x == nil {
            x = p.getSlow(pid)
        }
    }
    // 解除禁止搶佔
    runtime_procUnpin()
    // ...
    if x == nil && p.New != nil {
        x = p.New()
    }
    return x
}
  1. 根據當前P取得對應的poolLocal和P ID。
  2. 若當前poolLocal.private不爲空,則表示可複用此對象;若爲空,則在poolLocal.shared隊列中獲取對象。
  3. poolLocal.shared無可用對象,則進入getSlow獲取對象。
  4. 若未能從其餘P成功竊取對象,則調用自定義的對象建立函數,若是該函數不爲nil
func (p *Pool) getSlow(pid int) interface{} {
    size := atomic.LoadUintptr(&p.localSize) 
    locals := p.local                        
    // 從其餘P中竊取對象
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i+1)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }

    // 嘗試從victim cache中取對象
    size = atomic.LoadUintptr(&p.victimSize)
    if uintptr(pid) >= size {
        return nil
    }
    locals = p.victim
    l := indexLocal(locals, pid)
    if x := l.private; x != nil {
        l.private = nil
        return x
    }
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }

    // 清空victim cache
    atomic.StoreUintptr(&p.victimSize, 0)
    return nil
}

進入getSlow的前提是當前P本地無可用對象,因而轉頭去其餘P的裏竊取對象,getSlow就是作這件事情。

  1. 遍歷Pool.local中的其餘P,確認其餘P的shared裏是否有可用對象,若是有,則從鏈尾取出。
  2. 若是其餘P中也沒有可用對象,則嘗試從victim cache中取可用對象,至於victim cache本文下半部分將會作詳細解釋。

縱觀整個Get過程會發現,從當前P的poolLocal中取對象時使用的時popHead,而從其餘P的poolLocal中竊取對象時使用的時popTail,再回到上文中對poolChain的定義,能夠知道,當前P對本地poolLocal是生產者,對其餘P的poolLocal而言是消費者

再次回到poolDequeuepoolChain上。咱們知道某一時刻P只會調度一個G,那麼對於生產者而言,調用pushHeadpopHead並不須要加鎖,由於當前P操做的是本地poolLocal;當消費者是其餘P,在進行popTail操做時,則會對pushHead以及popHead造成競爭關係,對這種問題,poolDequeue的實現直指要害。

首先注意eface這個結構,若插入成功eface下的兩個字段會指向要緩存對象的內存地址,在pushHead中使用了原子操做判斷typ字段是否爲nil,存在這樣一種可能性:pushHead所取到的slot正在popTail裏準備重置,這種狀況下pushHead會直接返回失敗。

回到競爭問題上,pushHead的流程能夠簡化爲先取slot,再判斷是否可插入最後修改headTail,而popTail的流程能夠簡化爲先修改headTail再取slot而後重置slotpushHead修改head位置,popTail修改tail位置,因此對於headTail字段使用原子操做避免便可讀寫衝突。

疑問是爲什麼popTail中須要先修改headTail呢,由於存在其餘P都會到當前P上竊取對象,當多個P都調用本地P的popTail時,競爭現象就會更加明顯,因此此時應儘早修改headTail,一旦某個P竊取到了其餘P就沒法再竊取此對象。

Pool.Put

func (p *Pool) Put(x interface{}) {
    if x == nil {
        return
    }
    // ...
    l, _ := p.pin()
    if l.private == nil {
        l.private = x
        x = nil
    }
    if x != nil {
        l.shared.pushHead(x)
    }
    runtime_procUnpin()
    // ...
}

Put的實現比較簡單,優先將對象存入private,若private已存在則放入shared鏈表中,pin中會標記禁止搶佔,所以須要在pin結束以及Put邏輯結束後取消禁止搶佔。

Victim Cache

Victim Cache本是計算機架構裏面的一個概念,是CPU硬件處理緩存的一種技術,sync.Pool引入的意圖在於下降GC壓力同時提升命中率,本文並不須要詳解Victim Cache的原理,分析sync.Pool便可明白其意圖。對於Pool而言有一點須要明白,這個Pool並不是是無限制擴展的,不然會引發內存溢出。幾乎全部的池技術中,都會在某個時刻清空或清除部分緩存對象,那麼在Go中什麼時候清理未使用的對象呢?

在Pool.Get函數中,取不到對象時會嘗試從p.victim中取,用完後放回當前P的本地隊列,而p.vcitim是什麼被建立的呢?是在poolCleanup函數中,該函數會在GC時被調用到,在init函數裏註冊。

func poolCleanup() {
    // 1
    for _, p := range oldPools {
        p.victim = nil
        p.victimSize = 0
    }
    // 2
    for _, p := range allPools {
        p.victim = p.local
        p.victimSize = p.localSize
        p.local = nil
        p.localSize = 0
    }

    oldPools, allPools = allPools, nil
}

poolCleanup會在STW階段被調用,函數實現雖然看起來簡單,但其意圖較爲複雜,那麼該如何解釋呢?

嘗試模擬一下實際狀況:

  1. 初始狀態下,oldPoolsallPools均爲nil
  2. 第1次調用Get,因爲p.localnil,將會在pinSlow中建立p.local,而後將p放入allPools,此時allPools長度爲1,oldPoolsnil
  3. 對象使用完畢,第1次調用Put放回對象
  4. 第1次GC STW階段,allPools中全部p.local將值賦值給victim並置爲nil,最後allPoolsniloldPools長度爲1
  5. 第2次調用Get,因爲p.localnil,此時會從p.victim裏面嘗試取對象
  6. 對象使用完畢,第2次調用Put放回對象,但因爲p.localnil,從新建立p.local,並將對象放回,此時allPools長度爲1,oldPools長度爲1
  7. 第2次GC STW階段,oldPools中全部p.victimnil,前一次的cache在本次GC時被回收,allPools全部p.local將值賦值給victim並置爲nil,最後allPools爲nil,oldPools長度爲1

再來看看Go 1.13之前poolCleanup的實現:

func poolCleanup() {
    for i, p := range allPools {
        allPools[i] = nil
        for i := 0; i < int(p.localSize); i++ {
            l := indexLocal(p.local, i)
            l.private = nil
            for j := range l.shared {
                l.shared[j] = nil
            }
            l.shared = nil
        }
        p.local = nil
        p.localSize = 0
    }
    allPools = []*Pool{}
}

Go 1.13之前poolCleanup的實現簡單粗暴,每次GC STW階段遍歷allPools,清空p.localpoolLocal.shared

經過二者的對比發現,新版的實現相比Go 1.13以前,GC的粒度拉大了,因爲實際回收的時間線拉長,單位時間內GC的開銷減少。

由此基本明白p.victim的做用,它的定位是次級緩存,GC時將對象放入其中,下一次GC來臨以前若是有Get調用則會從p.victim中取,直到再一次GC來臨作回收,同時因爲從p.victim中取出對象使用完畢以後並未放回p.victim中,在必定程度也減少了下一次GC的開銷。原來1次GC的開銷被拉長到2次且會有必定程度的開銷減少,這就是p.victim引入的意圖。

關於Victim Cache更多的信息能夠在延伸閱讀中找到。

sync.Pool的設計理念

無鎖

無鎖編程是不少編程語言裏逃離不了的話題。sync.Pool的無鎖是在poolDequeuepoolChain層面實現的。

操做對象隔離

縱觀整個sync.Pool的實現,明確了生產者(本地P)訪問head,消費者(其餘P)訪問tail,從P的角度切入操做方向,實現了目標操做對象層面的「解耦」,大部分時候二者的操做互不影響。圖文示意以下:
poolLocal.png

原子操做代替鎖

poolDequeue對一些關鍵變量採用了CAS操做,好比poolDequeue.headTail,既可完整保證併發又能下降相比鎖而言的開銷。

行爲隔離——鏈表

這點與「操做對象隔離」是相輔相成的,一旦設計目標爲儘可能減小對同一對象的操做鎖,就須要對行爲進行隔離,鏈表能很好的知足這個設計目標:特定的P訪問特定的位置。從整個過程來看,鏈表是減小鎖的高效數據結構。

Victim Cache下降GC開銷

GC的開銷已經足夠足夠小了,但仍不可避免。對於sync.Pool而言,避免極端狀況GC的開銷也是重點之一,因此Go 1.13的sync.Pool引入了Victim Cache機制,有效拉長真正回收的時間線,從而減少單次GC的開銷。

延伸閱讀

  • [High Performance Cache Architecture Using Victim

Cache](https://www.ijedr.org/papers/...

相關文章
相關標籤/搜索