golang對數據庫的請求,抽象出來一套通用的鏈接池,用go的機制來講,golang只須要提供一個驅動(driver)的interface,底層不一樣數據庫協議,由用戶根據本身的數據庫實現對應的驅動便可。css
本文從源碼實現的角度,探索這裏的細節以及須要避免的坑,基於1.14代碼分析,部分bug在1.15中有修復或優化,這裏也會說起。mysql
golang版本:1.14git
└── sql ├── convert.go # 結果行的讀取與轉換 ├── convert_test.go ├── ctxutil.go # 綁定上下文的一些通用方法 ├── doc.txt ├── driver # driver 定義來實現數據庫驅動所須要的接口 │ ├── driver.go │ ├── types.go # 數據類型別名和轉換 │ └── types_test.go ├── example_cli_test.go ├── example_service_test.go ├── example_test.go ├── fakedb_test.go ├── sql.go # 通用的接口和類型,包括事物,鏈接等 └── sql_test.go
type DB struct { // Atomic access only. At top of struct to prevent mis-alignment // on 32-bit platforms. Of type time.Duration. waitDuration int64 // 等待新的鏈接所須要的總時間 connector driver.Connector // 數據庫驅動本身實現 // numClosed is an atomic counter which represents a total number of // closed connections. Stmt.openStmt checks it before cleaning closed // connections in Stmt.css. numClosed uint64 // 關閉的鏈接數 mu sync.Mutex // protects following fields freeConn []*driverConn connRequests map[uint64]chan connRequest nextRequest uint64 // Next key to use in connRequests. numOpen int // number of opened and pending open connections // Used to signal the need for new connections // a goroutine running connectionOpener() reads on this chan and // maybeOpenNewConnections sends on the chan (one send per needed connection) // It is closed during db.Close(). The close tells the connectionOpener // goroutine to exit. openerCh chan struct{} // 用於通知須要建立新的鏈接 // resetterCh chan *driverConn // 已廢棄 closed bool dep map[finalCloser]depSet // map[一級對象]map[二級對象]bool,一個外部以來,用於自動關閉 lastPut map[*driverConn]string // stacktrace of last conn's put; debug only maxIdle int // zero means defaultMaxIdleConns(2); negative means 0 maxOpen int // <= 0 means unlimited maxLifetime time.Duration // maximum amount of time a connection may be reused cleanerCh chan struct{} // 用於通知清理過時的鏈接,maxlife時間改變或者鏈接被關閉時會經過該channel通知 waitCount int64 // Total number of connections waited for. // 這些狀態數據,能夠經過db.Stat() 獲取 maxIdleClosed int64 // Total number of connections closed due to idle. maxLifetimeClosed int64 // Total number of connections closed due to max free limit. stop func() // stop cancels the connection opener and the session resetter. }
sql.DB不是一個鏈接,它是數據庫的抽象接口,也是整個鏈接池的句柄,對多個goroutine是併發安全的。它能夠根據driver打開關閉數據庫鏈接,管理鏈接池。這對不一樣的數據庫來講都是同樣的。github
// driverConn wraps a driver.Conn with a mutex, to // be held during all calls into the Conn. (including any calls onto // interfaces returned via that Conn, such as calls on Tx, Stmt, // Result, Rows) type driverConn struct { db *DB createdAt time.Time sync.Mutex // guards following ci driver.Conn // 由不一樣的驅動本身實現,對應一條具體的數據庫鏈接 needReset bool // The connection session should be reset before use if true. closed bool // 當前鏈接的狀態,是否已經關閉 finalClosed bool // ci.Close has been called openStmt map[*driverStmt]bool // guarded by db.mu inUse bool onPut []func() // code (with db.mu held) run when conn is next returned // 歸還鏈接的時候調用 dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked }
對單個鏈接的封裝,包含了實際的數據庫鏈接以及相關的狀態信息等golang
// Conn is a connection to a database. It is not used concurrently // by multiple goroutines. // // Conn is assumed to be stateful. type Conn interface { // Prepare returns a prepared statement, bound to this connection. Prepare(query string) (Stmt, error) // Close invalidates and potentially stops any current // prepared statements and transactions, marking this // connection as no longer in use. // // Because the sql package maintains a free pool of // connections and only calls Close when there's a surplus of // idle connections, it shouldn't be necessary for drivers to // do their own connection caching. Close() error // Begin starts and returns a new transaction. // // Deprecated: Drivers should implement ConnBeginTx instead (or additionally). Begin() (Tx, error) }
一條具體的數據庫鏈接,須要由不一樣驅動本身去實現接口sql
type Driver interface { Open(name string) (Conn, error) }
Driver 只包含一個函數,Open()用來返回一個可用鏈接,多是新創建的,也多是以前緩存的關閉的鏈接。數據庫
type DriverContext interface { // OpenConnector must parse the name in the same format that Driver.Open // parses the name parameter. OpenConnector(name string) (Connector, error) }
DriverContext 的目的是維護drievr上下文信息,避免了每次新建鏈接的時候都須要解析一遍 dsn。須要有Driver對象本身去實現。緩存
type Connector interface { // Connect returns a connection to the database. // Connect may return a cached connection (one previously // closed), but doing so is unnecessary; the sql package // maintains a pool of idle connections for efficient re-use. // // The provided context.Context is for dialing purposes only // (see net.DialContext) and should not be stored or used for // other purposes. // // The returned connection is only used by one goroutine at a // time. Connect(context.Context) (Conn, error) // Driver returns the underlying Driver of the Connector, // mainly to maintain compatibility with the Driver method // on sql.DB. Driver() Driver }
driver.Connector 是driver的插口,是一個接口類型的對象,由不一樣類型的數據庫來實現。
driver.Connector 包含兩個函數。安全
import ( _ "github.com/go-sql-driver/mysql" ) var ( driversMu sync.RWMutex drivers = make(map[string]driver.Driver) ) func Register(name string, driver driver.Driver) { driversMu.Lock() defer driversMu.Unlock() if driver == nil { panic("sql: Register driver is nil") } if _, dup := drivers[name]; dup { panic("sql: Register called twice for driver " + name) } drivers[name] = driver }
/database/sql 提供的是一個通用的數據庫鏈接池,當咱們鏈接不一樣的數據庫時,只須要將對應的數據庫驅動註冊進去就可使用。session
這裏的註冊,實際上就是將數據庫名稱和對應的數據庫驅動(數據庫鏈接包裝器)添加的一個map中,每一個import進來的庫,須要在init函數中調用註冊函數來實現。
func Open(driverName, dataSourceName string) (*DB, error) { driversMu.RLock() driveri, ok := drivers[driverName] // 1 driversMu.RUnlock() if !ok { return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName) } if driverCtx, ok := driveri.(driver.DriverContext); ok { // 2 connector, err := driverCtx.OpenConnector(dataSourceName) if err != nil { return nil, err } return OpenDB(connector), nil // 3 } return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil // 4 } func OpenDB(c driver.Connector) *DB { ctx, cancel := context.WithCancel(context.Background()) db := &DB{ connector: c, openerCh: make(chan struct{}, connectionRequestQueueSize), lastPut: make(map[*driverConn]string), connRequests: make(map[uint64]chan connRequest), stop: cancel, } go db.connectionOpener(ctx) // 經過channel通知來建立鏈接 // go db.connectionResetter(ctx) // 用於重置鏈接,1.14廢棄 return db }
Open函數一般解釋爲初始化db,這裏只是經過驅動名稱,獲取到對應的驅動,並對驅動進行一系列的初始化操做,須要注意的是,Open並不會和db創建鏈接,只是在操做這些數據結構,啓動後臺協程之類的動做。
這裏的dataSourceName簡稱dsn,包含了鏈接數據庫所必須的參數,用戶名密碼ip端口等信息,由不一樣的驅動本身實現解析,固然,有些驅動也支持在dsn中配置一些數據庫參數,如autocommit等。因爲解析字符串獲得這些信息會有必定的資源消耗,所以,還提供了對解析後的結果緩存的功能,避免了每次創建新的鏈接都須要解析一次,要作到這一點,須要驅動實現 driver.DriverContext 接口。
這個時候你就有了這樣一個結構,不過此時的鏈接池中並無鏈接,也就是說沒有真正訪問db
最大空閒鏈接數,空閒鏈接數超過該值就會被關閉,默認爲defaultMaxIdleConns(2)
func (db *DB) SetMaxIdleConns(n int) {}
最大容許打開的鏈接數,超過該數量後,不容許創建新的鏈接,工做協程只能阻塞等待鏈接的釋放
func (db *DB) SetMaxOpenConns(n int) {}
鏈接能夠被重用的最大時間,換言之,一個鏈接多久後會被關閉,不過會等待當前的請求完成後纔會關閉,be closed lazily,一個很雞肋的參數
func (db *DB) SetConnMaxLifetime(d time.Duration) { // 經過啓動一個單獨的協程 connectionCleaner 來實現 startCleanerLocked { go db.connectionCleaner(db.shortestIdleTimeLocked()) } }
1.15 以後新增參數,鏈接最大空閒時間,idle時間超過該值會被關閉,不過會等待當前的請求完成後纔會關閉,be closed lazily
func (db *DB) SetConnMaxIdleTime(d time.Duration) { // 1.15 實現了對空閒鏈接的超時回收,複用了SetConnMaxLifetime的部分邏輯,也是在connectionCleaner協程中實現的 }
SetConnMaxLifetime 和 SetConnMaxIdleTime 細節實現
func (db *DB) startCleanerLocked() { if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil { db.cleanerCh = make(chan struct{}, 1) go db.connectionCleaner(db.maxLifetime) } } func (db *DB) connectionCleaner(d time.Duration) { const minInterval = time.Second if d < minInterval { d = minInterval } t := time.NewTimer(d) for { // 當maxlife時間到達 // 或者maxlife發生改變及db被close select { case <-t.C: case <-db.cleanerCh: // maxLifetime was changed or db was closed. } db.mu.Lock() d = db.maxLifetime if db.closed || db.numOpen == 0 || d <= 0 { db.cleanerCh = nil db.mu.Unlock() return } // 循環處理free狀態的鏈接 expiredSince := nowFunc().Add(-d) var closing []*driverConn for i := 0; i < len(db.freeConn); i++ { c := db.freeConn[i] if c.createdAt.Before(expiredSince) { closing = append(closing, c) last := len(db.freeConn) - 1 db.freeConn[i] = db.freeConn[last] db.freeConn[last] = nil db.freeConn = db.freeConn[:last] i-- } } db.maxLifetimeClosed += int64(len(closing)) db.mu.Unlock() for _, c := range closing { c.Close() } // 若是maxlife被重置,須要更新定時器時間 if d < minInterval { d = minInterval } t.Reset(d) } }
func (db *DB) startCleanerLocked() { if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil { db.cleanerCh = make(chan struct{}, 1) go db.connectionCleaner(db.shortestIdleTimeLocked()) // maxidle和maxlife取較小值 } } func (db *DB) connectionCleaner(d time.Duration) { const minInterval = time.Second if d < minInterval { d = minInterval } t := time.NewTimer(d) for { select { case <-t.C: case <-db.cleanerCh: // maxLifetime was changed or db was closed. } db.mu.Lock() d = db.shortestIdleTimeLocked() if db.closed || db.numOpen == 0 || d <= 0 { db.cleanerCh = nil db.mu.Unlock() return } closing := db.connectionCleanerRunLocked() db.mu.Unlock() for _, c := range closing { c.Close() } if d < minInterval { d = minInterval } t.Reset(d) } } // 對idle超時和life超時的鏈接分別收集,統一返回 func (db *DB) connectionCleanerRunLocked() (closing []*driverConn) { if db.maxLifetime > 0 { expiredSince := nowFunc().Add(-db.maxLifetime) for i := 0; i < len(db.freeConn); i++ { c := db.freeConn[i] if c.createdAt.Before(expiredSince) { closing = append(closing, c) last := len(db.freeConn) - 1 db.freeConn[i] = db.freeConn[last] db.freeConn[last] = nil db.freeConn = db.freeConn[:last] i-- } } db.maxLifetimeClosed += int64(len(closing)) } if db.maxIdleTime > 0 { expiredSince := nowFunc().Add(-db.maxIdleTime) var expiredCount int64 for i := 0; i < len(db.freeConn); i++ { c := db.freeConn[i] if db.maxIdleTime > 0 && c.returnedAt.Before(expiredSince) { closing = append(closing, c) expiredCount++ last := len(db.freeConn) - 1 db.freeConn[i] = db.freeConn[last] db.freeConn[last] = nil db.freeConn = db.freeConn[:last] i-- } } db.maxIdleTimeClosed += expiredCount } return }
1.14 和 1.15的實現邏輯基本一致,只是增長了對idle超時的判斷作了兼容
當咱們作完上面這些初始化動做後,按照咱們的習慣,一般會嘗試性鏈接下db,用來判斷鏈接參數是否正常,如用戶名密碼是否正確,但並非發送用戶請求,通常的作法是調用 db.Ping(),
func (db *DB) Ping() error { return db.PingContext(context.Background()) } func (db *DB) PingContext(ctx context.Context) error { var dc *driverConn var err error // 獲取一個可用鏈接,後面會看到同樣的邏輯,這裏先跳過細節 for i := 0; i < maxBadConnRetries; i++ { dc, err = db.conn(ctx, cachedOrNewConn) if err != driver.ErrBadConn { break } } if err == driver.ErrBadConn { dc, err = db.conn(ctx, alwaysNewConn) // db.conn 是來獲取可用鏈接的,是數據庫鏈接池較爲核心的一部分 } if err != nil { return err } // 發送ping命令 return db.pingDC(ctx, dc, dc.releaseConn) } func (db *DB) pingDC(ctx context.Context, dc *driverConn, release func(error)) error { var err error if pinger, ok := dc.ci.(driver.Pinger); ok { withLock(dc, func() { err = pinger.Ping(ctx) // 這裏須要驅動本身去實現,對應mysql來講,發送的是sql_type=14(COM_PING)的請求包 }) } release(err) // 將該鏈接放回到free池 return err }
這裏看幾個最簡單的發送sql的方法
// 沒有結果集,值返回ok/error包 func (db *DB) Exec(query string, args ...interface{}) (Result, error) {} func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {} // 返回大於0條結果集 func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {} func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {} // 預期結果集只有一行,沒有結果集Scan時報ErrNoRows,Scan結果若是有多行,只取第一行,多餘的數據行丟棄 func (db *DB) QueryRow(query string, args ...interface{}) *Row {} func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {}
這裏有幾個注意事項:
rows1, err := db.Query("select * from t1 where a = 1」) rows2, err := db.Query("select * from t1 where a = ?", 1)
這兩條sql執行的結果是同樣的,可是底層是不同的,與不一樣驅動的具體實現略有差異。
以mysql爲例,區別在於第一個Query,實際發送了一條sql(sql_type:3),第二條Query,實際發送了兩條sql(sql_type:22 和 sql_tyep:23),先prepare,再execute,雖然說二進制協議要快些,可是每次都會發送兩條sql,第一次發送的prepare,以後只會execute一次且不會主動回收這個prepare信息。
這個接口設計之初,應該就是按照prepare+execute的思想設計的,當佔位符參數個數爲0時,可否優化直接發送一條sql,要看底層的驅動接口是否支持,換言之,prepare+execute
接下來,以Query爲例,看下具體的實現流程
func (db *DB) Query(query string, args ...interface{}) (*Rows, error) { return db.QueryContext(context.Background(), query, args...) } func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) { var rows *Rows var err error // 執行query,優先從鏈接池獲取鏈接,若是獲取到badconn(以及關閉的鏈接),重試,最多重試maxBadConnRetries(2)次 for i := 0; i < maxBadConnRetries; i++ { rows, err = db.query(ctx, query, args, cachedOrNewConn) if err != driver.ErrBadConn { break } } // 必定建立新的鏈接執行query if err == driver.ErrBadConn { return db.query(ctx, query, args, alwaysNewConn) } return rows, err } func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) { // 獲取鏈接 dc, err := db.conn(ctx, strategy) if err != nil { return nil, err } // 使用獲取的鏈接執行查詢 return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args) }
能夠發現,執行一條普通sql,須要兩步,第一步,獲取鏈接(db.conn),第二步,執行查詢(db.queryDC)
// 提供了兩種獲取鏈接的策略,alwaysNewConn & cachedOrNewConn,字面意思,老是新建 & 優先複用free鏈接
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) { // 全局加鎖 這裏有個鏈接池的大鎖,須要注意 db.mu.Lock() if db.closed { db.mu.Unlock() return nil, errDBClosed } // context 超時檢測 select { default: case <-ctx.Done(): db.mu.Unlock() return nil, ctx.Err() } lifetime := db.maxLifetime // 優先從free池中獲取鏈接 numFree := len(db.freeConn) if strategy == cachedOrNewConn && numFree > 0 { // 取第一個free鏈接 conn := db.freeConn[0] // 切片拷貝 copy(db.freeConn, db.freeConn[1:]) // 調整切片長度 db.freeConn = db.freeConn[:numFree-1] conn.inUse = true db.mu.Unlock() // 檢查鏈接是否超時,超時則返回錯誤 if conn.expired(lifetime) { conn.Close() return nil, driver.ErrBadConn } // 對鏈接狀態進行重置,一般是使用過的鏈接須要重置,避免鏈接已經處於不可用狀態 if err := conn.resetSession(ctx); err == driver.ErrBadConn { conn.Close() return nil, driver.ErrBadConn } return conn, nil } // 已經沒有free鏈接,或者策略要求建立一個新鏈接 // 當前打開的鏈接已經達到了容許打開鏈接數的上限,須要阻塞等待 if db.maxOpen > 0 && db.numOpen >= db.maxOpen { // Make the connRequest channel. It's buffered so that the // connectionOpener doesn't block while waiting for the req to be read. // 創建一個惟一key和請求鏈接connRequest channel的映射 req := make(chan connRequest, 1) reqKey := db.nextRequestKeyLocked() db.connRequests[reqKey] = req db.waitCount++ db.mu.Unlock() waitStart := time.Now() // Timeout the connection request with the context. select { // 若是超時,從map中刪除該key,記錄統計信息,並檢查鏈接是否已經就緒 case <-ctx.Done(): // Remove the connection request and ensure no value has been sent // on it after removing. db.mu.Lock() delete(db.connRequests, reqKey) db.mu.Unlock() atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) // 若是已經生成了可用鏈接,將新鏈接放回到free池中 select { default: case ret, ok := <-req: if ok && ret.conn != nil { db.putConn(ret.conn, ret.err, false) } } return nil, ctx.Err() case ret, ok := <-req: atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) if !ok { return nil, errDBClosed } // Only check if the connection is expired if the strategy is cachedOrNewConns. // If we require a new connection, just re-use the connection without looking // at the expiry time. If it is expired, it will be checked when it is placed // back into the connection pool. // This prioritizes giving a valid connection to a client over the exact connection // lifetime, which could expire exactly after this point anyway. // 對cachedOrNewConn策略的鏈接請求,須要判斷鏈接是否過時 // 若是是請求新鏈接,則不作判斷,等鏈接被放回free池中時再回收 if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) { ret.conn.Close() return nil, driver.ErrBadConn } if ret.conn == nil { return nil, ret.err } // Reset the session if required. if err := ret.conn.resetSession(ctx); err == driver.ErrBadConn { ret.conn.Close() return nil, driver.ErrBadConn } return ret.conn, ret.err } } // 因爲未達到鏈接數上限,直接建立新鏈接 db.numOpen++ // optimistically db.mu.Unlock() ci, err := db.connector.Connect(ctx) if err != nil { db.mu.Lock() db.numOpen-- // correct for earlier optimism db.maybeOpenNewConnections() db.mu.Unlock() return nil, err } db.mu.Lock() dc := &driverConn{ db: db, createdAt: nowFunc(), ci: ci, inUse: true, } db.addDepLocked(dc, dc) db.mu.Unlock() return dc, nil }
綜上,當咱們向鏈接池申請鏈接時,
// ctx 是調用sql設置的上下文 // txctx 是事務的上下文,若是有 // releaseConn 上層傳遞的函數句柄,鏈接使用完後,將該鏈接放回到鏈接池 func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) { queryerCtx, ok := dc.ci.(driver.QueryerContext) var queryer driver.Queryer if !ok { queryer, ok = dc.ci.(driver.Queryer) } if ok { var nvdargs []driver.NamedValue var rowsi driver.Rows var err error withLock(dc, func() { nvdargs, err = driverArgsConnLocked(dc.ci, nil, args) if err != nil { return } rowsi, err = ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs) }) // err要麼爲nil,要麼爲ErrSkip之外的其餘錯誤 // ErrSkip 一般爲某些可選接口不存在,能夠嘗試其餘接口 if err != driver.ErrSkip { if err != nil { releaseConn(err) return nil, err } // err != nil // 數據庫鏈接的全部權轉交給了rows,rows須要主動Close,以將該鏈接放回到free鏈接池中 rows := &Rows{ dc: dc, releaseConn: releaseConn, rowsi: rowsi, } // 經過context,當收到上層事件或者事務關閉的消息,rows可以自動調用Close釋放鏈接 rows.initContextClose(ctx, txctx) return rows, nil } } // prepare var si driver.Stmt var err error withLock(dc, func() { si, err = ctxDriverPrepare(ctx, dc.ci, query) }) if err != nil { releaseConn(err) return nil, err } // execute ds := &driverStmt{Locker: dc, si: si} rowsi, err := rowsiFromStatement(ctx, dc.ci, ds, args...) if err != nil { ds.Close() releaseConn(err) return nil, err } // Note: ownership of ci passes to the *Rows, to be freed // with releaseConn. rows := &Rows{ dc: dc, releaseConn: releaseConn, rowsi: rowsi, closeStmt: ds, } // 同上 rows.initContextClose(ctx, txctx) return rows, nil }
能夠發現,在sql包這一層,已經作好了全部的鏈接管理的動做,具體的收發包/包協議邏輯給了不一樣的驅動本身實現,當執行完查詢後,鏈接的全部權轉交給了rows對象,意味着須要rows主動調用 Close() 函數,纔會將當前使用的鏈接放回鏈接池中去。
一樣的,QueryRow() 和 Query() 其實底層是用的一套方法,返回值也僅僅是多包了一層
func (db *DB) QueryRow(query string, args ...interface{}) *Row { return db.QueryRowContext(context.Background(), query, args...) } func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row { rows, err := db.QueryContext(ctx, query, args...) return &Row{rows: rows, err: err} } // Row 和 Rows 的關係 type Row struct { // One of these two will be non-nil: err error // deferred error for easy chaining rows *Rows }
細心的話,可以發現 Row 僅僅提供了 Scan 一個方法,甚至 Close() 都沒有,相比 Rows,看着又些單薄,那如何釋放鏈接呢?
在 Row 的 Scan() 方法裏,會從rows讀取第一條數據,在最後,調用了rows的Close() 方法
func (r *Row) Scan(dest ...interface{}) error { if r.err != nil { return r.err } defer r.rows.Close() for _, dp := range dest { if _, ok := dp.(*RawBytes); ok { return errors.New("sql: RawBytes isn't allowed on Row.Scan") } } if !r.rows.Next() { if err := r.rows.Err(); err != nil { return err } return ErrNoRows } err := r.rows.Scan(dest...) if err != nil { return err } // Make sure the query can be processed to completion with no errors. return r.rows.Close() }
意味着,當咱們使用 QueryRow() 時,必須使用row.Scan( ) 來獲取結果,不然該鏈接就不會放回鏈接池中去。
Exec 因爲不須要結果集,所以,對鏈接的release就不像前兩個那麼麻煩,除此以外的處理流程基本同樣。
func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []interface{}) (res Result, err error) { // 調用 Exec 函數就不須要額外關心鏈接的release,在函數結束以前就放回free池中 defer func() { release(err) }() execerCtx, ok := dc.ci.(driver.ExecerContext) var execer driver.Execer if !ok { execer, ok = dc.ci.(driver.Execer) } // 和Query同樣,若是驅動有實現這兩個接口,就直接調用,不然由sql包主動觸發調用prepare+execute if ok { var nvdargs []driver.NamedValue var resi driver.Result withLock(dc, func() { nvdargs, err = driverArgsConnLocked(dc.ci, nil, args) if err != nil { return } resi, err = ctxDriverExec(ctx, execerCtx, execer, query, nvdargs) }) if err != driver.ErrSkip { if err != nil { return nil, err } return driverResult{dc, resi}, nil } } var si driver.Stmt withLock(dc, func() { si, err = ctxDriverPrepare(ctx, dc.ci, query) }) if err != nil { return nil, err } ds := &driverStmt{Locker: dc, si: si} defer ds.Close() // 從 statement 中保存結果 return resultFromStatement(ctx, dc.ci, ds, args...) }
上面提到,直接使用佔位符的方式來執行二進制sql,實際每次會發送兩條sql,並不能提升執行效率,那statement的正確執行方式是什麼呢?
stmt, err := db.Prepare("select * from t1 where a = ?」) // prepare,sql_type=22 if err != nil { return } _, err = stmt.Exec(1) // 第一次執行,sql_type=23 if err != nil { return } rows, err := stmt.Query(1) // 第二次執行,鏈接全部權轉交給rows,sql_type=23 if err != nil { return } _ = rows.Close() // 歸還鏈接的全部權 _ = stmt.Close() // sql_type=25
咱們知道,db是一個鏈接池對象,這裏prepare只須要顯示調用一次,以後stmt在執行時,若是獲取到了新的鏈接或者沒有執行過prepare的鏈接,那麼它會首先調用prepare,以後再去執行execute,所以,咱們無需擔憂是否會在一個沒有prepare過的鏈接上execute。
一樣,stmt在調用Close()時,會對全部鏈接上都執行close,關閉掉這個stmt,所以,關閉以前,要保證這個stmt不會再被執行。
前面提到,咱們鏈接執行完一次普通查詢,就須要及時放回到freeConn鏈接池中,中間鏈接的擁有權雖然會轉移,但最終都須要被回收,其實,開啓事務的請求也相似,會在事務提交或回滾後釋放鏈接。鏈接釋放的方法從上層不斷向下傳遞,全部可能擁有鏈接全部權的對象,均可能接受到該釋放鏈接到方法。
// 用來將使用完的鏈接放回到free鏈接池中 func (dc *driverConn) releaseConn(err error) { dc.db.putConn(dc, err, true) } func (db *DB) putConn(dc *driverConn, err error, resetSession bool) { // 檢查鏈接是否還能複用 if err != driver.ErrBadConn { if !dc.validateConnection(resetSession) { err = driver.ErrBadConn } } // debugGetPut 是測試信息 db.mu.Lock() if !dc.inUse { db.mu.Unlock() if debugGetPut { fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc]) } panic("sql: connection returned that was never out") } if err != driver.ErrBadConn && dc.expired(db.maxLifetime) { err = driver.ErrBadConn } if debugGetPut { db.lastPut[dc] = stack() } dc.inUse = false // 在這個鏈接上註冊的一些statement的關閉函數 for _, fn := range dc.onPut { fn() } dc.onPut = nil // 若是當前鏈接已經不可用,意味着可能會有新的鏈接請求,調用maybeOpenNewConnections進行檢測 if err == driver.ErrBadConn { // Don't reuse bad connections. // Since the conn is considered bad and is being discarded, treat it // as closed. Don't decrement the open count here, finalClose will // take care of that. db.maybeOpenNewConnections() db.mu.Unlock() dc.Close() return } // hook 的一個函數,用於測試,默認爲nil if putConnHook != nil { putConnHook(db, dc) } added := db.putConnDBLocked(dc, nil) db.mu.Unlock() if !added { dc.Close() return } }
對鏈接的管理,主要包括鏈接的申請,鏈接的回收及複用,異步釋放超時的鏈接。
鏈接管理的整個流程以下
經過前面這些內容,可以發現,在不開啓事務的狀況下,鏈接完成一筆請求,回被放回到free池裏去,因此哪怕連續執行兩條select,也有可能用的不是同一個實際的數據庫鏈接,某些特殊場景,好比咱們執行完存儲過程,想要select輸出型結果時,這裏就不知足要求。
簡化下需求,實際上是咱們想要長時間佔用一個鏈接,開啓事務是一種解決方案,不過額外引入事務,可能會形成鎖的延遲釋放(以mysql兩階段鎖爲例), 這裏能夠用Context方法來實現,用法舉例
{ var a int ctx := context.Background() cn, err := db.Conn(ctx) // 綁定一個鏈接 if err != nil { return } // 執行第一次查詢,將鏈接全部權轉交給rows1 rows1, err := cn.QueryContext(ctx, "select * from t1") if err != nil { return } _ = rows1.Scan(&a) _ = rows1.Close() // rows1 close,將鏈接全部權交給cn // 執行第二次查詢,將鏈接全部權轉交給rows2 rows2, err = cn.QueryContext(ctx, "select * from t1") if err != nil { return } _ = rows2.Scan(&a) _ = rows2.Close() // rows1 close,將鏈接全部權交給cn // cn close,鏈接回收,放回free隊列 _ = cn.Close() }
關於db.Conn( ) 返回的sql.Conn對象,須要和driver.Conn 作區分,sql.Conn 是對driverConn的再一次封裝,是爲裏提供連續的單個數據庫鏈接,driver.Conn 是不一樣驅動要實現的接口
// Conn represents a single database connection rather than a pool of database // connections. Prefer running queries from DB unless there is a specific // need for a continuous single database connection. // // A Conn must call Close to return the connection to the database pool // and may do so concurrently with a running query. // // After a call to Close, all operations on the // connection fail with ErrConnDone. type Conn struct { db *DB // closemu prevents the connection from closing while there // is an active query. It is held for read during queries // and exclusively during close. closemu sync.RWMutex // dc is owned until close, at which point // it's returned to the connection pool. dc *driverConn // done transitions from 0 to 1 exactly once, on close. // Once done, all operations fail with ErrConnDone. // Use atomic operations on value when checking value. done int32 }
因爲mysql協議是同步的,所以,當客戶端遊大量的併發請求,可是鏈接數要小於併發數的狀況下,是會有一部分請求被阻塞,等待其它請求釋放鏈接,在某些場景或使用不當的狀況下,這裏也可能會成爲瓶頸。不過庫中並無詳細記錄每一筆請求的鏈接等待時間,只提供了累計的等待時間之和,以及其它的監控指標,在定位問題時能夠用作參考。
庫提供了 db.Stats( ) 方法,會從db對象中獲取全部的監控指標,並生成對象 DBStats 對象
func (db *DB) Stats() DBStats { wait := atomic.LoadInt64(&db.waitDuration) db.mu.Lock() defer db.mu.Unlock() stats := DBStats{ MaxOpenConnections: db.maxOpen, Idle: len(db.freeConn), OpenConnections: db.numOpen, InUse: db.numOpen - len(db.freeConn), WaitCount: db.waitCount, WaitDuration: time.Duration(wait), MaxIdleClosed: db.maxIdleClosed, MaxLifetimeClosed: db.maxLifetimeClosed, } return stats }
一個簡單的使用例子
func monitorConn(db *sql.DB) { go func(db *sql.DB) { mt := time.NewTicker(monitorDbInterval * time.Second) for { select { case <-mt.C: stat := db.Stats() logutil.Errorf("monitor db conn(%p): maxopen(%d), open(%d), use(%d), idle(%d), "+ "wait(%d), idleClose(%d), lifeClose(%d), totalWait(%v)", db, stat.MaxOpenConnections, stat.OpenConnections, stat.InUse, stat.Idle, stat.WaitCount, stat.MaxIdleClosed, stat.MaxLifetimeClosed, stat.WaitDuration) } } }(db) }
須要注意的是,1.15 以前,對 stat.MaxLifetimeClosed 對象統計會有異常,1.15 以後作了修復。
【完】