Redigo源碼分析

使用 golang 開發項目時常常會使用到 redis 服務,這時就須要一個趁手的 sdk,因此就在 github 中找了一個 star 較多的項目,這就是本篇的主角 redigo,同時這也是redis 的官方推薦git

不過在使用過程當中遇到了一些小問題,所以就去了解了一下源碼,如下做爲一個筆記。github

redigo 項目代碼量較少,且註釋明確,適合閱讀學習。
redigo 主要完成了如下功能:golang

  • 與 redis server 創建鏈接
  • 按照 RESP 協議進行命令組裝
  • 向 Redis server 發送組裝好的命令
  • 接收 Redis server 返回的數據
  • 將返回數據解析成 go 的數據類型
  • 提供鏈接池的使用方式

1. 代碼結構

redis
├── conn.go  // 實現 redis.go 中定義的接口,完成以上主要功能
├── conn_test.go
├── doc.go
├── go17.go
├── log.go
├── pool.go // pool 相關代碼
├── pool_test.go
├── pre_go17.go
├── pubsub.go
├── pubsub_test.go
├── redis.go // 定義接口
├── reply.go // 返回數據的類型轉換
├── reply_test.go
├── scan.go
├── scan_test.go
├── script.go // lua 腳本相關代碼
├── script_test.go
├── test_test.go
└── zpop_example_test.go

項目主體主要有以上代碼組成。redis

2. 建立鏈接

代碼位於文件 conn.go,要建立的鏈接是一個自定義的數據結構 conn,以下,shell

// conn is the low-level implementation of Conn
type conn struct {
    // Shared
    mu      sync.Mutex
    pending int // 命令計數
    err     error
    conn    net.Conn

    // Read
    readTimeout time.Duration
    br          *bufio.Reader

    // Write
    writeTimeout time.Duration
    bw           *bufio.Writer

    // Scratch space for formatting argument length.
    // '*' or '$', length, "\r\n"
    lenScratch [32]byte

    // Scratch space for formatting integers and floats.
    numScratch [40]byte
}

建立鏈接所須要的參數統一封裝到結構體 dialOptions 中,以下,數組

type dialOptions struct {
    readTimeout  time.Duration
    writeTimeout time.Duration
    dial         func(network, addr string) (net.Conn, error)
    db           int
    password     string
    dialTLS      bool
    skipVerify   bool
    tlsConfig    *tls.Config
}

其中包含各類超時設置,建立鏈接使用的函數,以及 TLS 等。
參數設置則封裝了一系列 Dialxxxx 函數,如 DialWriteTimeout,數據結構

// DialWriteTimeout specifies the timeout for writing a single command.
func DialWriteTimeout(d time.Duration) DialOption {
    return DialOption{func(do *dialOptions) {
      do.writeTimeout = d
    }}
}

同時須要結合以下結構體完成,app

type DialOption struct {
      f func(*dialOptions)    
}

建立鏈接時使用的是 Dial 函數負載均衡

// Dial connects to the Redis server at the given network and
// address using the specified options.
func Dial(network, address string, options ...DialOption) (Conn, error) {
    do := dialOptions{
      dial: net.Dial,
    }
    for _, option := range options { // 設置
      option.f(&do)
    }

    netConn, err := do.dial(network, address)
    if err != nil {
      return nil, err
    }

    // TLS 相關
    // ...

    c := &conn{
      conn:         netConn,
      bw:           bufio.NewWriter(netConn),
      br:           bufio.NewReader(netConn),
      readTimeout:  do.readTimeout,
      writeTimeout: do.writeTimeout,
    }

    if do.password != "" {
      if _, err := c.Do("AUTH", do.password); err != nil {
        netConn.Close()
        return nil, err
      }
    }

    if do.db != 0 {
      if _, err := c.Do("SELECT", do.db); err != nil {
        netConn.Close()
        return nil, err
      }
    }
    return c, nil
}

還有一個相似的 DialURL 函數就不分析了。less

3. 請求與接收

非 pipeline 的形式,都是經過 Do 函數去觸發這個流程的。

