golang 數據庫鏈接池database/sql 實現原理分析

golang對數據庫的請求,抽象出來一套通用的鏈接池,用go的機制來講,golang只須要提供一個驅動(driver)的interface,底層不一樣數據庫協議,由用戶根據本身的數據庫實現對應的驅動便可。css

本文從源碼實現的角度,探索這裏的細節以及須要避免的坑,基於1.14代碼分析,部分bug在1.15中有修復或優化,這裏也會說起。mysql

golang版本:1.14git

目錄結構說明

└── sql
    ├── convert.go           # 結果行的讀取與轉換
    ├── convert_test.go
    ├── ctxutil.go           # 綁定上下文的一些通用方法
    ├── doc.txt
    ├── driver               # driver 定義來實現數據庫驅動所須要的接口
    │   ├── driver.go
    │   ├── types.go         # 數據類型別名和轉換
    │   └── types_test.go
    ├── example_cli_test.go
    ├── example_service_test.go
    ├── example_test.go
    ├── fakedb_test.go
    ├── sql.go               # 通用的接口和類型,包括事物,鏈接等
    └── sql_test.go

主要數據結構

1. sql.DB

type DB struct {
    // Atomic access only. At top of struct to prevent mis-alignment
    // on 32-bit platforms. Of type time.Duration.
    waitDuration int64          // 等待新的鏈接所須要的總時間
    connector driver.Connector  // 數據庫驅動本身實現
    // numClosed is an atomic counter which represents a total number of
    // closed connections. Stmt.openStmt checks it before cleaning closed
    // connections in Stmt.css.
    numClosed uint64           // 關閉的鏈接數

    mu           sync.Mutex // protects following fields
    freeConn     []*driverConn
    connRequests map[uint64]chan connRequest
    nextRequest  uint64 // Next key to use in connRequests.
    numOpen      int    // number of opened and pending open connections
    // Used to signal the need for new connections
    // a goroutine running connectionOpener() reads on this chan and
    // maybeOpenNewConnections sends on the chan (one send per needed connection)
    // It is closed during db.Close(). The close tells the connectionOpener
    // goroutine to exit.
    openerCh          chan struct{}      // 用於通知須要建立新的鏈接
    // resetterCh        chan *driverConn  // 已廢棄
    closed            bool
    dep               map[finalCloser]depSet // map[一級對象]map[二級對象]bool,一個外部以來,用於自動關閉
    lastPut           map[*driverConn]string // stacktrace of last conn's put; debug only
    maxIdle           int                    // zero means defaultMaxIdleConns(2); negative means 0
    maxOpen           int                    // <= 0 means unlimited
    maxLifetime       time.Duration          // maximum amount of time a connection may be reused
    cleanerCh         chan struct{}          // 用於通知清理過時的鏈接,maxlife時間改變或者鏈接被關閉時會經過該channel通知
    waitCount         int64 // Total number of connections waited for.   // 這些狀態數據,能夠經過db.Stat() 獲取
    maxIdleClosed     int64 // Total number of connections closed due to idle.
    maxLifetimeClosed int64 // Total number of connections closed due to max free limit.

    stop func() // stop cancels the connection opener and the session resetter.
}

sql.DB不是一個鏈接,它是數據庫的抽象接口,也是整個鏈接池的句柄,對多個goroutine是併發安全的。它能夠根據driver打開關閉數據庫鏈接,管理鏈接池。這對不一樣的數據庫來講都是同樣的。github

2. sql.driverConn

// driverConn wraps a driver.Conn with a mutex, to
// be held during all calls into the Conn. (including any calls onto
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
type driverConn struct {
   db        *DB
   createdAt time.Time

   sync.Mutex  // guards following
   ci          driver.Conn  // 由不一樣的驅動本身實現,對應一條具體的數據庫鏈接
   needReset   bool         // The connection session should be reset before use if true.
   closed      bool         // 當前鏈接的狀態,是否已經關閉
   finalClosed bool         // ci.Close has been called
   openStmt    map[*driverStmt]bool

   // guarded by db.mu
   inUse      bool
   onPut      []func() // code (with db.mu held) run when conn is next returned  // 歸還鏈接的時候調用
   dbmuClosed bool     // same as closed, but guarded by db.mu, for removeClosedStmtLocked
}

對單個鏈接的封裝,包含了實際的數據庫鏈接以及相關的狀態信息等golang

3. driver.Conn

// Conn is a connection to a database. It is not used concurrently
// by multiple goroutines.
//
// Conn is assumed to be stateful.
type Conn interface {
   // Prepare returns a prepared statement, bound to this connection.
   Prepare(query string) (Stmt, error)

   // Close invalidates and potentially stops any current
   // prepared statements and transactions, marking this
   // connection as no longer in use.
   //
   // Because the sql package maintains a free pool of
   // connections and only calls Close when there's a surplus of
   // idle connections, it shouldn't be necessary for drivers to
   // do their own connection caching.
   Close() error

   // Begin starts and returns a new transaction.
   //
   // Deprecated: Drivers should implement ConnBeginTx instead (or additionally).
   Begin() (Tx, error)
}

