Golang 鏈接池的幾種實現案例

由於TCP的三隻握手等等緣由,創建一個鏈接是一件成本比較高的行爲。因此在一個須要屢次與特定實體交互的程序中,就須要維持一個鏈接池,裏面有能夠複用的鏈接可供重複使用。mysql

而維持一個鏈接池,最基本的要求就是要作到:thread safe(線程安全),尤爲是在Golang這種特性是goroutine的語言中。git

做者:Xiao淩求個好運氣
來源:掘金
原文連接: https://juejin.im/post/5e58e3...

實現簡單的鏈接池

type Pool struct {
    m sync.Mutex // 保證多個goroutine訪問時候,closed的線程安全
    res chan io.Closer //鏈接存儲的chan
    factory func() (io.Closer,error) //新建鏈接的工廠方法
    closed bool //鏈接池關閉標誌
}

這個簡單的鏈接池,咱們利用chan來存儲池裏的鏈接。而新建結構體的方法也比較簡單:github

func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("size的值過小了。")
    }
    return &Pool{
        factory: fn,
        res:     make(chan io.Closer, size),
    }, nil
}

只須要提供對應的工廠函數和鏈接池的大小就能夠了。redis

獲取鏈接

那麼咱們要怎麼從中獲取資源呢?由於咱們內部存儲鏈接的結構是chan,因此只須要簡單的select就能夠保證線程安全:sql

//從資源池裏獲取一個資源
func (p *Pool) Acquire() (io.Closer,error) {
    select {
    case r,ok := <-p.res:
        log.Println("Acquire:共享資源")
        if !ok {
            return nil,ErrPoolClosed
        }
        return r,nil
    default:
        log.Println("Acquire:新生成資源")
        return p.factory()
    }
}

咱們先從鏈接池的res這個chan裏面獲取,若是沒有的話咱們就利用咱們早已經準備好的工廠函數進行構造鏈接。同時咱們在從res獲取鏈接的時候利用ok先肯定了這個鏈接池是否已經關閉。若是已經關閉的話咱們就返回早已經準備好的鏈接已關閉錯誤。數據庫

關閉鏈接池

那麼既然提到關閉鏈接池,咱們是怎麼樣關閉鏈接池的呢?安全

//關閉資源池,釋放資源
func (p *Pool) Close() {
    p.m.Lock()
    defer p.m.Unlock()

    if p.closed {
        return
    }

    p.closed = true

    //關閉通道,不讓寫入了
    close(p.res)

    //關閉通道里的資源
    for r:=range p.res {
        r.Close()
    }
}

這邊咱們須要先進行p.m.Lock()*上鎖操做,這麼作是由於咱們須要對結構體裏面的*closed進行讀寫。須要先把這個標誌位設定後,關閉res這個chan,使得Acquire方法沒法再獲取新的鏈接。咱們再對res這個chan裏面的鏈接進行Close操做。session

釋放鏈接

釋放鏈接首先得有個前提,就是鏈接池尚未關閉。若是鏈接池已經關閉再往res裏面送鏈接的話就好觸發panic。併發

func (p *Pool) Release(r io.Closer){
    //保證該操做和Close方法的操做是安全的
    p.m.Lock()
    defer p.m.Unlock()

    //資源池都關閉了,就省這一個沒有釋放的資源了,釋放便可
    if p.closed {
        r.Close()
        return
    }

    select {
    case p.res <- r:
        log.Println("資源釋放到池子裏了")
    default:
        log.Println("資源池滿了,釋放這個資源吧")
        r.Close()
    }
}

