使用 golang 開發項目時常常會使用到 redis 服務,這時就須要一個趁手的 sdk,因此就在 github 中找了一個 star 較多的項目,這就是本篇的主角 redigo,同時這也是redis 的官方推薦。git
不過在使用過程當中遇到了一些小問題,所以就去了解了一下源碼,如下做爲一個筆記。github
redigo 項目代碼量較少,且註釋明確,適合閱讀學習。
redigo 主要完成了如下功能:golang
redis ├── conn.go // 實現 redis.go 中定義的接口,完成以上主要功能 ├── conn_test.go ├── doc.go ├── go17.go ├── log.go ├── pool.go // pool 相關代碼 ├── pool_test.go ├── pre_go17.go ├── pubsub.go ├── pubsub_test.go ├── redis.go // 定義接口 ├── reply.go // 返回數據的類型轉換 ├── reply_test.go ├── scan.go ├── scan_test.go ├── script.go // lua 腳本相關代碼 ├── script_test.go ├── test_test.go └── zpop_example_test.go
項目主體主要有以上代碼組成。redis
代碼位於文件 conn.go
,要建立的鏈接是一個自定義的數據結構 conn
,以下,shell
// conn is the low-level implementation of Conn type conn struct { // Shared mu sync.Mutex pending int // 命令計數 err error conn net.Conn // Read readTimeout time.Duration br *bufio.Reader // Write writeTimeout time.Duration bw *bufio.Writer // Scratch space for formatting argument length. // '*' or '$', length, "\r\n" lenScratch [32]byte // Scratch space for formatting integers and floats. numScratch [40]byte }
建立鏈接所須要的參數統一封裝到結構體 dialOptions
中,以下,數組
type dialOptions struct { readTimeout time.Duration writeTimeout time.Duration dial func(network, addr string) (net.Conn, error) db int password string dialTLS bool skipVerify bool tlsConfig *tls.Config }
其中包含各類超時設置,建立鏈接使用的函數,以及 TLS 等。
參數設置則封裝了一系列 Dialxxxx
函數,如 DialWriteTimeout
,數據結構
// DialWriteTimeout specifies the timeout for writing a single command. func DialWriteTimeout(d time.Duration) DialOption { return DialOption{func(do *dialOptions) { do.writeTimeout = d }} }
同時須要結合以下結構體完成,app
type DialOption struct { f func(*dialOptions) }
建立鏈接時使用的是 Dial
函數負載均衡
// Dial connects to the Redis server at the given network and // address using the specified options. func Dial(network, address string, options ...DialOption) (Conn, error) { do := dialOptions{ dial: net.Dial, } for _, option := range options { // 設置 option.f(&do) } netConn, err := do.dial(network, address) if err != nil { return nil, err } // TLS 相關 // ... c := &conn{ conn: netConn, bw: bufio.NewWriter(netConn), br: bufio.NewReader(netConn), readTimeout: do.readTimeout, writeTimeout: do.writeTimeout, } if do.password != "" { if _, err := c.Do("AUTH", do.password); err != nil { netConn.Close() return nil, err } } if do.db != 0 { if _, err := c.Do("SELECT", do.db); err != nil { netConn.Close() return nil, err } } return c, nil }
還有一個相似的 DialURL
函數就不分析了。less
非 pipeline 的形式,都是經過 Do
函數去觸發這個流程的。
func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) { c.mu.Lock() // 須要更新 pending 變量,加鎖串行 pending := c.pending c.pending = 0 c.mu.Unlock() if cmd == "" && pending == 0 { return nil, nil } if c.writeTimeout != 0 { c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout)) // 設置寫超時 } if cmd != "" { if err := c.writeCommand(cmd, args); err != nil { // 將要發送的命令以 RESP 協議寫到寫buf裏 return nil, c.fatal(err) } } if err := c.bw.Flush(); err != nil { // buff flush,發送命令 return nil, c.fatal(err) } if c.readTimeout != 0 { c.conn.SetReadDeadline(time.Now().Add(c.readTimeout)) // 設置寫超時 } if cmd == "" { reply := make([]interface{}, pending) for i := range reply { r, e := c.readReply() if e != nil { return nil, c.fatal(e) } reply[i] = r } return reply, nil } var err error var reply interface{} for i := 0; i <= pending; i++ { var e error if reply, e = c.readReply(); e != nil { // 解析返回值 return nil, c.fatal(e) } if e, ok := reply.(Error); ok && err == nil { err = e } } return reply, err }
發送命令前必須以 RESP 協議序列化,主要用到如下函數,
func (c *conn) writeCommand(cmd string, args []interface{}) (err error) { c.writeLen('*', 1+len(args)) // +1 是將 cmd 加上,將參數個數寫入 buf, 如*3\r\n err = c.writeString(cmd) for _, arg := range args { if err != nil { break } switch arg := arg.(type) { case string: err = c.writeString(arg) case []byte: err = c.writeBytes(arg) case int: err = c.writeInt64(int64(arg)) case int64: err = c.writeInt64(arg) case float64: err = c.writeFloat64(arg) case bool: if arg { err = c.writeString("1") } else { err = c.writeString("0") } case nil: err = c.writeString("") default: var buf bytes.Buffer fmt.Fprint(&buf, arg) err = c.writeBytes(buf.Bytes()) } } return err }
// 用來寫參數長度和參數個數,經過前綴傳入 * 仍是 $ 決定,如 *3\r\n 或者 $3\r\n func (c *conn) writeLen(prefix byte, n int) error { c.lenScratch[len(c.lenScratch)-1] = '\n' c.lenScratch[len(c.lenScratch)-2] = '\r' i := len(c.lenScratch) - 3 for { c.lenScratch[i] = byte('0' + n%10) i -= 1 n = n / 10 if n == 0 { break } } c.lenScratch[i] = prefix _, err := c.bw.Write(c.lenScratch[i:]) return err }
循環複用 lenScratch
數組,是個好的設計,不會產生不少小的字符串。
拼接完了參數個數部分,在再拼接參數部分,項目中實現了一系列writexxx
函數,對不一樣的類型有不一樣的拼接方式,以 string 類型爲例,
// 用來拼接每一個參數,好比 GET,寫成 $3\r\nGET\r\n func (c *conn) writeString(s string) error { c.writeLen('$', len(s)) c.bw.WriteString(s) _, err := c.bw.WriteString("\r\n") return err }
按照 RESP 協議的格式將命令拼接完之後須要發出去,經過 bufio
的 Flush
完成。
另外,redigo 還支持 pipeline 的返回方式發送請求,使用到的函數是 Send
和 Flush
。在 Send
中只是把命令寫到 bufio 的 buff 裏了,Flush
纔會發到對端。
發送命令成功後, redis server 那邊處理完請求後,一樣以 RESP 的格式回覆。
解析函數是 readReply
,對照着 RESP 協議看下就行了,仍是很簡單的。
multi bulk reply 能夠反覆調用 bulk reply 解析函數去遞歸完成解析。
使用完畢鏈接之後,須要手動 close 掉,以下,
func (c *conn) Close() error { c.mu.Lock() err := c.err if c.err == nil { c.err = errors.New("redigo: closed") err = c.conn.Close() } c.mu.Unlock() return err }
不少人在用 redigo 的時候會使用其鏈接池,由於使用該 sdk 時間較長,發現了 pool 的實現有兩個版本。
主要數據結構爲 pool
,即
type Pool struct { // Dial is an application supplied function for creating and configuring a // connection. // // The connection returned from Dial must not be in a special state // (subscribed to pubsub channel, transaction started, ...). Dial func() (Conn, error) // TestOnBorrow is an optional application supplied function for checking // the health of an idle connection before the connection is used again by // the application. Argument t is the time that the connection was returned // to the pool. If the function returns an error, then the connection is // closed. // 檢測鏈接的可用性,從外部注入。若是返回 error 則直接關閉鏈接 TestOnBorrow func(c Conn, t time.Time) error // Maximum number of idle connections in the pool. // 最大閒置鏈接數量 MaxIdle int // Maximum number of connections allocated by the pool at a given time. // When zero, there is no limit on the number of connections in the pool. // 最大活動鏈接數,若是爲 0,則表示沒有限制 MaxActive int // Close connections after remaining idle for this duration. If the value // is zero, then idle connections are not closed. Applications should set // the timeout to a value less than the server's timeout. // 閒置過時時間,在get函數中會有邏輯刪除過時的鏈接 // 若是不設置,鏈接就不會過時 IdleTimeout time.Duration // If Wait is true and the pool is at the MaxActive limit, then Get() waits // for a connection to be returned to the pool before returning. // 設置若是活動鏈接達到上限 再獲取時候是等待仍是返回錯誤 // 若是是 false 系統會返回redigo: connection pool exhausted // 若是是 true 會讓協程等待直到有鏈接釋放出來 Wait bool // mu protects fields defined below.(主要是與狀態相關) mu sync.Mutex cond *sync.Cond closed bool active int // Stack of idleConn with most recently used at the front. idle list.List }
該版本中使用了條件變量 Cond
來協調多協程獲取鏈接池中的鏈接idle
使用的是 go 標準庫 container 中的 list 數據結構,其中存放的是池中的鏈接,每一個鏈接的數據結構以下,
type idleConn struct { c Conn t time.Time }
pooledConnection
結構實現了 Conn
接口的全部方法。
type pooledConnection struct { p *Pool // pool c Conn // 當前鏈接 state int }
func (p *Pool) Get() Conn { c, err := p.get() if err != nil { return errorConnection{err} } return &pooledConnection{p: p, c: c} }
當從鏈接池獲取不到時就建立一個鏈接,因此仍是重點看如何從鏈接池獲取一個鏈接。
func (p *Pool) get() (Conn, error) { p.mu.Lock() // Prune stale connections.(將過時鏈接的清理放到每次的 get 中) // 若是 idletime 沒有設置,鏈接就不會過時,所以也就沒必要清理 if timeout := p.IdleTimeout; timeout > 0 { for i, n := 0, p.idle.Len(); i < n; i++ { e := p.idle.Back() // 取出最後一個鏈接 if e == nil { break } ic := e.Value.(idleConn) if ic.t.Add(timeout).After(nowFunc()) { // 沒有過時,馬上終止檢查 break } p.idle.Remove(e) p.release() // 須要操做 active 變量 p.mu.Unlock() ic.c.Close() // 關閉鏈接 p.mu.Lock() } } for { // Get idle connection. for i, n := 0, p.idle.Len(); i < n; i++ { e := p.idle.Front() // 從最前面取一個鏈接 if e == nil { // idle 裏是空的,先退出循環吧 break } ic := e.Value.(idleConn) p.idle.Remove(e) test := p.TestOnBorrow p.mu.Unlock() if test == nil || test(ic.c, ic.t) == nil { // 返回這個鏈接 return ic.c, nil } ic.c.Close() // 取出來的鏈接不可用 p.mu.Lock() p.release() } // Check for pool closed before dialing a new connection. if p.closed { p.mu.Unlock() return nil, errors.New("redigo: get on closed pool") } // Dial new connection if under limit. if p.MaxActive == 0 || p.active < p.MaxActive { dial := p.Dial p.active += 1 p.mu.Unlock() c, err := dial() if err != nil { p.mu.Lock() p.release() p.mu.Unlock() c = nil } return c, err } // 到達鏈接池最大鏈接數了,要不要等呢? if !p.Wait { // 不wait的話就直接返回鏈接池資源耗盡的錯誤 p.mu.Unlock() return nil, ErrPoolExhausted } if p.cond == nil { p.cond = sync.NewCond(&p.mu) } p.cond.Wait() // wait 等待 release 和 put 後有新的鏈接可用 } }
當有設置 IdleTimeout 時,那麼到了每次 get
鏈接的時候都會從隊尾拿一個鏈接,檢查時間是否過時,若是過時,那麼把它刪掉,而後 release
,這個操做一直持久直至找到一個沒有過時的鏈接。
而後從隊首拿一個鏈接,拿到後檢查可用後返回,不可用的鏈接處理方式同上面的過時鏈接。
若是這個 pool 的狀態已是 close 了,那麼直接返回。把這個檢查放在這裏,使 closed pool 仍然能夠清理一些過時鏈接,減小內存佔用。
若是 pool 沒有設置 MaxActive,或者當前 pool 中的 active 沒到閾值,那麼可使用 dial
函數建立一個新鏈接,active 值加 1。
若是邏輯走到這裏尚未取到鏈接,說明如今 pool 裏的鏈接都被用了,若是不想 wait
,那麼直接返回 pool 資源耗盡的錯誤(ErrPoolExhausted
),不然使用 pool 的條件變量 cond
進行Wait
。咱們都知道在 Wait
中 會先解鎖,而後陷入阻塞等待喚醒。
cond
喚醒在 release
函數和put
函數中,以下,
// release decrements the active count and signals waiters. The caller must // hold p.mu during the call. func (p *Pool) release() { p.active -= 1 if p.cond != nil { p.cond.Signal() // 通知 wait 的請求返回鏈接 } }
用完鏈接後要還回去,在調用鏈接的 Close
函數中會使用 put
。
func (p *Pool) put(c Conn, forceClose bool) error { err := c.Err() p.mu.Lock() if !p.closed && err == nil && !forceClose { p.idle.PushFront(idleConn{t: nowFunc(), c: c}) // 放回頭部 if p.idle.Len() > p.MaxIdle { c = p.idle.Remove(p.idle.Back()).(idleConn).c // 若是鏈接池中數量超過了 maxidle,那麼從後面刪除一個 } else { c = nil } } if c == nil { if p.cond != nil { p.cond.Signal() // 通知 } p.mu.Unlock() return nil } p.release() p.mu.Unlock() return c.Close() }
將沒有出錯的鏈接而且不是彆強制關閉的鏈接放回到 idle list 中,注意,這裏是放到隊頭!若是 list 長度大於最大閒置鏈接數(MaxIdle),那麼從隊尾取鏈接 remove
掉。
Signal
喚醒條件變量。
在版本的 pool 裏,本身實現了一個 list,取代 golang 的官方庫 list。
type idleList struct { // 只記錄頭尾 count int // list 長度 front, back *poolConn } type poolConn struct { // 雙鏈表節點 c Conn t time.Time created time.Time next, prev *poolConn }
同時實現了幾個雙鏈表的操做,pushFront
、popFront
和 popBack
。
新版本的 pool 裏去掉了條件變量,換上了 channel。
chInitialized uint32 // set to 1 when field ch is initialized ch chan struct{} // limits open connections when p.Wait is true idle idleList // idle connections waitCount int64 // total number of connections waited for. waitDuration time.Duration // total time waited for new connections.
pool 裏的鏈接個數使用了buffer channel 進行控制,大小爲 MaxActive
。
在第一次從 pool 中獲取鏈接時,進行 channel 來初始化,即
func (p *Pool) lazyInit() { // Fast path. if atomic.LoadUint32(&p.chInitialized) == 1 { return } // Slow path. p.mu.Lock() if p.chInitialized == 0 { p.ch = make(chan struct{}, p.MaxActive) if p.closed { close(p.ch) } else { for i := 0; i < p.MaxActive; i++ { p.ch <- struct{}{} } } atomic.StoreUint32(&p.chInitialized, 1) } p.mu.Unlock() }
func (p *Pool) get(ctx context.Context) (*poolConn, error) { // Handle limit for p.Wait == true. var waited time.Duration if p.Wait && p.MaxActive > 0 { p.lazyInit() // wait indicates if we believe it will block so its not 100% accurate // however for stats it should be good enough. wait := len(p.ch) == 0 var start time.Time if wait { start = time.Now() } if ctx == nil { <-p.ch } else { select { case <-p.ch: case <-ctx.Done(): return nil, ctx.Err() } } if wait { waited = time.Since(start) } } p.mu.Lock() if waited > 0 { p.waitCount++ p.waitDuration += waited } // Prune stale connections at the back of the idle list. if p.IdleTimeout > 0 { n := p.idle.count // 清理過時的 conn for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ { pc := p.idle.back p.idle.popBack() p.mu.Unlock() pc.c.Close() p.mu.Lock() p.active-- } } // Get idle connection from the front of idle list. for p.idle.front != nil { pc := p.idle.front p.idle.popFront() // 從前面獲取一個鏈接 p.mu.Unlock() if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) && (p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) { return pc, nil } pc.c.Close() p.mu.Lock() p.active-- } // Check for pool closed before dialing a new connection. if p.closed { p.mu.Unlock() return nil, errors.New("redigo: get on closed pool") } // Handle limit for p.Wait == false. if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive { p.mu.Unlock() return nil, ErrPoolExhausted } // 新建鏈接,更新 active p.active++ p.mu.Unlock() c, err := p.dial(ctx) if err != nil { c = nil p.mu.Lock() p.active-- if p.ch != nil && !p.closed { p.ch <- struct{}{} // 鏈接建立不成功,將這個名額還給 channel } p.mu.Unlock() } return &poolConn{c: c, created: nowFunc()}, err }
能夠看到只有在鏈接池滿了願意等待時,纔回初始化 buffer channel,即調用 lazyInit
函數,省去了沒必要要的內存佔用,能夠借鑑。
當鏈接池已滿,則 channel 爲空,此時取鏈接的流程會阻塞在 <-p.ch
,這跟上一版本的 cond.Wait()
有相同的做用。
有相同的清理過時鏈接的邏輯,以及鏈接建立邏輯。
func (p *Pool) put(pc *poolConn, forceClose bool) error { p.mu.Lock() if !p.closed && !forceClose { pc.t = nowFunc() p.idle.pushFront(pc) // 訪問頭部 if p.idle.count > p.MaxIdle { // 超出了 MaxIdle 的數量的話,從後面踢掉最後面的一個 pc = p.idle.back p.idle.popBack() } else { pc = nil } } if pc != nil { p.mu.Unlock() pc.c.Close() p.mu.Lock() p.active-- } if p.ch != nil && !p.closed { p.ch <- struct{}{} // 放回池子 } p.mu.Unlock() return nil }
ch
控制着 pool 中鏈接的數量,當取走一個時,須要 <-ch
,當還回一個時,須要 ch <- struct{}{}
。
另外,還要考慮到某些失敗的狀況,是否須要將配額還回 ch
。
從上面的代碼能夠看出,無論哪一個版本的 pool,得到鏈接是從隊首獲取,還鏈接也是從隊首還,淘汰過時鏈接或者多出的鏈接是從隊尾淘汰。
另外,新版本的 pool 實現比老版本更加符合 golang 的語言風格。
從某種角度講,這種 pool 的管理方式會形成某些鏈接過熱的狀況,即負載均衡不均,尤爲是過時時間設置不合理的狀況下,需慎重使用。