一條具體的數據庫鏈接,須要由不一樣驅動本身去實現接口sql

4. driver.Driver

type Driver interface {
    Open(name string) (Conn, error)
}

Driver 只包含一個函數,Open()用來返回一個可用鏈接,多是新創建的,也多是以前緩存的關閉的鏈接。數據庫

5. driver.DriverContext

type DriverContext interface {
// OpenConnector must parse the name in the same format that Driver.Open
// parses the name parameter.
    OpenConnector(name string) (Connector, error)
}

DriverContext 的目的是維護drievr上下文信息,避免了每次新建鏈接的時候都須要解析一遍 dsn。須要有Driver對象本身去實現。緩存

6. driver.Connector

type Connector interface {
// Connect returns a connection to the database.
// Connect may return a cached connection (one previously
// closed), but doing so is unnecessary; the sql package
// maintains a pool of idle connections for efficient re-use.
//
// The provided context.Context is for dialing purposes only
// (see net.DialContext) and should not be stored or used for
// other purposes.
//
// The returned connection is only used by one goroutine at a
// time.
    Connect(context.Context) (Conn, error)
// Driver returns the underlying Driver of the Connector,
// mainly to maintain compatibility with the Driver method
// on sql.DB.
    Driver() Driver
}

driver.Connector 是driver的插口,是一個接口類型的對象,由不一樣類型的數據庫來實現。
driver.Connector 包含兩個函數。安全

  • Connect 用來創建鏈接
  • Driver 用來返回一個 Driver 對象,Driver也是個接口類型對象,須要不一樣的數據庫本身去實現。

主要操做流程

1. 註冊驅動

import (
    _ "github.com/go-sql-driver/mysql"
)

var (
    driversMu sync.RWMutex
    drivers   = make(map[string]driver.Driver)
)
func Register(name string, driver driver.Driver) {
    driversMu.Lock()
    defer driversMu.Unlock()
    if driver == nil {
        panic("sql: Register driver is nil")
    }
    if _, dup := drivers[name]; dup {
        panic("sql: Register called twice for driver " + name)
    }
    drivers[name] = driver
}

/database/sql 提供的是一個通用的數據庫鏈接池,當咱們鏈接不一樣的數據庫時,只須要將對應的數據庫驅動註冊進去就可使用。session

這裏的註冊,實際上就是將數據庫名稱和對應的數據庫驅動(數據庫鏈接包裝器)添加的一個map中,每一個import進來的庫,須要在init函數中調用註冊函數來實現。

2. 建立鏈接池句柄 sql.Open()

func Open(driverName, dataSourceName string) (*DB, error) {
    driversMu.RLock()
    driveri, ok := drivers[driverName]  // 1
    driversMu.RUnlock()
    if !ok {
        return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
    }
    if driverCtx, ok := driveri.(driver.DriverContext); ok {  // 2
        connector, err := driverCtx.OpenConnector(dataSourceName)
        if err != nil {
            return nil, err
        }
        return OpenDB(connector), nil  // 3
    }
    return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil  // 4
}

func OpenDB(c driver.Connector) *DB {
   ctx, cancel := context.WithCancel(context.Background())
   db := &DB{
      connector:    c,
      openerCh:     make(chan struct{}, connectionRequestQueueSize),
      lastPut:      make(map[*driverConn]string),
      connRequests: make(map[uint64]chan connRequest),
      stop:         cancel,
   }

   go db.connectionOpener(ctx)  // 經過channel通知來建立鏈接
   // go db.connectionResetter(ctx) // 用於重置鏈接,1.14廢棄
   return db
}

Open函數一般解釋爲初始化db,這裏只是經過驅動名稱,獲取到對應的驅動,並對驅動進行一系列的初始化操做,須要注意的是,Open並不會和db創建鏈接,只是在操做這些數據結構,啓動後臺協程之類的動做。

這裏的dataSourceName簡稱dsn,包含了鏈接數據庫所必須的參數,用戶名密碼ip端口等信息,由不一樣的驅動本身實現解析,固然,有些驅動也支持在dsn中配置一些數據庫參數,如autocommit等。因爲解析字符串獲得這些信息會有必定的資源消耗,所以,還提供了對解析後的結果緩存的功能,避免了每次創建新的鏈接都須要解析一次,要作到這一點,須要驅動實現 driver.DriverContext 接口。

這個時候你就有了這樣一個結構,不過此時的鏈接池中並無鏈接,也就是說沒有真正訪問db

golang 數據庫鏈接池database/sql 實現原理分析

3. 設置數據庫鏈接參數

最大空閒鏈接數,空閒鏈接數超過該值就會被關閉,默認爲defaultMaxIdleConns(2)