以上就是一個簡單且線程安全的鏈接池實現方式了。咱們能夠看到的是,如今鏈接池雖然已經實現了,可是還有幾個小缺點:app

  1. 咱們對鏈接最大的數量沒有限制,若是線程池空的話都咱們默認就直接新建一個鏈接返回了。一旦併發量高的話將會不斷新建鏈接,很容易(尤爲是MySQL)形成too many connections的報錯發生。
  2. 既然咱們須要保證最大可獲取鏈接數量,那麼咱們就不但願數量定的太死。但願空閒的時候能夠維護必定的空閒鏈接數量idleNum,可是又但願咱們能限制最大可獲取鏈接數量maxNum。
  3. 第一種狀況是併發過多的狀況,那麼若是併發量過少呢?如今咱們在新建一個鏈接而且歸還後,咱們很長一段時間再也不使用這個鏈接。那麼這個鏈接頗有可能在幾個小時甚至更長時間以前就已經創建的了。長時間閒置的鏈接咱們並無辦法保證它的可用性。便有可能咱們下次獲取的鏈接是已經失效的鏈接。

那麼咱們能夠從已經成熟使用的MySQL鏈接池庫和Redis鏈接池庫中看看,它們是怎麼解決這些問題的。

Golang標準庫的Sql鏈接池

Golang的鏈接池實如今標準庫database/sql/sql.go下。當咱們運行:

db, err := sql.Open("mysql", "xxxx")

的時候,就會打開一個鏈接池。咱們能夠看看返回的db的結構體:

type DB struct {
    waitDuration int64 // Total time waited for new connections.
    mu           sync.Mutex // protects following fields
    freeConn     []*driverConn
    connRequests map[uint64]chan connRequest
    nextRequest  uint64 // Next key to use in connRequests.
    numOpen      int    // number of opened and pending open connections
    // Used to signal the need for new connections
    // a goroutine running connectionOpener() reads on this chan and
    // maybeOpenNewConnections sends on the chan (one send per needed connection)
    // It is closed during db.Close(). The close tells the connectionOpener
    // goroutine to exit.
    openerCh          chan struct{}
    closed            bool
    maxIdle           int                    // zero means defaultMaxIdleConns; negative means 0
    maxOpen           int                    // <= 0 means unlimited
    maxLifetime       time.Duration          // maximum amount of time a connection may be reused
    cleanerCh         chan struct{}
    waitCount         int64 // Total number of connections waited for.
    maxIdleClosed     int64 // Total number of connections closed due to idle.
    maxLifetimeClosed int64 // Total number of connections closed due to max free limit.
}

上面省去了一些暫時不須要關注的field。咱們能夠看的,DB這個鏈接池內部存儲鏈接的結構freeConn,並非咱們以前使用的chan,而是[]*driverConn,一個鏈接切片。同時咱們還能夠看到,裏面有maxIdle等相關變量來控制空閒鏈接數量。值得注意的是,DB的初始化函數Open函數並無新建數據庫鏈接。而新建鏈接在哪一個函數呢?咱們能夠在Query方法一路往回找,咱們能夠看到這個函數:func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error)。而咱們從鏈接池獲取鏈接的方法,就從這裏開始:

獲取鏈接

// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    // 先判斷db是否已經關閉。
    db.mu.Lock()
    if db.closed {
        db.mu.Unlock()
        return nil, errDBClosed
    }
    // 注意檢測context是否已經被超時等緣由被取消。
    select {
    default:
    case <-ctx.Done():
        db.mu.Unlock()
        return nil, ctx.Err()
    }
    lifetime := db.maxLifetime

    // 這邊若是在freeConn這個切片有空閒鏈接的話,就left pop一個出列。注意的是,這邊由於是切片操做,因此須要前面須要加鎖且獲取後進行解鎖操做。同時判斷返回的鏈接是否已通過期。
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
        conn := db.freeConn[0]
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        conn.inUse = true
        db.mu.Unlock()
        if conn.expired(lifetime) {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        // Lock around reading lastErr to ensure the session resetter finished.
        conn.Lock()
        err := conn.lastErr
        conn.Unlock()
        if err == driver.ErrBadConn {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        return conn, nil
    }

    // 這邊就是等候獲取鏈接的重點了。當空閒的鏈接爲空的時候,這邊將會新建一個request(的等待鏈接 的請求)而且開始等待
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // 下面的動做至關於往connRequests這個map插入本身的號碼牌。
        // 插入號碼牌以後這邊就不須要阻塞等待繼續往下走邏輯。
        req := make(chan connRequest, 1)
        reqKey := db.nextRequestKeyLocked()
        db.connRequests[reqKey] = req
        db.waitCount++
        db.mu.Unlock()

        waitStart := time.Now()

        // Timeout the connection request with the context.
        select {
        case <-ctx.Done():
            // context取消操做的時候,記得從connRequests這個map取走本身的號碼牌。
            db.mu.Lock()
            delete(db.connRequests, reqKey)
            db.mu.Unlock()

            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

            select {
            default:
            case ret, ok := <-req:
                // 這邊值得注意了,由於如今已經被context取消了。可是剛剛放了本身的號碼牌進去排隊裏面。意思是說不定已經發了鏈接了,因此得注意歸還!
                if ok && ret.conn != nil {
                    db.putConn(ret.conn, ret.err, false)
                }
            }
            return nil, ctx.Err()
        case ret, ok := <-req:
            // 下面是已經得到鏈接後的操做了。檢測一下得到鏈接的情況。由於有可能已通過期了等等。
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

            if !ok {
                return nil, errDBClosed
            }
            if ret.err == nil && ret.conn.expired(lifetime) {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            if ret.conn == nil {
                return nil, ret.err
            }
            ret.conn.Lock()
            err := ret.conn.lastErr
            ret.conn.Unlock()
            if err == driver.ErrBadConn {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            return ret.conn, ret.err
        }
    }
    // 下面就是若是上面說的限制狀況不存在,能夠建立先鏈接時候,要作的建立鏈接操做了。
    db.numOpen++ // optimistically
    db.mu.Unlock()
    ci, err := db.connector.Connect(ctx)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
        inUse:     true,
    }
    db.addDepLocked(dc, dc)
    db.mu.Unlock()
    return dc, nil
}
複製代碼

簡單來講,DB結構體除了用的是slice來存儲鏈接,還加了一個相似排隊機制的connRequests來解決獲取等待鏈接的過程。同時在判斷鏈接健康性都有很好的兼顧。那麼既然有了排隊機制,歸還鏈接的時候是怎麼作的呢?

釋放鏈接

咱們能夠直接找到func (db *DB) putConnDBLocked(dc *driverConn, err error) bool這個方法。就像註釋說的,這個方法主要的目的是:

Satisfy a connRequest or put the driverConn in the idle pool and return true or return false.

咱們主要來看看裏面重點那幾行:

...
    // 若是已經超過最大打開數量了,就不須要在迴歸pool了
    if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    // 這邊是重點了,基原本說就是從connRequest這個map裏面隨機抽一個在排隊等着的請求。取出來後發給他。就不用歸還池子了。
    if c := len(db.connRequests); c > 0 {
        var req chan connRequest
        var reqKey uint64
        for reqKey, req = range db.connRequests {
            break
        }
        delete(db.connRequests, reqKey) // 刪除這個在排隊的請求。
        if err == nil {
            dc.inUse = true
        }
        // 把鏈接給這個正在排隊的鏈接。
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    } else if err == nil && !db.closed {
        // 既然沒人排隊,就看看到了最大鏈接數目沒有。沒到就歸還給freeConn。
        if db.maxIdleConnsLocked() > len(db.freeConn) {
            db.freeConn = append(db.freeConn, dc)
            db.startCleanerLocked()
            return true
        }
        db.maxIdleClosed++
    }
...

咱們能夠看到,當歸還鏈接時候,若是有在排隊輪候的請求就不歸還給池子直接發給在輪候的人了。

如今基本就解決前面說的小問題了。不會出現鏈接太多致使沒法控制too many connections的狀況。也很好了維持了鏈接池的最小數量。同時也作了相關對於鏈接健康性的檢查操做。