func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
    c.mu.Lock() // 須要更新 pending 變量,加鎖串行
    pending := c.pending 
    c.pending = 0
    c.mu.Unlock()

    if cmd == "" && pending == 0 {
      return nil, nil
    }

    if c.writeTimeout != 0 {
      c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout)) // 設置寫超時
    }

    if cmd != "" {
      if err := c.writeCommand(cmd, args); err != nil { // 將要發送的命令以 RESP 協議寫到寫buf裏
        return nil, c.fatal(err)
      }
    }

    if err := c.bw.Flush(); err != nil { // buff flush,發送命令
      return nil, c.fatal(err)
    }

    if c.readTimeout != 0 {
      c.conn.SetReadDeadline(time.Now().Add(c.readTimeout)) // 設置寫超時
    }

    if cmd == "" {
      reply := make([]interface{}, pending)
      for i := range reply {
        r, e := c.readReply()
        if e != nil {
          return nil, c.fatal(e)
        }
        reply[i] = r
      }
      return reply, nil
    }

    var err error
    var reply interface{}
    for i := 0; i <= pending; i++ {
      var e error
      if reply, e = c.readReply(); e != nil { // 解析返回值
        return nil, c.fatal(e)
      }
      if e, ok := reply.(Error); ok && err == nil {
        err = e
      }
    }
    return reply, err
}

3.1 發送命令

發送命令前必須以 RESP 協議序列化,主要用到如下函數,

func (c *conn) writeCommand(cmd string, args []interface{}) (err error) {
    c.writeLen('*', 1+len(args)) // +1 是將 cmd 加上,將參數個數寫入 buf, 如*3\r\n
    err = c.writeString(cmd)
    for _, arg := range args {
      if err != nil {
        break
      }
      switch arg := arg.(type) {
      case string:
        err = c.writeString(arg)
      case []byte:
        err = c.writeBytes(arg)
      case int:
        err = c.writeInt64(int64(arg))
      case int64:
        err = c.writeInt64(arg)
      case float64:
        err = c.writeFloat64(arg)
      case bool:
        if arg {
          err = c.writeString("1")
        } else {
          err = c.writeString("0")
        }
      case nil:
        err = c.writeString("")
      default:
        var buf bytes.Buffer
        fmt.Fprint(&buf, arg)
        err = c.writeBytes(buf.Bytes())
      }
    }
    return err
}
// 用來寫參數長度和參數個數,經過前綴傳入 * 仍是 $ 決定,如 *3\r\n 或者 $3\r\n
func (c *conn) writeLen(prefix byte, n int) error {
    c.lenScratch[len(c.lenScratch)-1] = '\n'
    c.lenScratch[len(c.lenScratch)-2] = '\r'
    i := len(c.lenScratch) - 3
    for {
      c.lenScratch[i] = byte('0' + n%10)
      i -= 1
      n = n / 10
      if n == 0 {
        break
      }
    }
    c.lenScratch[i] = prefix
    _, err := c.bw.Write(c.lenScratch[i:])
    return err
}

循環複用 lenScratch 數組,是個好的設計,不會產生不少小的字符串。

拼接完了參數個數部分,在再拼接參數部分,項目中實現了一系列writexxx 函數,對不一樣的類型有不一樣的拼接方式,以 string 類型爲例,

// 用來拼接每一個參數,好比 GET,寫成 $3\r\nGET\r\n
func (c *conn) writeString(s string) error {
    c.writeLen('$', len(s))
    c.bw.WriteString(s)
    _, err := c.bw.WriteString("\r\n")
    return err
}

按照 RESP 協議的格式將命令拼接完之後須要發出去,經過 bufioFlush 完成。
另外,redigo 還支持 pipeline 的返回方式發送請求,使用到的函數是 SendFlush。在 Send中只是把命令寫到 bufio 的 buff 裏了,Flush 纔會發到對端。

3.2 響應解析

發送命令成功後, redis server 那邊處理完請求後,一樣以 RESP 的格式回覆。
解析函數是 readReply,對照着 RESP 協議看下就行了,仍是很簡單的。
multi bulk reply 能夠反覆調用 bulk reply 解析函數去遞歸完成解析。

3.3 關閉鏈接

使用完畢鏈接之後,須要手動 close 掉,以下,

func (c *conn) Close() error {
    c.mu.Lock()
    err := c.err
    if c.err == nil {
      c.err = errors.New("redigo: closed")
      err = c.conn.Close()
    }
    c.mu.Unlock()
    return err
}

4. pool 的分析

不少人在用 redigo 的時候會使用其鏈接池,由於使用該 sdk 時間較長,發現了 pool 的實現有兩個版本。

4.1 老版本 pool

主要數據結構爲 pool,即

