你們都知道go語言中的goroutine雖然消耗資源很小,而且是一個用戶線程。可是goroutine也不是無限開的,因此咱們會有不少關於協程池的庫,固然啊咱們本身也能夠完成一些簡單的攜程池。redis也是相同的,redis的連接也是不推薦無限制的打開,不然會形成redis負荷加劇。
先看一下Redigo 中的鏈接池的使用git
package main import ( "fmt" "github.com/panlei/redigo/redis" "time" ) func main() { pool := &redis.Pool{ MaxIdle: 4, MaxActive: 4, Dial: func() (redis.Conn, error) { rc, err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil { return nil, err } return rc, nil }, IdleTimeout: time.Second, Wait: true, } con := pool.Get() str, err := redis.String(con.Do("get", "aaa")) con.Close() fmt.Println("value: ", str, " err:", err) }
咱們能夠看到Redigo使用鏈接池仍是很簡單的步驟:github
type Pool struct { // 撥號函數 從外部注入 Dial func() (Conn, error) // DialContext is an application supplied function for creating and configuring a DialContext func(ctx context.Context) (Conn, error) // 檢測鏈接的可用性,從外部注入。若是返回error 則直接關閉鏈接 TestOnBorrow func(c Conn, t time.Time) error // 最大閒置鏈接數量 MaxIdle int // 最大活動鏈接數 MaxActive int // 閒置過時時間 在get函數中會有邏輯 刪除過時的鏈接 IdleTimeout time.Duration // 設置若是活動鏈接達到上限 再獲取時候是等待仍是返回錯誤 // 若是是false 系統會返回redigo: connection pool exhausted // 若是是true 會利用p 的ch 屬性讓線程等待 知道有鏈接釋放出來 Wait bool // 鏈接最長生存時間 若是超過期間會被從鏈表中刪除 MaxConnLifetime time.Duration // 判斷ch 是否被初始化了 chInitialized uint32 // set to 1 when field ch is initialized // 鎖 mu sync.Mutex // mu protects the following fields closed bool // set to true when the pool is closed. active int // the number of open connections in the pool ch chan struct{} // limits open connections when p.Wait is true // 存放閒置鏈接的鏈表 idle idleList // idle connections // 等待獲取鏈接的數量 waitCount int64 // total number of connections waited for. waitDuration time.Duration // total time waited for new connections. } // 鏈接池中的具體鏈接對象 type conn struct { // 鎖 mu sync.Mutex pending int err error // http 包中的conn對象 conn net.Conn // 讀入過時時間 readTimeout time.Duration // bufio reader對象 用於讀取redis服務返回的結果 br *bufio.Reader // 寫入過時時間 writeTimeout time.Duration // bufio writer對象 帶buf 用於往服務端寫命令 bw *bufio.Writer // Scratch space for formatting argument length. // '*' or '$', length, "\r\n" lenScratch [32]byte // Scratch space for formatting integers and floats. numScratch [40]byte }
咱們能夠看到,其中有幾個關鍵性的字段好比最大活動鏈接數、最大閒置鏈接數、閒置連接過時時間、鏈接生存時間等。redis
咱們知道 鏈接池最重要的就是兩個方法,一個是獲取鏈接,一個是關閉鏈接。這個跟sync.Pool。咱們來看一下代碼:
GET:app
func (p *Pool) get(ctx context.Context) (*poolConn, error) { // 處理是否須要等待 pool Wait若是是true 則等待鏈接釋放 var waited time.Duration if p.Wait && p.MaxActive > 0 { // 從新初始化pool的ch channel p.lazyInit() // wait indicates if we believe it will block so its not 100% accurate // however for stats it should be good enough. wait := len(p.ch) == 0 var start time.Time if wait { start = time.Now() } // 獲取pool 的ch通道,一旦有鏈接被close 則能夠繼續返回鏈接 if ctx == nil { <-p.ch } else { select { case <-p.ch: case <-ctx.Done(): return nil, ctx.Err() } } if wait { waited = time.Since(start) } } p.mu.Lock() // 等待數量加1 增長等待時間 if waited > 0 { p.waitCount++ p.waitDuration += waited } // Prune stale connections at the back of the idle list. // 刪除鏈表尾部的陳舊鏈接,刪除超時的鏈接 // 鏈接close以後,鏈接會回到pool的idle(閒置)鏈表中 if p.IdleTimeout > 0 { n := p.idle.count for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ { pc := p.idle.back p.idle.popBack() p.mu.Unlock() pc.c.Close() p.mu.Lock() p.active-- } } // Get idle connection from the front of idle list. // 獲取鏈表空閒鏈接 拿鏈表第一個 for p.idle.front != nil { pc := p.idle.front p.idle.popFront() p.mu.Unlock() // 調用驗證函數若是返回錯誤不爲nil 關閉鏈接拿下一個 // 判斷鏈接生存時間 大於生存時間則關閉拿下一個 if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) && (p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) { return pc, nil } pc.c.Close() p.mu.Lock() p.active-- } // Check for pool closed before dialing a new connection. // 判斷鏈接池是否被關閉 若是關閉則解鎖報錯 if p.closed { p.mu.Unlock() return nil, errors.New("redigo: get on closed pool") } // Handle limit for p.Wait == false. // 若是活動鏈接大於最大鏈接解鎖 返回錯誤 if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive { p.mu.Unlock() return nil, ErrPoolExhausted } // 若是在鏈表中沒有獲取到可用的鏈接 並添加active數量添加 p.active++ p.mu.Unlock() c, err := p.dial(ctx) // 若是調用失敗 則減小active數量 if err != nil { c = nil p.mu.Lock() p.active-- if p.ch != nil && !p.closed { p.ch <- struct{}{} } p.mu.Unlock() } // 建立鏈接 設置建立時間 return &poolConn{c: c, created: nowFunc()}, err }
Put:tcp
// 關閉方法 func (ac *activeConn) Close() error { pc := ac.pc if pc == nil { return nil } ac.pc = nil // 判斷鏈接的狀態 發送取消事務 取消watch if ac.state&connectionMultiState != 0 { pc.c.Send("DISCARD") ac.state &^= (connectionMultiState | connectionWatchState) } else if ac.state&connectionWatchState != 0 { pc.c.Send("UNWATCH") ac.state &^= connectionWatchState } if ac.state&connectionSubscribeState != 0 { pc.c.Send("UNSUBSCRIBE") pc.c.Send("PUNSUBSCRIBE") // To detect the end of the message stream, ask the server to echo // a sentinel value and read until we see that value. sentinelOnce.Do(initSentinel) pc.c.Send("ECHO", sentinel) pc.c.Flush() for { p, err := pc.c.Receive() if err != nil { break } if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) { ac.state &^= connectionSubscribeState break } } } pc.c.Do("") // 把鏈接放入鏈表 ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil) return nil } // 將鏈接 從新放入限制鏈表 func (p *Pool) put(pc *poolConn, forceClose bool) error { p.mu.Lock() if !p.closed && !forceClose { pc.t = nowFunc() p.idle.pushFront(pc) if p.idle.count > p.MaxIdle { pc = p.idle.back p.idle.popBack() } else { pc = nil } } if pc != nil { p.mu.Unlock() pc.c.Close() p.mu.Lock() p.active-- } // 若是鏈接的ch 不爲空 而且鏈接池沒有關閉 則給channel中輸入一個struct{}{} // 若是在鏈接打到最大活動數量以後 再獲取鏈接而且pool的Wait爲ture 會阻塞線程等待返回鏈接 if p.ch != nil && !p.closed { p.ch <- struct{}{} } p.mu.Unlock() return nil }
整個Pool總體流程,我大概畫了一個圖。
從初始化 =》獲取 -》建立鏈接 =》返回鏈接 =》關閉鏈接 =》
其中還有一條線是Pool.Wait = true 會一直阻塞 一直到有鏈接Close 釋放活動鏈接數 線程被喚醒返回閒置的鏈接
其實大部分的鏈接池都是相似的流程,好比goroutine,redis。
函數