func (db *DB) SetMaxIdleConns(n int) {}

最大容許打開的鏈接數,超過該數量後,不容許創建新的鏈接,工做協程只能阻塞等待鏈接的釋放

func (db *DB) SetMaxOpenConns(n int) {}

鏈接能夠被重用的最大時間,換言之,一個鏈接多久後會被關閉,不過會等待當前的請求完成後纔會關閉,be closed lazily,一個很雞肋的參數

func (db *DB) SetConnMaxLifetime(d time.Duration) {
    // 經過啓動一個單獨的協程 connectionCleaner 來實現 
    startCleanerLocked {
        go db.connectionCleaner(db.shortestIdleTimeLocked())
    }
}

1.15 以後新增參數,鏈接最大空閒時間,idle時間超過該值會被關閉,不過會等待當前的請求完成後纔會關閉,be closed lazily

func (db *DB) SetConnMaxIdleTime(d time.Duration) {
    // 1.15 實現了對空閒鏈接的超時回收,複用了SetConnMaxLifetime的部分邏輯,也是在connectionCleaner協程中實現的
}

SetConnMaxLifetime 和 SetConnMaxIdleTime 細節實現

  • 1.14 實現
func (db *DB) startCleanerLocked() {
   if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil {
      db.cleanerCh = make(chan struct{}, 1)
      go db.connectionCleaner(db.maxLifetime)
   }
}

func (db *DB) connectionCleaner(d time.Duration) {
   const minInterval = time.Second

   if d < minInterval {
      d = minInterval
   }
   t := time.NewTimer(d)

   for {
      // 當maxlife時間到達
      // 或者maxlife發生改變及db被close
      select {
      case <-t.C:
      case <-db.cleanerCh: // maxLifetime was changed or db was closed.
      }

      db.mu.Lock()
      d = db.maxLifetime
      if db.closed || db.numOpen == 0 || d <= 0 {
         db.cleanerCh = nil
         db.mu.Unlock()
         return
      }

      // 循環處理free狀態的鏈接
      expiredSince := nowFunc().Add(-d)
      var closing []*driverConn
      for i := 0; i < len(db.freeConn); i++ {
         c := db.freeConn[i]
         if c.createdAt.Before(expiredSince) {
            closing = append(closing, c)
            last := len(db.freeConn) - 1
            db.freeConn[i] = db.freeConn[last]
            db.freeConn[last] = nil
            db.freeConn = db.freeConn[:last]
            i--
         }
      }
      db.maxLifetimeClosed += int64(len(closing))
      db.mu.Unlock()

      for _, c := range closing {
         c.Close()
      }

      // 若是maxlife被重置,須要更新定時器時間
      if d < minInterval {
         d = minInterval
      }
      t.Reset(d)
   }
}
  • 1.15 實現
func (db *DB) startCleanerLocked() {
  if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
    db.cleanerCh = make(chan struct{}, 1)
    go db.connectionCleaner(db.shortestIdleTimeLocked())  // maxidle和maxlife取較小值
  }
}

func (db *DB) connectionCleaner(d time.Duration) {
  const minInterval = time.Second

  if d < minInterval {
    d = minInterval
  }
  t := time.NewTimer(d)

  for {
    select {
    case <-t.C:
    case <-db.cleanerCh: // maxLifetime was changed or db was closed.
    }

    db.mu.Lock()
    d = db.shortestIdleTimeLocked()
    if db.closed || db.numOpen == 0 || d <= 0 {
      db.cleanerCh = nil
      db.mu.Unlock()
      return
    }

    closing := db.connectionCleanerRunLocked()
    db.mu.Unlock()
    for _, c := range closing {
      c.Close()
    }

    if d < minInterval {
      d = minInterval
    }
    t.Reset(d)
  }
}

// 對idle超時和life超時的鏈接分別收集,統一返回
func (db *DB) connectionCleanerRunLocked() (closing []*driverConn) {
  if db.maxLifetime > 0 {
    expiredSince := nowFunc().Add(-db.maxLifetime)
    for i := 0; i < len(db.freeConn); i++ {
      c := db.freeConn[i]
      if c.createdAt.Before(expiredSince) {
        closing = append(closing, c)
        last := len(db.freeConn) - 1
        db.freeConn[i] = db.freeConn[last]
        db.freeConn[last] = nil
        db.freeConn = db.freeConn[:last]
        i--
      }
    }
    db.maxLifetimeClosed += int64(len(closing))
  }

  if db.maxIdleTime > 0 {
    expiredSince := nowFunc().Add(-db.maxIdleTime)
    var expiredCount int64
    for i := 0; i < len(db.freeConn); i++ {
      c := db.freeConn[i]
      if db.maxIdleTime > 0 && c.returnedAt.Before(expiredSince) {
        closing = append(closing, c)
        expiredCount++
        last := len(db.freeConn) - 1
        db.freeConn[i] = db.freeConn[last]
        db.freeConn[last] = nil
        db.freeConn = db.freeConn[:last]
        i--
      }
    }
    db.maxIdleTimeClosed += expiredCount
  }
  return
}