type Pool struct {

    // Dial is an application supplied function for creating and configuring a
    // connection.
    //
    // The connection returned from Dial must not be in a special state
    // (subscribed to pubsub channel, transaction started, ...).
    Dial func() (Conn, error)

    // TestOnBorrow is an optional application supplied function for checking
    // the health of an idle connection before the connection is used again by
    // the application. Argument t is the time that the connection was returned
    // to the pool. If the function returns an error, then the connection is
    // closed.
    // 檢測鏈接的可用性,從外部注入。若是返回 error 則直接關閉鏈接
    TestOnBorrow func(c Conn, t time.Time) error

    // Maximum number of idle connections in the pool.
    // 最大閒置鏈接數量
    MaxIdle int

    // Maximum number of connections allocated by the pool at a given time.
    // When zero, there is no limit on the number of connections in the pool.
    // 最大活動鏈接數,若是爲 0,則表示沒有限制
    MaxActive int

    // Close connections after remaining idle for this duration. If the value
    // is zero, then idle connections are not closed. Applications should set
    // the timeout to a value less than the server's timeout.
    // 閒置過時時間,在get函數中會有邏輯刪除過時的鏈接
    // 若是不設置,鏈接就不會過時
    IdleTimeout time.Duration

    // If Wait is true and the pool is at the MaxActive limit, then Get() waits
    // for a connection to be returned to the pool before returning.
    // 設置若是活動鏈接達到上限 再獲取時候是等待仍是返回錯誤
    // 若是是 false 系統會返回redigo: connection pool exhausted
    // 若是是 true 會讓協程等待直到有鏈接釋放出來
    Wait bool

    // mu protects fields defined below.(主要是與狀態相關)
    mu     sync.Mutex
    cond   *sync.Cond
    closed bool
    active int

    // Stack of idleConn with most recently used at the front.
    idle list.List
}

該版本中使用了條件變量 Cond來協調多協程獲取鏈接池中的鏈接
idle 使用的是 go 標準庫 container 中的 list 數據結構,其中存放的是池中的鏈接,每一個鏈接的數據結構以下,

type idleConn struct {
    c Conn
    t time.Time
}

pooledConnection 結構實現了 Conn 接口的全部方法。

type pooledConnection struct {
    p     *Pool // pool
    c     Conn  // 當前鏈接
    state int
}

4.1.1 從 pool 獲取鏈接

func (p *Pool) Get() Conn {
    c, err := p.get()
    if err != nil {
      return errorConnection{err}
    }
    return &pooledConnection{p: p, c: c}
}

當從鏈接池獲取不到時就建立一個鏈接,因此仍是重點看如何從鏈接池獲取一個鏈接。

func (p *Pool) get() (Conn, error) {
    p.mu.Lock()

    // Prune stale connections.(將過時鏈接的清理放到每次的 get 中)
    // 若是 idletime 沒有設置,鏈接就不會過時,所以也就沒必要清理
    if timeout := p.IdleTimeout; timeout > 0 { 
      for i, n := 0, p.idle.Len(); i < n; i++ {
        e := p.idle.Back() // 取出最後一個鏈接
        if e == nil {
          break
        }
        ic := e.Value.(idleConn)
        if ic.t.Add(timeout).After(nowFunc()) { // 沒有過時,馬上終止檢查
          break
        }
        p.idle.Remove(e)
        p.release() // 須要操做 active 變量
        p.mu.Unlock()
        ic.c.Close() // 關閉鏈接
        p.mu.Lock()
      }
    }

    for {
      // Get idle connection.
        for i, n := 0, p.idle.Len(); i < n; i++ {
          e := p.idle.Front() // 從最前面取一個鏈接
          if e == nil {       // idle 裏是空的,先退出循環吧
            break
          }
          ic := e.Value.(idleConn)
          p.idle.Remove(e)
          test := p.TestOnBorrow
          p.mu.Unlock()
          if test == nil || test(ic.c, ic.t) == nil { // 返回這個鏈接
            return ic.c, nil
          }
          ic.c.Close() // 取出來的鏈接不可用
          p.mu.Lock()
          p.release()
        }

        // Check for pool closed before dialing a new connection.

        if p.closed {
          p.mu.Unlock()
          return nil, errors.New("redigo: get on closed pool")
        }

        // Dial new connection if under limit.
        if p.MaxActive == 0 || p.active < p.MaxActive {
          dial := p.Dial
          p.active += 1
          p.mu.Unlock()
          c, err := dial()
          if err != nil {
            p.mu.Lock()
            p.release()
            p.mu.Unlock()
            c = nil
          }
          return c, err
        }

        // 到達鏈接池最大鏈接數了,要不要等呢?
        if !p.Wait { // 不wait的話就直接返回鏈接池資源耗盡的錯誤
          p.mu.Unlock()
          return nil, ErrPoolExhausted
        }

        if p.cond == nil {
          p.cond = sync.NewCond(&p.mu)
        }
        p.cond.Wait() // wait 等待 release 和 put 後有新的鏈接可用
      }
}