值得注意的是,做爲標準庫的代碼,相關注釋和代碼都很是完美,真的能夠看的神清氣爽。

redis Golang實現的Redis客戶端

這個Golang實現的Redis客戶端,是怎麼實現鏈接池的。這邊的思路很是奇妙,仍是能學習到很多好思路。固然了,因爲代碼註釋比較少,啃起來第一下仍是有點迷糊的。相關代碼地址在https://github.com/go-redis/r... 能夠看到。

而它的鏈接池結構以下

type ConnPool struct {
    ...
    queue chan struct{}

    connsMu      sync.Mutex
    conns        []*Conn
    idleConns    []*Conn
    poolSize     int
    idleConnsLen int

    stats Stats

    _closed  uint32 // atomic
    closedCh chan struct{}
}

咱們能夠看到裏面存儲鏈接的結構仍是slice。可是咱們能夠重點看看queueconnsidleConns這幾個變量,後面會說起到。可是值得注意的是!咱們能夠看到,這裏有兩個[]*Conn結構:connsidleConns,那麼問題來了:

到底鏈接存在哪裏?

新建鏈接池鏈接

咱們先重新建鏈接池鏈接開始看:

func NewConnPool(opt *Options) *ConnPool {
    ....
    p.checkMinIdleConns()

    if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
        go p.reaper(opt.IdleCheckFrequency)
    }
    ....
}

初始化鏈接池的函數有個和前面兩個不一樣的地方。

  1. checkMinIdleConns方法,在鏈接池初始化的時候就會往鏈接池填滿空閒的鏈接。
  2. go p.reaper(opt.IdleCheckFrequency)則會在初始化鏈接池的時候就會起一個go程,週期性的淘汰鏈接池裏面要被淘汰的鏈接。

獲取鏈接

func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
    if p.closed() {
        return nil, ErrClosed
    }
    
    //這邊和前面sql獲取鏈接函數的流程先不一樣。sql是先看看鏈接池有沒有空閒鏈接,有的話先獲取不到再排隊。這邊是直接先排隊獲取令牌,排隊函數後面會分析。
    err := p.waitTurn(ctx)
    if err != nil {
        return nil, err
    }
    //前面沒出error的話,就已經排隊輪候到了。接下來就是獲取的流程。
    for {
        p.connsMu.Lock()
        //從空閒鏈接裏面先獲取一個空閒鏈接。
        cn := p.popIdle()
        p.connsMu.Unlock()

        if cn == nil {
            // 沒有空閒鏈接時候直接跳出循環。
            break
        }
        // 判斷是否已通過時,是的話close掉了而後繼續取出。
        if p.isStaleConn(cn) {
            _ = p.CloseConn(cn)
            continue
        }

        atomic.AddUint32(&p.stats.Hits, 1)
        return cn, nil
    }

    atomic.AddUint32(&p.stats.Misses, 1)
    
    // 若是沒有空閒鏈接的話,這邊就直接新建鏈接了。
    newcn, err := p.newConn(ctx, true)
    if err != nil {
        // 歸還令牌。
        p.freeTurn()
        return nil, err
    }

    return newcn, nil
}

咱們能夠試着回答開頭那個問題:鏈接到底存在哪裏?答案是從cn := p.popIdle()這句話能夠看出,獲取鏈接這個動做,是從idleConns裏面獲取的,而裏面的函數也證實了這一點。同時個人理解是:

  1. sql的排隊意味着我對鏈接池申請鏈接後,把本身的編號告訴鏈接池。鏈接那邊一看到有空閒了,就叫個人號。我答應了一聲,而後鏈接池就直接給個鏈接給我。我若是不歸還,鏈接池就一直不叫下一個號。
  2. redis這邊的意思是,我去和鏈接池申請的不是鏈接而是令牌。我就一直排隊等着,鏈接池給我令牌了,我纔去倉庫裏面找空閒鏈接或者本身新建一個鏈接。用完了鏈接除了歸還鏈接外,還得歸還令牌。固然了,若是我本身新建鏈接出錯了,我哪怕拿不到鏈接回家,我也得把令牌給回鏈接池,否則鏈接池的令牌數少了,最大鏈接數也會變小。