1.14 和 1.15的實現邏輯基本一致,只是增長了對idle超時的判斷作了兼容

4. 訪問數據庫

當咱們作完上面這些初始化動做後,按照咱們的習慣,一般會嘗試性鏈接下db,用來判斷鏈接參數是否正常,如用戶名密碼是否正確,但並非發送用戶請求,通常的作法是調用 db.Ping(),

func (db *DB) Ping() error {
   return db.PingContext(context.Background())
}

func (db *DB) PingContext(ctx context.Context) error {
   var dc *driverConn
   var err error

   // 獲取一個可用鏈接,後面會看到同樣的邏輯,這裏先跳過細節
   for i := 0; i < maxBadConnRetries; i++ {
      dc, err = db.conn(ctx, cachedOrNewConn)
      if err != driver.ErrBadConn {
         break
      }
   }
   if err == driver.ErrBadConn {
      dc, err = db.conn(ctx, alwaysNewConn)  // db.conn 是來獲取可用鏈接的,是數據庫鏈接池較爲核心的一部分
   }
   if err != nil {
      return err
   }

   // 發送ping命令
   return db.pingDC(ctx, dc, dc.releaseConn)
}

func (db *DB) pingDC(ctx context.Context, dc *driverConn, release func(error)) error {
   var err error
   if pinger, ok := dc.ci.(driver.Pinger); ok {
      withLock(dc, func() {
         err = pinger.Ping(ctx)  // 這裏須要驅動本身去實現,對應mysql來講,發送的是sql_type=14(COM_PING)的請求包
      })
   }
   release(err)   // 將該鏈接放回到free池
   return err
}

5. 發送sql請求

這裏看幾個最簡單的發送sql的方法

// 沒有結果集,值返回ok/error包
func (db *DB) Exec(query string, args ...interface{}) (Result, error) {}
func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {}

// 返回大於0條結果集
func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {}
func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {}

// 預期結果集只有一行,沒有結果集Scan時報ErrNoRows,Scan結果若是有多行,只取第一行,多餘的數據行丟棄
func (db *DB) QueryRow(query string, args ...interface{}) *Row {}
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {}

這裏有幾個注意事項:

  • 咱們能夠發現,每個方法都會同時有另一個帶 Context 後綴的方法,查看調用關係的話,會發現,不帶Context的函數(Exec/Query/QueryRow)其實裏面就是調用的帶Context的函數(ExecContext/QueryContext/QueryRowContext),這裏的Context和大多數庫函數同樣,用來進行信號的同步,例如超時限制等,通常不須要單獨設置
  • 咱們能夠發現,每一個函數參數都是支持可變參數列表,用法和prepare用法同樣,用 ? 作佔位符,那咱們直接拼好sql和使用佔位符哪一種更優呢?
    rows1, err := db.Query("select * from t1 where a = 1」)
    rows2, err := db.Query("select * from t1 where a = ?", 1)

這兩條sql執行的結果是同樣的,可是底層是不同的,與不一樣驅動的具體實現略有差異。

以mysql爲例,區別在於第一個Query,實際發送了一條sql(sql_type:3),第二條Query,實際發送了兩條sql(sql_type:22 和 sql_tyep:23),先prepare,再execute,雖然說二進制協議要快些,可是每次都會發送兩條sql,第一次發送的prepare,以後只會execute一次且不會主動回收這個prepare信息。

這個接口設計之初,應該就是按照prepare+execute的思想設計的,當佔位符參數個數爲0時,可否優化直接發送一條sql,要看底層的驅動接口是否支持,換言之,prepare+execute

接下來,以Query爲例,看下具體的實現流程

func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
   return db.QueryContext(context.Background(), query, args...)
}

func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
   var rows *Rows
   var err error

   // 執行query,優先從鏈接池獲取鏈接,若是獲取到badconn(以及關閉的鏈接),重試,最多重試maxBadConnRetries(2)次
   for i := 0; i < maxBadConnRetries; i++ {
      rows, err = db.query(ctx, query, args, cachedOrNewConn)
      if err != driver.ErrBadConn {
         break
      }
   }

   // 必定建立新的鏈接執行query
   if err == driver.ErrBadConn {
      return db.query(ctx, query, args, alwaysNewConn)
   }
   return rows, err
}

func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {
   // 獲取鏈接
   dc, err := db.conn(ctx, strategy)
   if err != nil {
      return nil, err
   }

   // 使用獲取的鏈接執行查詢
   return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
}

能夠發現,執行一條普通sql,須要兩步,第一步,獲取鏈接(db.conn),第二步,執行查詢(db.queryDC)

6. 獲取鏈接