當有設置 IdleTimeout 時,那麼到了每次 get 鏈接的時候都會從隊尾拿一個鏈接,檢查時間是否過時,若是過時,那麼把它刪掉,而後 release,這個操做一直持久直至找到一個沒有過時的鏈接。

而後從隊首拿一個鏈接,拿到後檢查可用後返回,不可用的鏈接處理方式同上面的過時鏈接。

若是這個 pool 的狀態已是 close 了,那麼直接返回。把這個檢查放在這裏,使 closed pool 仍然能夠清理一些過時鏈接,減小內存佔用。

若是 pool 沒有設置 MaxActive,或者當前 pool 中的 active 沒到閾值,那麼可使用 dial函數建立一個新鏈接,active 值加 1。

若是邏輯走到這裏尚未取到鏈接,說明如今 pool 裏的鏈接都被用了,若是不想 wait,那麼直接返回 pool 資源耗盡的錯誤(ErrPoolExhausted),不然使用 pool 的條件變量 cond進行Wait。咱們都知道在 Wait中 會先解鎖,而後陷入阻塞等待喚醒。

cond喚醒在 release 函數和put函數中,以下,

// release decrements the active count and signals waiters. The caller must
// hold p.mu during the call.
func (p *Pool) release() {
    p.active -= 1
    if p.cond != nil {
      p.cond.Signal() // 通知 wait 的請求返回鏈接
    }
}

4.1.2 向 pool return 鏈接

用完鏈接後要還回去,在調用鏈接的 Close 函數中會使用 put

func (p *Pool) put(c Conn, forceClose bool) error {
    err := c.Err()
    p.mu.Lock()
    if !p.closed && err == nil && !forceClose {
      p.idle.PushFront(idleConn{t: nowFunc(), c: c}) // 放回頭部
      if p.idle.Len() > p.MaxIdle {
        c = p.idle.Remove(p.idle.Back()).(idleConn).c // 若是鏈接池中數量超過了 maxidle,那麼從後面刪除一個
      } else {
        c = nil
      }
    }

    if c == nil {
      if p.cond != nil {
        p.cond.Signal() // 通知
      }
      p.mu.Unlock()
      return nil
    }

    p.release()
    p.mu.Unlock()
    return c.Close()
}

將沒有出錯的鏈接而且不是彆強制關閉的鏈接放回到 idle list 中,注意,這裏是放到隊頭!若是 list 長度大於最大閒置鏈接數(MaxIdle),那麼從隊尾取鏈接 remove掉。

Signal 喚醒條件變量。

4.2 新版本 pool

在版本的 pool 裏,本身實現了一個 list,取代 golang 的官方庫 list。

type idleList struct { // 只記錄頭尾
    count       int // list 長度
    front, back *poolConn
}

type poolConn struct { // 雙鏈表節點
    c          Conn
    t          time.Time
    created    time.Time
    next, prev *poolConn
}

同時實現了幾個雙鏈表的操做,pushFrontpopFrontpopBack
新版本的 pool 裏去掉了條件變量,換上了 channel。

chInitialized uint32 // set to 1 when field ch is initialized
ch           chan struct{} // limits open connections when p.Wait is true
idle         idleList      // idle connections
waitCount    int64         // total number of connections waited for.
waitDuration time.Duration // total time waited for new connections.

pool 裏的鏈接個數使用了buffer channel 進行控制,大小爲 MaxActive
在第一次從 pool 中獲取鏈接時,進行 channel 來初始化,即

func (p *Pool) lazyInit() {
    // Fast path.
    if atomic.LoadUint32(&p.chInitialized) == 1 {
      return
    }
    // Slow path.
    p.mu.Lock()
    if p.chInitialized == 0 {
      p.ch = make(chan struct{}, p.MaxActive)
      if p.closed {
        close(p.ch)
      } else {
        for i := 0; i < p.MaxActive; i++ {
          p.ch <- struct{}{}
        }
      }
      atomic.StoreUint32(&p.chInitialized, 1)
    }
    p.mu.Unlock()
}