而:

func (p *ConnPool) freeTurn() {
    <-p.queue
}
func (p *ConnPool) waitTurn(ctx context.Context) error {
...
    case p.queue <- struct{}{}:
        return nil
...
}

就是在靠queue這個chan來維持令牌數量。

那麼conns的做用是什麼呢?咱們能夠來看看新建鏈接這個函數:

新建鏈接

func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
    cn, err := p.dialConn(ctx, pooled)
    if err != nil {
        return nil, err
    }

    p.connsMu.Lock()
    p.conns = append(p.conns, cn)
    if pooled {
        // 若是鏈接池滿了,會在後面移除。
        if p.poolSize >= p.opt.PoolSize {
            cn.pooled = false
        } else {
            p.poolSize++
        }
    }
    p.connsMu.Unlock()
    return cn, nil
}

基本邏輯出來了。就是若是新建鏈接的話,我並不會直接放在idleConns裏面,而是先放conns裏面。同時先看池子滿了沒有。滿的話後面歸還的時候會標記,後面會刪除。那麼這個後面會刪除,指的是何時呢?那就是下面說的歸還鏈接的時候了。

歸還鏈接

func (p *ConnPool) Put(cn *Conn) {
    if cn.rd.Buffered() > 0 {
        internal.Logger.Printf("Conn has unread data")
        p.Remove(cn, BadConnError{})
        return
    }
    //這就是咱們剛剛說的後面了,前面標記過不要入池的,這邊就刪除了。固然了,裏面也會進行freeTurn操做。
    if !cn.pooled {
        p.Remove(cn, nil)
        return
    }

    p.connsMu.Lock()
    p.idleConns = append(p.idleConns, cn)
    p.idleConnsLen++
    p.connsMu.Unlock()
    //咱們能夠看到很明顯的這個歸還號碼牌的動做。
    p.freeTurn()
}

其實歸還的過程,就是從conns轉移到idleConns的過程。固然了,若是新建這個鏈接時候發現已經超賣了,後面歸還時候就不轉移,直接刪除了。

等等,上面的邏輯彷佛有點不對?咱們來理一下獲取鏈接流程:

  1. waitTurn,拿到令牌。而令牌數量是根據pool裏面的queue決定的。
  2. 拿到令牌了,去庫房idleConns裏面拿空閒的鏈接。沒有的話就本身newConn一個,而且把他記錄到conns裏面。
  3. 用完了,就調用put歸還:也就是從conns轉移到idleConns。歸還的時候就檢查在newConn時候是否是已經作了超賣標記了。是的話就不轉移到idleConns

我當時疑惑了很久,既然始終都須要得到令牌才能獲得鏈接,令牌數量是定的。爲何還會超賣呢?翻了一下源碼,個人答案是:

雖然Get方法獲取鏈接是newConn這個私用方法,受到令牌管制致使不會出現超賣。可是這個方法接受傳參:pooled bool。因此我猜是擔憂其餘人調用這個方法時候,無論三七二十一就傳了true,致使poolSize愈來愈大。

總的來講,redis這個鏈接池的鏈接數控制,仍是在 queue這個我稱爲令牌的chan進行操做。

總結

上面能夠看到,鏈接池的最基本的保證,就是獲取鏈接時候的線程安全。可是在實現諸多額外特性時候卻又從不一樣角度來實現。仍是很是有意思的。可是無論存儲結構是用chan仍是仍是slice,均可以很好的實現這一點。若是像sql或者redis那樣用slice來存儲鏈接,就得維護一個結構來表示排隊等候的效果。

相關文章
相關標籤/搜索