// 提供了兩種獲取鏈接的策略,alwaysNewConn & cachedOrNewConn,字面意思,老是新建 & 優先複用free鏈接

func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
   // 全局加鎖 這裏有個鏈接池的大鎖,須要注意
   db.mu.Lock()
   if db.closed {
      db.mu.Unlock()
      return nil, errDBClosed
   }

   // context 超時檢測
   select {
   default:
   case <-ctx.Done():
      db.mu.Unlock()
      return nil, ctx.Err()
   }
   lifetime := db.maxLifetime

   // 優先從free池中獲取鏈接
   numFree := len(db.freeConn)
   if strategy == cachedOrNewConn && numFree > 0 {
      // 取第一個free鏈接
      conn := db.freeConn[0]
      // 切片拷貝
      copy(db.freeConn, db.freeConn[1:])
      // 調整切片長度
      db.freeConn = db.freeConn[:numFree-1]
      conn.inUse = true
      db.mu.Unlock()
      // 檢查鏈接是否超時,超時則返回錯誤
      if conn.expired(lifetime) {
         conn.Close()
         return nil, driver.ErrBadConn
      }

      // 對鏈接狀態進行重置,一般是使用過的鏈接須要重置,避免鏈接已經處於不可用狀態
      if err := conn.resetSession(ctx); err == driver.ErrBadConn {
         conn.Close()
         return nil, driver.ErrBadConn
      }
      return conn, nil
   }

   // 已經沒有free鏈接,或者策略要求建立一個新鏈接

   // 當前打開的鏈接已經達到了容許打開鏈接數的上限,須要阻塞等待
   if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
      // Make the connRequest channel. It's buffered so that the
      // connectionOpener doesn't block while waiting for the req to be read.

      // 創建一個惟一key和請求鏈接connRequest channel的映射
      req := make(chan connRequest, 1)
      reqKey := db.nextRequestKeyLocked()
      db.connRequests[reqKey] = req
      db.waitCount++
      db.mu.Unlock()

      waitStart := time.Now()
      // Timeout the connection request with the context.
      select {
      // 若是超時,從map中刪除該key,記錄統計信息,並檢查鏈接是否已經就緒
      case <-ctx.Done():
         // Remove the connection request and ensure no value has been sent
         // on it after removing.
         db.mu.Lock()
         delete(db.connRequests, reqKey)
         db.mu.Unlock()
         atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
         // 若是已經生成了可用鏈接,將新鏈接放回到free池中
         select {
         default:
         case ret, ok := <-req:
            if ok && ret.conn != nil {
               db.putConn(ret.conn, ret.err, false)
            }
         }
         return nil, ctx.Err()

      case ret, ok := <-req:
         atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

         if !ok {
            return nil, errDBClosed
         }
         // Only check if the connection is expired if the strategy is cachedOrNewConns.
         // If we require a new connection, just re-use the connection without looking
         // at the expiry time. If it is expired, it will be checked when it is placed
         // back into the connection pool.
         // This prioritizes giving a valid connection to a client over the exact connection
         // lifetime, which could expire exactly after this point anyway.
         // 對cachedOrNewConn策略的鏈接請求,須要判斷鏈接是否過時
         // 若是是請求新鏈接,則不作判斷,等鏈接被放回free池中時再回收
         if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
            ret.conn.Close()
            return nil, driver.ErrBadConn
         }
         if ret.conn == nil {
            return nil, ret.err
         }

         // Reset the session if required.
         if err := ret.conn.resetSession(ctx); err == driver.ErrBadConn {
            ret.conn.Close()
            return nil, driver.ErrBadConn
         }
         return ret.conn, ret.err
      }
   }

   // 因爲未達到鏈接數上限,直接建立新鏈接
   db.numOpen++ // optimistically
   db.mu.Unlock()
   ci, err := db.connector.Connect(ctx)
   if err != nil {
      db.mu.Lock()
      db.numOpen-- // correct for earlier optimism
      db.maybeOpenNewConnections()
      db.mu.Unlock()
      return nil, err
   }
   db.mu.Lock()
   dc := &driverConn{
      db:        db,
      createdAt: nowFunc(),
      ci:        ci,
      inUse:     true,
   }
   db.addDepLocked(dc, dc)
   db.mu.Unlock()
   return dc, nil
}

綜上,當咱們向鏈接池申請鏈接時,

  • 若是策略是 cachedOrNewConn,free鏈接池中有,則直接取出;
  • 若是鏈接池沒有空閒鏈接或者策略爲alwaysNewConn,當前鏈接不超過上限,則直接建立;
  • 不然經過channel去異步建立創建,調用點阻塞等待鏈接。

7. 執行查詢

Query

// ctx 是調用sql設置的上下文
// txctx 是事務的上下文,若是有
// releaseConn 上層傳遞的函數句柄,鏈接使用完後,將該鏈接放回到鏈接池