4.2.1 從 pool 獲取鏈接

func (p *Pool) get(ctx context.Context) (*poolConn, error) {

    // Handle limit for p.Wait == true.
    var waited time.Duration
    if p.Wait && p.MaxActive > 0 {
      p.lazyInit()

      // wait indicates if we believe it will block so its not 100% accurate
      // however for stats it should be good enough.
      wait := len(p.ch) == 0
      var start time.Time
      if wait {
        start = time.Now()
      }
      if ctx == nil {
        <-p.ch
      } else {
        select {
        case <-p.ch:
        case <-ctx.Done():
          return nil, ctx.Err()
        }
      }
      if wait {
        waited = time.Since(start)
      }
    }

    p.mu.Lock()

    if waited > 0 {
      p.waitCount++
      p.waitDuration += waited
    }

    // Prune stale connections at the back of the idle list.
    if p.IdleTimeout > 0 {
      n := p.idle.count
      // 清理過時的 conn
      for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
        pc := p.idle.back
        p.idle.popBack()
        p.mu.Unlock()
        pc.c.Close()
        p.mu.Lock()
        p.active--
      }
    }

    // Get idle connection from the front of idle list.
    for p.idle.front != nil {
      pc := p.idle.front
      p.idle.popFront() // 從前面獲取一個鏈接
      p.mu.Unlock()
      if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
        (p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
        return pc, nil
      }
      pc.c.Close()
      p.mu.Lock()
      p.active--
    }

    // Check for pool closed before dialing a new connection.
    if p.closed {
      p.mu.Unlock()
      return nil, errors.New("redigo: get on closed pool")
    }

    // Handle limit for p.Wait == false.
    if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {
      p.mu.Unlock()
      return nil, ErrPoolExhausted
    }

   // 新建鏈接,更新 active
    p.active++
    p.mu.Unlock()
    c, err := p.dial(ctx)
    if err != nil {
      c = nil
      p.mu.Lock()
      p.active--
      if p.ch != nil && !p.closed {
        p.ch <- struct{}{} // 鏈接建立不成功,將這個名額還給 channel
      }
      p.mu.Unlock()
    }
    return &poolConn{c: c, created: nowFunc()}, err
}

能夠看到只有在鏈接池滿了願意等待時,纔回初始化 buffer channel,即調用 lazyInit 函數,省去了沒必要要的內存佔用,能夠借鑑。

當鏈接池已滿,則 channel 爲空,此時取鏈接的流程會阻塞在 <-p.ch,這跟上一版本的 cond.Wait() 有相同的做用。

有相同的清理過時鏈接的邏輯,以及鏈接建立邏輯。

4.2.2 從 pool 獲取鏈接

func (p *Pool) put(pc *poolConn, forceClose bool) error {
    p.mu.Lock()
    if !p.closed && !forceClose {
      pc.t = nowFunc()
      p.idle.pushFront(pc)          // 訪問頭部
      if p.idle.count > p.MaxIdle { // 超出了 MaxIdle 的數量的話,從後面踢掉最後面的一個
        pc = p.idle.back
        p.idle.popBack()
      } else {
        pc = nil
      }
    }

    if pc != nil {
      p.mu.Unlock()
      pc.c.Close()
      p.mu.Lock()
      p.active--
    }

    if p.ch != nil && !p.closed {
      p.ch <- struct{}{} // 放回池子
    }
    p.mu.Unlock()
    return nil
}

ch 控制着 pool 中鏈接的數量,當取走一個時,須要 <-ch,當還回一個時,須要 ch <- struct{}{}
另外,還要考慮到某些失敗的狀況,是否須要將配額還回 ch

4.3 分析

從上面的代碼能夠看出,無論哪一個版本的 pool,得到鏈接是從隊首獲取,還鏈接也是從隊首還,淘汰過時鏈接或者多出的鏈接是從隊尾淘汰。
另外,新版本的 pool 實現比老版本更加符合 golang 的語言風格。
從某種角度講,這種 pool 的管理方式會形成某些鏈接過熱的狀況,即負載均衡不均,尤爲是過時時間設置不合理的狀況下,需慎重使用。

相關文章
相關標籤/搜索