Go Redigo 源碼分析(二) 鏈接池

Redigo 鏈接池的使用

你們都知道go語言中的goroutine雖然消耗資源很小,而且是一個用戶線程。可是goroutine也不是無限開的,因此咱們會有不少關於協程池的庫,固然啊咱們本身也能夠完成一些簡單的攜程池。redis也是相同的,redis的連接也是不推薦無限制的打開,不然會形成redis負荷加劇。
先看一下Redigo 中的鏈接池的使用git

package main

import (
    "fmt"
    "github.com/panlei/redigo/redis"
    "time"
)

func main() {
    pool := &redis.Pool{
        MaxIdle:   4,
        MaxActive: 4,
        Dial: func() (redis.Conn, error) {
            rc, err := redis.Dial("tcp", "127.0.0.1:6379")
            if err != nil {
                return nil, err
            }
            return rc, nil
        },
        IdleTimeout: time.Second,
        Wait:        true,
    }
    con := pool.Get()
    str, err := redis.String(con.Do("get", "aaa"))
    con.Close()
    fmt.Println("value: ", str, " err:", err)
}

咱們能夠看到Redigo使用鏈接池仍是很簡單的步驟:github

  1. 建立鏈接池
  2. 簡單設置鏈接池的最大連接數等參數
  3. 注入撥號函數(設置redis地址 端口號等)
  4. 調用pool.Get() 獲取鏈接
  5. 使用鏈接Do函數請求redis
  6. 關閉鏈接

源碼

Pool conn 對象的定義
type Pool struct {
    // 撥號函數 從外部注入
    Dial func() (Conn, error)

    // DialContext is an application supplied function for creating and configuring a
    
    DialContext func(ctx context.Context) (Conn, error)

    // 檢測鏈接的可用性,從外部注入。若是返回error 則直接關閉鏈接
    TestOnBorrow func(c Conn, t time.Time) error

    // 最大閒置鏈接數量
    MaxIdle int

    // 最大活動鏈接數
    MaxActive int

    // 閒置過時時間 在get函數中會有邏輯 刪除過時的鏈接
    IdleTimeout time.Duration

    // 設置若是活動鏈接達到上限 再獲取時候是等待仍是返回錯誤
    // 若是是false 系統會返回redigo: connection pool exhausted
    // 若是是true 會利用p 的ch 屬性讓線程等待 知道有鏈接釋放出來
    Wait bool

    // 鏈接最長生存時間 若是超過期間會被從鏈表中刪除
    MaxConnLifetime time.Duration
    // 判斷ch 是否被初始化了
    chInitialized uint32 // set to 1 when field ch is initialized
    // 鎖
    mu           sync.Mutex    // mu protects the following fields
    closed       bool          // set to true when the pool is closed.
    active       int           // the number of open connections in the pool
    ch           chan struct{} // limits open connections when p.Wait is true
    // 存放閒置鏈接的鏈表
    idle         idleList      // idle connections
    // 等待獲取鏈接的數量
    waitCount    int64         // total number of connections waited for.
    waitDuration time.Duration // total time waited for new connections.
}

// 鏈接池中的具體鏈接對象
type conn struct {
    //  鎖
    mu      sync.Mutex
    pending int
    err     error
    // http 包中的conn對象
    conn    net.Conn

    // 讀入過時時間
    readTimeout time.Duration
    // bufio reader對象 用於讀取redis服務返回的結果
    br          *bufio.Reader

    // 寫入過時時間
    writeTimeout time.Duration
    // bufio writer對象 帶buf 用於往服務端寫命令
    bw           *bufio.Writer

    // Scratch space for formatting argument length.
    // '*' or '$', length, "\r\n"
    lenScratch [32]byte

    // Scratch space for formatting integers and floats.
    numScratch [40]byte
}

咱們能夠看到,其中有幾個關鍵性的字段好比最大活動鏈接數、最大閒置鏈接數、閒置連接過時時間、鏈接生存時間等。redis

Pool 的Get Close方法

咱們知道 鏈接池最重要的就是兩個方法,一個是獲取鏈接,一個是關閉鏈接。這個跟sync.Pool。咱們來看一下代碼:
GET:app