func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
   queryerCtx, ok := dc.ci.(driver.QueryerContext)
   var queryer driver.Queryer
   if !ok {
      queryer, ok = dc.ci.(driver.Queryer)
   }
   if ok {
      var nvdargs []driver.NamedValue
      var rowsi driver.Rows
      var err error
      withLock(dc, func() {
         nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
         if err != nil {
            return
         }
         rowsi, err = ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs)
      })
      // err要麼爲nil,要麼爲ErrSkip之外的其餘錯誤
      // ErrSkip 一般爲某些可選接口不存在,能夠嘗試其餘接口
      if err != driver.ErrSkip {
         if err != nil {
            releaseConn(err)
            return nil, err
         }
         // err != nil
         // 數據庫鏈接的全部權轉交給了rows,rows須要主動Close,以將該鏈接放回到free鏈接池中
         rows := &Rows{
            dc:          dc,
            releaseConn: releaseConn,
            rowsi:       rowsi,
         }

         // 經過context,當收到上層事件或者事務關閉的消息,rows可以自動調用Close釋放鏈接
         rows.initContextClose(ctx, txctx)
         return rows, nil
      }
   }

   // prepare
   var si driver.Stmt
   var err error
   withLock(dc, func() {
      si, err = ctxDriverPrepare(ctx, dc.ci, query)
   })
   if err != nil {
      releaseConn(err)
      return nil, err
   }

   // execute
   ds := &driverStmt{Locker: dc, si: si}
   rowsi, err := rowsiFromStatement(ctx, dc.ci, ds, args...)
   if err != nil {
      ds.Close()
      releaseConn(err)
      return nil, err
   }

   // Note: ownership of ci passes to the *Rows, to be freed
   // with releaseConn.
   rows := &Rows{
      dc:          dc,
      releaseConn: releaseConn,
      rowsi:       rowsi,
      closeStmt:   ds,
   }

   // 同上
   rows.initContextClose(ctx, txctx)
   return rows, nil
}

能夠發現,在sql包這一層,已經作好了全部的鏈接管理的動做,具體的收發包/包協議邏輯給了不一樣的驅動本身實現,當執行完查詢後,鏈接的全部權轉交給了rows對象,意味着須要rows主動調用 Close() 函數,纔會將當前使用的鏈接放回鏈接池中去。

QueryRow

一樣的,QueryRow() 和 Query() 其實底層是用的一套方法,返回值也僅僅是多包了一層

func (db *DB) QueryRow(query string, args ...interface{}) *Row {
   return db.QueryRowContext(context.Background(), query, args...)
}

func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {
   rows, err := db.QueryContext(ctx, query, args...)
   return &Row{rows: rows, err: err}
}

// Row 和 Rows 的關係
type Row struct {
   // One of these two will be non-nil:
   err  error // deferred error for easy chaining
   rows *Rows
}

細心的話,可以發現 Row 僅僅提供了 Scan 一個方法,甚至 Close() 都沒有,相比 Rows,看着又些單薄,那如何釋放鏈接呢?

在 Row 的 Scan() 方法裏,會從rows讀取第一條數據,在最後,調用了rows的Close() 方法

func (r *Row) Scan(dest ...interface{}) error {
   if r.err != nil {
      return r.err
   }

   defer r.rows.Close()
   for _, dp := range dest {
      if _, ok := dp.(*RawBytes); ok {
         return errors.New("sql: RawBytes isn't allowed on Row.Scan")
      }
   }

   if !r.rows.Next() {
      if err := r.rows.Err(); err != nil {
         return err
      }
      return ErrNoRows
   }
   err := r.rows.Scan(dest...)
   if err != nil {
      return err
   }
   // Make sure the query can be processed to completion with no errors.
   return r.rows.Close()
}

意味着,當咱們使用 QueryRow() 時,必須使用row.Scan( ) 來獲取結果,不然該鏈接就不會放回鏈接池中去。

Exec

Exec 因爲不須要結果集,所以,對鏈接的release就不像前兩個那麼麻煩,除此以外的處理流程基本同樣。

func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []interface{}) (res Result, err error) {
   // 調用 Exec 函數就不須要額外關心鏈接的release,在函數結束以前就放回free池中
   defer func() {
      release(err)
   }()
   execerCtx, ok := dc.ci.(driver.ExecerContext)
   var execer driver.Execer
   if !ok {
      execer, ok = dc.ci.(driver.Execer)
   }

   // 和Query同樣,若是驅動有實現這兩個接口,就直接調用,不然由sql包主動觸發調用prepare+execute
   if ok {
      var nvdargs []driver.NamedValue
      var resi driver.Result
      withLock(dc, func() {
         nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
         if err != nil {
            return
         }
         resi, err = ctxDriverExec(ctx, execerCtx, execer, query, nvdargs)
      })
      if err != driver.ErrSkip {
         if err != nil {
            return nil, err
         }
         return driverResult{dc, resi}, nil
      }
   }

   var si driver.Stmt
   withLock(dc, func() {
      si, err = ctxDriverPrepare(ctx, dc.ci, query)
   })
   if err != nil {
      return nil, err
   }
   ds := &driverStmt{Locker: dc, si: si}
   defer ds.Close()

   // 從 statement 中保存結果
   return resultFromStatement(ctx, dc.ci, ds, args...)
}

8. 優雅地使用stmt

上面提到,直接使用佔位符的方式來執行二進制sql,實際每次會發送兩條sql,並不能提升執行效率,那statement的正確執行方式是什麼呢?

stmt, err := db.Prepare("select * from t1 where a = ?」)   // prepare,sql_type=22
if err != nil {
   return
}
_, err = stmt.Exec(1)  // 第一次執行,sql_type=23
if err != nil {
   return
}
rows, err := stmt.Query(1)  // 第二次執行,鏈接全部權轉交給rows,sql_type=23
if err != nil {
   return
}
_ = rows.Close()  // 歸還鏈接的全部權

_ = stmt.Close()  // sql_type=25

咱們知道,db是一個鏈接池對象,這裏prepare只須要顯示調用一次,以後stmt在執行時,若是獲取到了新的鏈接或者沒有執行過prepare的鏈接,那麼它會首先調用prepare,以後再去執行execute,所以,咱們無需擔憂是否會在一個沒有prepare過的鏈接上execute。
一樣,stmt在調用Close()時,會對全部鏈接上都執行close,關閉掉這個stmt,所以,關閉以前,要保證這個stmt不會再被執行。

9. 釋放鏈接

前面提到,咱們鏈接執行完一次普通查詢,就須要及時放回到freeConn鏈接池中,中間鏈接的擁有權雖然會轉移,但最終都須要被回收,其實,開啓事務的請求也相似,會在事務提交或回滾後釋放鏈接。鏈接釋放的方法從上層不斷向下傳遞,全部可能擁有鏈接全部權的對象,均可能接受到該釋放鏈接到方法。

// 用來將使用完的鏈接放回到free鏈接池中

func (dc *driverConn) releaseConn(err error) {
   dc.db.putConn(dc, err, true)
}

func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
   // 檢查鏈接是否還能複用
   if err != driver.ErrBadConn {
      if !dc.validateConnection(resetSession) {
         err = driver.ErrBadConn
      }
   }

   // debugGetPut 是測試信息
   db.mu.Lock()
   if !dc.inUse {
      db.mu.Unlock()
      if debugGetPut {
         fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
      }
      panic("sql: connection returned that was never out")
   }

   if err != driver.ErrBadConn && dc.expired(db.maxLifetime) {
      err = driver.ErrBadConn
   }
   if debugGetPut {
      db.lastPut[dc] = stack()
   }
   dc.inUse = false

   // 在這個鏈接上註冊的一些statement的關閉函數
   for _, fn := range dc.onPut {
      fn()
   }
   dc.onPut = nil

   // 若是當前鏈接已經不可用,意味着可能會有新的鏈接請求,調用maybeOpenNewConnections進行檢測
   if err == driver.ErrBadConn {
      // Don't reuse bad connections.
      // Since the conn is considered bad and is being discarded, treat it
      // as closed. Don't decrement the open count here, finalClose will
      // take care of that.
      db.maybeOpenNewConnections()
      db.mu.Unlock()
      dc.Close()
      return
   }

   // hook 的一個函數,用於測試,默認爲nil
   if putConnHook != nil {
      putConnHook(db, dc)
   }
   added := db.putConnDBLocked(dc, nil)
   db.mu.Unlock()

   if !added {
      dc.Close()
      return
   }
}

10. 鏈接管理

對鏈接的管理,主要包括鏈接的申請,鏈接的回收及複用,異步釋放超時的鏈接。

鏈接管理的整個流程以下

golang 數據庫鏈接池database/sql 實現原理分析

11. 不開啓事務,如何固定佔用一條鏈接

經過前面這些內容,可以發現,在不開啓事務的狀況下,鏈接完成一筆請求,回被放回到free池裏去,因此哪怕連續執行兩條select,也有可能用的不是同一個實際的數據庫鏈接,某些特殊場景,好比咱們執行完存儲過程,想要select輸出型結果時,這裏就不知足要求。

簡化下需求,實際上是咱們想要長時間佔用一個鏈接,開啓事務是一種解決方案,不過額外引入事務,可能會形成鎖的延遲釋放(以mysql兩階段鎖爲例), 這裏能夠用Context方法來實現,用法舉例