func (p *Pool) get(ctx context.Context) (*poolConn, error) {

    // 處理是否須要等待 pool Wait若是是true 則等待鏈接釋放
    var waited time.Duration
    if p.Wait && p.MaxActive > 0 {
        // 從新初始化pool的ch channel
        p.lazyInit()

        // wait indicates if we believe it will block so its not 100% accurate
        // however for stats it should be good enough.
        wait := len(p.ch) == 0
        var start time.Time
        if wait {
            start = time.Now()
        }
        // 獲取pool 的ch通道,一旦有鏈接被close 則能夠繼續返回鏈接
        if ctx == nil {
            <-p.ch
        } else {
            select {
            case <-p.ch:
            case <-ctx.Done():
                return nil, ctx.Err()
            }
        }
        if wait {
            waited = time.Since(start)
        }
    }

    p.mu.Lock()
    // 等待數量加1 增長等待時間
    if waited > 0 {
        p.waitCount++
        p.waitDuration += waited
    }

    // Prune stale connections at the back of the idle list.
    // 刪除鏈表尾部的陳舊鏈接,刪除超時的鏈接
    // 鏈接close以後,鏈接會回到pool的idle(閒置)鏈表中
    if p.IdleTimeout > 0 {
        n := p.idle.count
        for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
            pc := p.idle.back
            p.idle.popBack()
            p.mu.Unlock()
            pc.c.Close()
            p.mu.Lock()
            p.active--
        }
    }

    // Get idle connection from the front of idle list.
    // 獲取鏈表空閒鏈接 拿鏈表第一個
    for p.idle.front != nil {
        pc := p.idle.front
        p.idle.popFront()
        p.mu.Unlock()
        // 調用驗證函數若是返回錯誤不爲nil 關閉鏈接拿下一個
        // 判斷鏈接生存時間 大於生存時間則關閉拿下一個
        if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
            (p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
            return pc, nil
        }
        pc.c.Close()
        p.mu.Lock()
        p.active--
    }

    // Check for pool closed before dialing a new connection.
    // 判斷鏈接池是否被關閉 若是關閉則解鎖報錯
    if p.closed {
        p.mu.Unlock()
        return nil, errors.New("redigo: get on closed pool")
    }

    // Handle limit for p.Wait == false.
    // 若是活動鏈接大於最大鏈接解鎖 返回錯誤
    if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {
        p.mu.Unlock()
        return nil, ErrPoolExhausted
    }
    // 若是在鏈表中沒有獲取到可用的鏈接 並添加active數量添加
    p.active++
    p.mu.Unlock()
    c, err := p.dial(ctx)
    // 若是調用失敗 則減小active數量
    if err != nil {
        c = nil
        p.mu.Lock()
        p.active--
        if p.ch != nil && !p.closed {
            p.ch <- struct{}{}
        }
        p.mu.Unlock()
    }
    // 建立鏈接 設置建立時間
    return &poolConn{c: c, created: nowFunc()}, err
}

Put:tcp

// 關閉方法
func (ac *activeConn) Close() error {
    pc := ac.pc
    if pc == nil {
        return nil
    }
    ac.pc = nil
    // 判斷鏈接的狀態 發送取消事務 取消watch
    if ac.state&connectionMultiState != 0 {
        pc.c.Send("DISCARD")
        ac.state &^= (connectionMultiState | connectionWatchState)
    } else if ac.state&connectionWatchState != 0 {
        pc.c.Send("UNWATCH")
        ac.state &^= connectionWatchState
    }
    if ac.state&connectionSubscribeState != 0 {
        pc.c.Send("UNSUBSCRIBE")
        pc.c.Send("PUNSUBSCRIBE")
        // To detect the end of the message stream, ask the server to echo
        // a sentinel value and read until we see that value.
        sentinelOnce.Do(initSentinel)
        pc.c.Send("ECHO", sentinel)
        pc.c.Flush()
        for {
            p, err := pc.c.Receive()
            if err != nil {
                break
            }
            if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
                ac.state &^= connectionSubscribeState
                break
            }
        }
    }
    pc.c.Do("")
    // 把鏈接放入鏈表
    ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil)
    return nil
}

// 將鏈接 從新放入限制鏈表
func (p *Pool) put(pc *poolConn, forceClose bool) error {
    p.mu.Lock()
    if !p.closed && !forceClose {
        pc.t = nowFunc()
        p.idle.pushFront(pc)
        if p.idle.count > p.MaxIdle {
            pc = p.idle.back
            p.idle.popBack()
        } else {
            pc = nil
        }
    }

    if pc != nil {
        p.mu.Unlock()
        pc.c.Close()
        p.mu.Lock()
        p.active--
    }
    // 若是鏈接的ch 不爲空 而且鏈接池沒有關閉 則給channel中輸入一個struct{}{}
    // 若是在鏈接打到最大活動數量以後 再獲取鏈接而且pool的Wait爲ture 會阻塞線程等待返回鏈接
    if p.ch != nil && !p.closed {
        p.ch <- struct{}{}
    }
    p.mu.Unlock()
    return nil
}

總結

整個Pool總體流程,我大概畫了一個圖。
從初始化 =》獲取 -》建立鏈接 =》返回鏈接 =》關閉鏈接 =》
其中還有一條線是Pool.Wait = true 會一直阻塞 一直到有鏈接Close 釋放活動鏈接數 線程被喚醒返回閒置的鏈接
其實大部分的鏈接池都是相似的流程,好比goroutine,redis。
圖片描述函數

相關文章
相關標籤/搜索