{
   var a int
   ctx := context.Background()
   cn, err := db.Conn(ctx)  // 綁定一個鏈接
   if err != nil {
      return
   }

   // 執行第一次查詢,將鏈接全部權轉交給rows1
   rows1, err := cn.QueryContext(ctx, "select * from t1")
   if err != nil {
      return
   }
   _ = rows1.Scan(&a)
   _ = rows1.Close() // rows1 close,將鏈接全部權交給cn 

   // 執行第二次查詢,將鏈接全部權轉交給rows2
   rows2, err = cn.QueryContext(ctx, "select * from t1")
   if err != nil {
      return
   }
   _ = rows2.Scan(&a)
   _ = rows2.Close() // rows1 close,將鏈接全部權交給cn

   // cn close,鏈接回收,放回free隊列
   _ = cn.Close()
}

關於db.Conn( ) 返回的sql.Conn對象,須要和driver.Conn 作區分,sql.Conn 是對driverConn的再一次封裝,是爲裏提供連續的單個數據庫鏈接,driver.Conn 是不一樣驅動要實現的接口

// Conn represents a single database connection rather than a pool of database
// connections. Prefer running queries from DB unless there is a specific
// need for a continuous single database connection.
//
// A Conn must call Close to return the connection to the database pool
// and may do so concurrently with a running query.
//
// After a call to Close, all operations on the
// connection fail with ErrConnDone.

type Conn struct {
   db *DB

   // closemu prevents the connection from closing while there
   // is an active query. It is held for read during queries
   // and exclusively during close.
   closemu sync.RWMutex

   // dc is owned until close, at which point
   // it's returned to the connection pool.
   dc *driverConn

   // done transitions from 0 to 1 exactly once, on close.
   // Once done, all operations fail with ErrConnDone.
   // Use atomic operations on value when checking value.
   done int32
}

12. 監控鏈接池狀態

因爲mysql協議是同步的,所以,當客戶端遊大量的併發請求,可是鏈接數要小於併發數的狀況下,是會有一部分請求被阻塞,等待其它請求釋放鏈接,在某些場景或使用不當的狀況下,這裏也可能會成爲瓶頸。不過庫中並無詳細記錄每一筆請求的鏈接等待時間,只提供了累計的等待時間之和,以及其它的監控指標,在定位問題時能夠用作參考。

庫提供了 db.Stats( ) 方法,會從db對象中獲取全部的監控指標,並生成對象 DBStats 對象

func (db *DB) Stats() DBStats {
   wait := atomic.LoadInt64(&db.waitDuration)

   db.mu.Lock()
   defer db.mu.Unlock()

   stats := DBStats{
      MaxOpenConnections: db.maxOpen,

      Idle:            len(db.freeConn),
      OpenConnections: db.numOpen,
      InUse:           db.numOpen - len(db.freeConn),

      WaitCount:         db.waitCount,
      WaitDuration:      time.Duration(wait),
      MaxIdleClosed:     db.maxIdleClosed,
      MaxLifetimeClosed: db.maxLifetimeClosed,
   }
   return stats
}

一個簡單的使用例子

func monitorConn(db *sql.DB) {
   go func(db *sql.DB) {
      mt := time.NewTicker(monitorDbInterval * time.Second)
      for {
         select {
         case <-mt.C:
            stat := db.Stats()
            logutil.Errorf("monitor db conn(%p): maxopen(%d), open(%d), use(%d), idle(%d), "+
               "wait(%d), idleClose(%d), lifeClose(%d), totalWait(%v)",
               db,
               stat.MaxOpenConnections, stat.OpenConnections,
               stat.InUse, stat.Idle,
               stat.WaitCount, stat.MaxIdleClosed,
               stat.MaxLifetimeClosed, stat.WaitDuration)
         }
      }
   }(db)
}

須要注意的是,1.15 以前,對 stat.MaxLifetimeClosed 對象統計會有異常,1.15 以後作了修復。

Attention

  • 注意鏈接全部者的傳遞關係,使用完成後要及時回收,如rows.Close(),row.Scan()等,不回收會形成鏈接泄漏,新的請求會被一直阻塞
  • 儘可能避免使用佔位符的方式執行sql,推薦本身完成sql的拼接或正常使用stmt
  • 1.15 後支持了對單個鏈接空閒時間的限制
  • db.Conn( ) 可以持續佔用一條鏈接,可是在該鏈接中,就沒有辦法調用以前prepare生成的stmt,可是在事務中能夠,tx.Stmt( )能夠生成特定於該事務的stmt
  • go提供了數據庫鏈接池回收策略,是針對freeConn的,換句話說,鏈接若是被一直佔用,哪怕已經超過了生存時間,也不會被回收
  • 咱們注意到,每次對鏈接池操做時,都要先加一把全局大鎖,所以,當鏈接數較多(>1000),且請求量較大時,會存在較爲嚴重的鎖競爭,這一點經過top(sys)指標,以及pprof也能發現,由於,一個簡單的方式,是將一個大的鏈接池拆分爲多個小的鏈接池,通常狀況下,經過簡單的輪詢將請求打散在多個鏈接池上,能有效下降鎖的粒度

【完】

相關文章
相關標籤/搜索