數據庫鏈接池是由客戶端維護的存放數據庫鏈接的池子,鏈接被維護在池子裏面,誰用誰來取,目的是下降頻繁的建立和關閉鏈接的開銷。css
關於如何理解數據庫鏈接,你們能夠藉助這個TCP編程的Demo來理解。mysql
爲了便於理解,能夠MySQL-Server的鏈接池想象成就是這個簡單的Tcp-Serverlinux
func main() { // 1. 監聽端口 2.accept鏈接 3.開goroutine處理鏈接 listen, err := net.Listen("tcp", "0.0.0.0:9090") if err != nil { fmt.Printf("error : %v", err) return } for{ conn, err := listen.Accept() if err != nil { fmt.Printf("Fail listen.Accept : %v", err) continue } go ProcessConn(conn) } } // 處理網絡請求 func ProcessConn(conn net.Conn) { // defer conn.Close() for { bt,err:= coder.Decode(conn) if err != nil { fmt.Printf("Fail to decode error [%v]", err) return } s := string(bt) fmt.Printf("Read from conn:[%v]\n",s) } }
對於咱們如今看的sql包下的鏈接池,能夠簡化認爲它就是以下的tcp-clientgit
conn, err := net.Dial("tcp", ":9090") defer conn.Close() if err != nil { fmt.Printf("error : %v", err) return } // 將數據編碼併發送出去 coder.Encode(conn,"hi server i am here"); time.Sleep(time.Second*10
整體的思路能夠認爲,程序啓動的時候,根據咱們的配置,sql包中的DB會爲咱們提早建立幾條這樣的conn,而後維護起來,不close()掉,咱們想使用的時候問他拿便可。github
至於爲何是這個tcp的demo呢?由於數據庫鏈接的創建底層依賴的是tcp鏈接。基於tcp鏈接的基礎上實現客戶端和服務端數據的傳輸,再往上封裝一層mysql的握手、鑑權、交互協議對數據包進行解析、反解析,進而跑通整個流程。golang
_ "github.com/go-sql-driver/mysql"
driver/driver.go :定義了實現數據庫驅動所須要的接口,這些接口由sql包和具體的驅動包來實現sql
driver/types.go:定義了數據類型別名和轉換數據庫
convert:rows的scan編程
sql.go: 關於SQL數據庫的一些通用的接口、類型。包括:鏈接池、數據類型、鏈接、事物、statement緩存
import "github.com/go-sql-driver/mysql」 // 具體的驅動包 import "database/sql" // 初始化鏈接 func initDB() (err error) { db, err = sql.Open("mysql", "root:root@tcp(127.0.0.1:3306)/test") if err != nil { panic(err) } // todo 不要在這裏關閉它, 函數一結束,defer就執行了 // defer db.Close() err = db.Ping() if err != nil { return err } return nil }
/** DB是表明零個或多個基礎鏈接池的數據庫句柄。 對於多個goroutine併發使用是安全的。 sql包會自動建立並釋放鏈接。 它還維護空閒鏈接的空閒池。 若是數據庫具備每一個鏈接狀態的概念,則能夠在事務(Tx)或鏈接(Conn)中可靠地觀察到這種狀態。 調用DB.Begin以後,返回的Tx將綁定到單個鏈接。 在事務上調用Commit或Rollback後,該事務的鏈接將返回到DB的空閒鏈接池。 池大小能夠經過SetMaxIdleConns控制。 */ type DB struct { // Atomic access only. At top of struct to prevent mis-alignment // on 32-bit platforms. Of type time.Duration. // 統計使用:等待新的鏈接所須要的總時間 waitDuration int64 // Total time waited for new connections. // 由具體的數據庫驅動實現的 connector connector driver.Connector // numClosed is an atomic counter which represents a total number of // closed connections. Stmt.openStmt checks it before cleaning closed // connections in Stmt.css. // 關閉的鏈接數 numClosed uint64 mu sync.Mutex // protects following fields // 鏈接池,在go中,鏈接的封裝結構體是:driverConn freeConn []*driverConn // 鏈接請求的map, key是自增的int64類型的數,用於惟一標示這個請求分配的 connRequests map[uint64]chan connRequest // 相似於binlog中的next trx_ix ,下一個事物的id nextRequest uint64 // Next key to use in connRequests. // 已經打開,或者等待打開的鏈接數 numOpen int // number of opened and pending open connections // Used to signal the need for new connections // a goroutine running connectionOpener() reads on this chan and // maybeOpenNewConnections sends on the chan (one send per needed connection) // It is closed during db.Close(). The close tells the connectionOpener // goroutine to exit. // 他是個chan,用於通知connectionOpener()協程應該打開新的鏈接了。 openerCh chan struct{} // 他是個chan,用於通知connectionResetter協程:重製鏈接的狀態。 resetterCh chan *driverConn closed bool // 依賴,key是鏈接、statement dep map[finalCloser]depSet lastPut map[*driverConn]string // stacktrace of last conn's put; debug only // 鏈接池的大小,0意味着使用默認的大小2, 小於0表示不使用鏈接池 maxIdle int // zero means defaultMaxIdleConns; negative means 0 // 最大打開的鏈接數,包含鏈接池中的鏈接和鏈接池以外的空閒鏈接, 0表示不作限制 maxOpen int // <= 0 means unlimited // 鏈接被重用的時間,設置爲0表示一直能夠被重用。 maxLifetime time.Duration // maximum amount of time a connection may be reused // 他是個chan,用於通知connectionCleaner協程去請求過時的鏈接 // 當有設置最大存活時間時纔會生效 cleanerCh chan struct{} // 等待的鏈接總數,當maxIdle爲0時,waitCount也會一直爲 // 由於maxIdle爲0,每個請求過來都會打開一條新的鏈接。 waitCount int64 // Total number of connections waited for. // 釋放鏈接時,由於鏈接池已滿而關閉的鏈接總數 // 若是maxLifeTime沒有被設置,maxIdleClosed爲0 maxIdleClosed int64 // Total number of connections closed due to idle. // 由於超過了最大鏈接時間,而被關閉的鏈接總數 maxLifetimeClosed int64 // Total number of connections closed due to max free limit. // 當DB被關閉時,關閉connection opener和session resetter這兩個協程 stop func() // stop cancels the connection opener and the session resetter. }
鏈接的封裝結構體:driverConn
// driverConn wraps a driver.Conn with a mutex, to // be held during all calls into the Conn. (including any calls onto // interfaces returned via that Conn, such as calls on Tx, Stmt, // Result, Rows) /** driverConn使用互斥鎖包裝Conn包裝 */ type driverConn struct { // 持有對整個數據庫的抽象結構體 db *DB createdAt time.Time sync.Mutex // guards following // 對應於具體的鏈接,eg.mysqlConn ci driver.Conn // 標記當前鏈接的狀態:當前鏈接是否已經關閉 closed bool // 標記當前鏈接的狀態:當前鏈接是否最終關閉,包裝 ci.Close has been called finalClosed bool // ci.Close has been called // 在這些鏈接上打開的statement openStmt map[*driverStmt]bool // connectionResetter返回的結果 lastErr error // lastError captures the result of the session resetter. // guarded by db.mu // 鏈接是否被佔用了 inUse bool // 在歸還鏈接時須要運行的代碼。在noteUnusedDriverStatement中添加 onPut []func() // code (with db.mu held) run when conn is next returned dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked }
具體的鏈接: driver包下的Conn以下,是個接口,須要被具體的實現。
// Conn is assumed to be stateful. type Conn interface { // Prepare returns a prepared statement, bound to this connection. Prepare(query string) (Stmt, error) // Close invalidates and potentially stops any current // prepared statements and transactions, marking this // connection as no longer in use. // // Because the sql package maintains a free pool of // connections and only calls Close when there's a surplus of // idle connections, it shouldn't be necessary for drivers to // do their own connection caching. Close() error // Begin starts and returns a new transaction. // // Deprecated: Drivers should implement ConnBeginTx instead (or additionally). Begin() (Tx, error) }
在golang中,要想獲取鏈接,通常咱們都得經過下面這段代碼獲取到DB的封裝結構體實例。
經過上面的三個結構體能夠看出 DB 、driverConn、Conn的關係以下:
因此咱們的代碼通常長成下面這樣,先獲取一個DB結構體的實例,DB結果體中有維護鏈接池、以及和建立鏈接,關閉鏈接協程通訊的channel,已經各類配置參數。
上圖中淺藍色部分的 freeConn就是空閒鏈接池,裏面的driver包下的Conn interface就是具體的鏈接。
/** * MySQL鏈接相關的邏輯 */ type Conenctor struct { BaseInfo BaseInfo DB *sql.DB } func (c *Conenctor) Open() { // 讀取配置 c.loadConfig() dataSource := c.BaseInfo.RootUserName + ":" + c.BaseInfo.RootPassword + "@tcp(" + c.BaseInfo.Addr + ":" + c.BaseInfo.Port + ")/" + c.BaseInfo.DBName db, Err := sql.Open("mysql", dataSource) if Err != nil { common.Error("Fail to opendb dataSource:[%v] Err:[%v]", dataSource, Err.Error()) return } db.SetMaxOpenConns(500) db.SetMaxIdleConns(200) c.DB = db Err = db.Ping() if Err != nil { fmt.Printf("Fail to Ping DB Err :[%v]", Err.Error()) return } }
好比咱們本身寫代碼時,可能會搞這樣一個方法作增刪改
// 插入、更新、刪除 func (c *Conenctor) Exec(ctx context.Context, sqlText string, params ...interface{}) (qr *QueryResults) { qr = &QueryResults{} result, err := c.DB.ExecContext(ctx, sqlText, params...) defer HandleException() if err != nil { qr.EffectRow = 0 qr.Err = err common.Error("Fail to exec qurey sqlText:[%v] params:[%v] err:[%v]", sqlText, params, err) return } qr.EffectRow, _ = result.RowsAffected() qr.LastInsertId, _ = result.LastInsertId() return }
主要是使用DB.ExecContext()
執行SQL,獲取返回值。
ctx是業務代碼傳入的上線文,一般是作超時限制使用。
其實這裏並非嚴格意義上的去執行sql,它實際上是經過和MySQL-Server之間創建的鏈接將sql+params發往MySQL-Server去解析和執行。
進入DB.ExecContext()
主要邏輯以下:exec()
方法的主要功能是:獲取鏈接,發送sql和參數。
func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) { var res Result var err error for i := 0; i < maxBadConnRetries; i++ { res, err = db.exec(ctx, query, args, cachedOrNewConn) if err != driver.ErrBadConn { break } } if err == driver.ErrBadConn { return db.exec(ctx, query, args, alwaysNewConn) } return res, err }
跟進exec()
--> db.conn(ctx, strategy)
func (db *DB) exec(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (Result, error) { // 這個strategy就是上一步咱們告訴他是建立新鏈接,仍是優先從緩存池中獲取鏈接。 dc, err := db.conn(ctx, strategy) .. }
跟進conn()
方法
conn方法的返回值是driverConn,也就是咱們上面說的數據庫鏈接,做用就是說,跟據傳遞進來的獲取策略,獲取數據庫鏈接,若是正常就返回獲取到的數據庫鏈接,異常就返回錯誤err
這張圖是conn獲取鏈接的流程圖,根據下面這段代碼畫出來的,註釋有寫在代碼上
// conn returns a newly-opened or cached *driverConn. func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) { db.mu.Lock() // 先監測db是否關閉了 if db.closed { db.mu.Unlock() // DB都關閉了,直接返回DBClosed錯誤,不必再去獲取鏈接。 return nil, errDBClosed } // 檢查用戶傳遞進來的Context是否過時了 select { default: // 若是用戶那邊使用了ctx.Done(),毫無疑問,會進入這個case中,返回Ctx錯誤 case <-ctx.Done(): db.mu.Unlock() return nil, ctx.Err() } // 鏈接被重用的時間,若是爲0,表示 理論上這個鏈接永不過時,一直能夠被使用 lifetime := db.maxLifetime // 看一下空閒鏈接池(他是個slice)是不是還有空閒的鏈接 numFree := len(db.freeConn) // 若是獲取策略是優先從鏈接池中獲取,而且鏈接池中確實存在空閒的鏈接,就從freeConn中取鏈接使用。 if strategy == cachedOrNewConn && numFree > 0 { // 假設空閒池還剩下五條鏈接:【1,2,3,4,5】 // 取出第一條 conn == 1 conn := db.freeConn[0] // 切片的拷貝,實現remove掉第一個鏈接的目的。 copy(db.freeConn, db.freeConn[1:]) // 若是db.freeConn[1:]會致使freeConn變小,因此這裏是 db.freeConn = db.freeConn[:numFree-1] db.freeConn = db.freeConn[:numFree-1] // 這裏獲取的鏈接是driverConn,它實際上是對真實鏈接,driver.Conn的封裝。 // 在driver.Conn的基礎上多一層封裝能夠實如今driver.Conn的基礎上,加持上狀態信息,以下 conn.inUse = true db.mu.Unlock() // 檢查是否過時 if conn.expired(lifetime) { conn.Close() return nil, driver.ErrBadConn } // Lock around reading lastErr to ensure the session resetter finished. // 加鎖處理,確保這個conn不曾被標記爲 lastErr狀態。 // 一旦被標記爲這個狀態說明 ConnectionRestter協程在重置conn的狀態時發生了錯誤。也就是這個鏈接其實已經壞掉了,不可以使用。 conn.Lock() err := conn.lastErr conn.Unlock() // 若是檢測到這種錯誤,driver.ErrBadConn 表示鏈接不可用,關閉鏈接,返回錯誤。 if err == driver.ErrBadConn { conn.Close() return nil, driver.ErrBadConn } return conn, nil } // Out of free connections or we were asked not to use one. If we're not // allowed to open any more connections, make a request and wait. // db.maxOpen > 0 表示當前DB實例容許打開鏈接 // db.numOpen >= db.maxOpen表示當前DB能打開的鏈接數,已經大於它能打開的最大鏈接數,就構建一個request,而後等待獲取鏈接 if db.maxOpen > 0 && db.numOpen >= db.maxOpen { // Make the connRequest channel. It's buffered so that the // connectionOpener doesn't block while waiting for the req to be read. // 構建connRequest這個channel,緩存大小是1 // 用於告訴connectionOpener協程,須要打開一個新的鏈接。 req := make(chan connRequest, 1) /** nextRequestKeyLocked函數以下: func (db *DB) nextRequestKeyLocked() uint64 { next := db.nextRequest db.nextRequest++ return next } 主要做用就是將nextRequest+1, 至於這個nextRequest的做用咱們前面也說過了,它至關於binlog中的next_trx下一個事物的事物id。 言外之意是這個nextRequest遞增的(由於這段代碼被加了lock)。 看以下的代碼中,將這個自增後的nextRequest當返回值返回出去。 而後緊接着將它做爲map的key 至於這個map嘛: 在本文一開始的位置,咱們介紹了DB結構體有這樣一個屬性,鏈接請求的map, key是自增的int64類型的數, 用於惟一標示這個請求分配的 connRequests map[uint64]chan connRequest */ reqKey := db.nextRequestKeyLocked() // 將這個第n個請求對應channel緩存起來,開始等待有合適的機會分配給他鏈接 db.connRequests[reqKey] = req // 等待數增長,解鎖 db.waitCount++ db.mu.Unlock() waitStart := time.Now() // Timeout the connection request with the context. // 進入下面的slice中 select { // 若是客戶端傳入的上下文超時了,進入這個case case <-ctx.Done(): // Remove the connection request and ensure no value has been sent // on it after removing. // 當上下文超時時,表示上層的客戶端代碼想斷開,意味着在這個方法收到這個信號後須要退出了 // 這裏將db的connRequests中的reqKey清除,防止還給他分配一個鏈接。 db.mu.Lock() delete(db.connRequests, reqKey) db.mu.Unlock() atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) // 這裏也會嘗試從req channel中獲取一下有沒有可用的鏈接 // 若是有的話執行 db.putConn(ret.conn, ret.err, false) ,目的是釋放掉這個鏈接 select { default: case ret, ok := <-req: if ok && ret.conn != nil { // 看到這裏只須要知道他是用來釋放鏈接的就ok,繼續往下看,稍後再殺回來 db.putConn(ret.conn, ret.err, false) } } //返回ctx異常。 return nil, ctx.Err() // 嘗試從 reqchannel 中取出鏈接 case ret, ok := <-req: atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) // 處理錯誤 if !ok { return nil, errDBClosed } // 檢測鏈接是否過時了,前面也提到過,DB實例有維護一個參數,maxLifeTime,0表示永不過時 if ret.err == nil && ret.conn.expired(lifetime) { ret.conn.Close() return nil, driver.ErrBadConn } // 健壯性檢查 if ret.conn == nil { return nil, ret.err } // Lock around reading lastErr to ensure the session resetter finished. // 檢查鏈接是否可用 ret.conn.Lock() err := ret.conn.lastErr ret.conn.Unlock() if err == driver.ErrBadConn { ret.conn.Close() return nil, driver.ErrBadConn } return ret.conn, ret.err } } // 代碼能運行到這裏說明上面的if條件沒有被命中。 // 換句話說,來到這裏說明具有以下條件 // 1:當前DB實例的空閒鏈接池中已經沒有空閒鏈接了,獲取明確指定,不從空閒池中獲取鏈接,就想新建鏈接。 // 2: 當前DB實例容許打開鏈接 // 3: DB實例目前打開的鏈接數尚未到達它能打開的最大鏈接數的上限。 // 記錄當前DB已經打開的鏈接數+1 db.numOpen++ // optimistically db.mu.Unlock() ci, err := db.connector.Connect(ctx) if err != nil { db.mu.Lock() db.numOpen-- // correct for earlier optimism db.maybeOpenNewConnections() db.mu.Unlock() return nil, err } db.mu.Lock() // 構建一個鏈接實例,並返回 dc := &driverConn{ db: db, createdAt: nowFunc(), ci: ci, inUse: true, } db.addDepLocked(dc, dc) db.mu.Unlock() return dc, nil }
鏈接被是事後是須要被釋放的
釋放鏈接的邏輯封裝在DB實例中
db.putConn(ret.conn, ret.err, false)
釋放鏈接的流程圖以下:
流程圖根據以下的代碼畫出。
方法詳細信息以下:
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) { // 釋放鏈接的操做加鎖 db.mu.Lock() // debug的信息 if !dc.inUse { if debugGetPut { fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc]) } panic("sql: connection returned that was never out") } if debugGetPut { db.lastPut[dc] = stack() } // 標記driverConn處理不可用的狀態 dc.inUse = false for _, fn := range dc.onPut { fn() } dc.onPut = nil // 本方法的入參中有參數err // 當會話獲取出這個鏈接後,發現這個鏈接過時了、或者被標記上來lastErr時,再調用這個putConn方法時,同時會將這個錯誤傳遞進來,而後在這裏判斷,當出現壞掉的鏈接時就不直接把這個鏈接放回空閒鏈接池了。 if err == driver.ErrBadConn { // Don't reuse bad connections. // Since the conn is considered bad and is being discarded, treat it // as closed. Don't decrement the open count here, finalClose will // take care of that. // 這個方法的做用以下: // 他會去判斷當前DB維護的map的容量,也就是前面提到的那種狀況:當DB容許打開鏈接,可是如今的鏈接數已經達到當前DB容許打開的最大鏈接數上限了,那麼針對接下來想要獲取鏈接的請求的處理邏輯就是,構建一個req channel,放入connRequests這個map中,表示他們正在等待鏈接的創建。 // 換句話說,這時系統時繁忙的,業務處於高峯,那麼問題來了,如今居然出現了一個壞掉的鏈接,那爲了把對業務線的影響降到最低,是否是得主動新建一個新的鏈接放到空閒鏈接池中呢? // db.maybeOpenNewConnections() 函數主要乾的就是這個事。 // 方法詳情以下 /* func (db *DB) maybeOpenNewConnections() { numRequests := len(db.connRequests) if db.maxOpen > 0 { numCanOpen := db.maxOpen - db.numOpen if numRequests > numCanOpen { numRequests = numCanOpen } } for numRequests > 0 { db.numOpen++ // optimistically numRequests-- if db.closed { return } // 它只是往這個 openerCh channel中寫入一個空的結構體,會有專門的協程負責建立鏈接 db.openerCh <- struct{}{} } } */ db.maybeOpenNewConnections() // 解鎖,關閉鏈接,返回 db.mu.Unlock() dc.Close() return } if putConnHook != nil { putConnHook(db, dc) } // 若是DB已經關閉了,標記 resetSession爲 false if db.closed { // Connections do not need to be reset if they will be closed. // Prevents writing to resetterCh after the DB has closed. // 當DB都已經關了,意味着DB裏面的鏈接池都沒有了,那固然不須要關閉鏈接池中的鏈接了~ resetSession = false } // 若是DB沒有關閉的話,進入if代碼塊 if resetSession { // 將dricerConn中的Conn驗證轉換爲driver.SessionResetter if _, resetSession = dc.ci.(driver.SessionResetter); resetSession { // 在此處鎖定driverConn,以便在鏈接重置以前不會釋放。 // 必須在將鏈接放入池以前獲取鎖,以防止在重置以前將其取出 dc.Lock() } } // 真正將鏈接放回空閒鏈接池中 // 知足connRequest或將driverConn放入空閒池並返回true或false /* func (db *DB) putConnDBLocked(dc *driverConn, err error) bool { // 檢測若是DB都關閉塊,直接返回flase if db.closed { return false } // 若是DB當前打開的鏈接數大於DB能打開的最大的鏈接數,返回false if db.maxOpen > 0 && db.numOpen > db.maxOpen { return false } //若是等待獲取鏈接的map中有存貨 if c := len(db.connRequests); c > 0 { var req chan connRequest var reqKey uint64 // 取出map中的第一個key for reqKey, req = range db.connRequests { break } // 將這個key,value再map中刪除 delete(db.connRequests, reqKey) // Remove from pending requests. // 從新標記這個鏈接是可用的狀態 if err == nil { dc.inUse = true } // 將這個鏈接放入到 req channel中,給等待鏈接到會話使用 req <- connRequest{ conn: dc, err: err, } return true // 來到這個if,說明此時沒有任何請求在等待獲取鏈接,而且沒有發生錯誤,DB也沒有關閉 } else if err == nil && !db.closed { // 比較當前空閒鏈接池的大小(默認是2) 和 freeConn空閒鏈接數的數量 // 意思是,若是空閒的鏈接超出了這個規定的閾值,空閒鏈接是須要被收回的。 if db.maxIdleConnsLocked() > len(db.freeConn) { // 收回 db.freeConn = append(db.freeConn, dc) db.startCleanerLocked() return true } // 若是空閒鏈接還沒到閾值,保留這個鏈接看成空閒鏈接 db.maxIdleClosed++ } // 收回空閒鏈接返回false return false } */ // 若是將鏈接成功放入了空閒鏈接池,或者將鏈接成功給了等待鏈接到會話使用,此處返回true // 收回空閒鏈接返回false // 代碼詳情就是在上面的這段註釋中 added := db.putConnDBLocked(dc, nil) db.mu.Unlock() // 若是 if !added { // 若是DB沒有關閉,進入if if resetSession { dc.Unlock() } dc.Close() return } // 從新校驗,若是鏈接關閉了,進入if if !resetSession { return } // 若是負責重置 conn狀態的線程阻塞住了,那麼標記這個driverConn爲lastErr select { default: // If the resetterCh is blocking then mark the connection // as bad and continue on. dc.lastErr = driver.ErrBadConn dc.Unlock() case db.resetterCh <- dc: } }
這個connectionOpener是一個工做協程,它會去嘗試消費指定的channel,負責建立數據庫鏈接,其實在前面閱讀獲取鏈接的邏輯時,有這樣的兩種狀況會阻塞等待connectionOpener來新建立鏈接:
第一種:當獲取鏈接的策略是優先從cache鏈接池中獲取出來,可是空閒鏈接池已經沒有空閒的鏈接了,首先這時DB容許打開鏈接,可是DB能打開的鏈接數已經達到了它能打開的鏈接數的上線,因此得等待有空閒鏈接出現,或者等有鏈接被釋放後,DB能當前打開的鏈接數小於了它能打開的鏈接數的最大值,這時它會被阻塞等待去嘗試建立鏈接。
第二種:獲取鏈接的策略再也不是優先從空閒緩衝池中獲取鏈接,直接明瞭的想獲取最一條新鏈接,一樣的此時DB已經打開的鏈接數大於它能打開鏈接數的上線,它會被阻塞等待建立鏈接。
func OpenDB(c driver.Connector) *DB { ctx, cancel := context.WithCancel(context.Background()) db := &DB{ connector: c, openerCh: make(chan struct{}, connectionRequestQueueSize), resetterCh: make(chan *driverConn, 50), lastPut: make(map[*driverConn]string), connRequests: make(map[uint64]chan connRequest), stop: cancel, } // 能夠看到他是在DB被實例化時開啓的。 go db.connectionOpener(ctx) go db.connectionResetter(ctx) return db }
能夠看到它一直嘗試從db的openerCh中獲取內容,並且只要獲取到了內容,就會調用方法打開鏈接。
// Runs in a separate goroutine, opens new connections when requested. func (db *DB) connectionOpener(ctx context.Context) { for { select { case <-ctx.Done(): return // here case <-db.openerCh: db.openNewConnection(ctx) } } }
往channl中投放消息的邏輯在db的mayBeOpenNewConnections中
func (db *DB) maybeOpenNewConnections() { // 經過檢查這個map的長度來決定是否往opennerCh中投放消息 numRequests := len(db.connRequests) if db.maxOpen > 0 { numCanOpen := db.maxOpen - db.numOpen if numRequests > numCanOpen { numRequests = numCanOpen } } for numRequests > 0 { db.numOpen++ // optimistically numRequests-- if db.closed { return } // 一旦執行了這一步,connectionOpener 就會監聽到去建立鏈接。 db.openerCh <- struct{}{} } }
在DB結構體中有這樣一個屬性
// 鏈接池的大小,0意味着使用默認的大小2, 小於0表示不使用鏈接池 maxIdle int // zero means defaultMaxIdleConns; negative means 0
表示空閒鏈接池默認的大小,若是它爲0,表示都沒有緩存池,也就意味着會爲全部想獲取鏈接的請求建立新的conn,這是也就不會有這個opnerCh,更不會有connectionOpener
它一樣以一條協程的形式存在,用於定時清理數據庫鏈接池中過時的鏈接
func (db *DB) startCleanerLocked() { if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil { db.cleanerCh = make(chan struct{}, 1) go db.connectionCleaner(db.maxLifetime) } }
一樣的,DB中存在一個參數:maxLifetime
它表示數據庫鏈接最大的生命時長,若是將它設置爲0,表示這個鏈接永不過時,既然全部的鏈接永不過時,就不會存在connectionCleaner去定時根據maxLifetime
來定時清理鏈接。
它的調用時機是:須要將鏈接放回到鏈接池時調用。
咱們使用獲取的鏈接的封裝結構體是driverConn,其實它是會driver包下的Conn鏈接的又一層封裝,目的是加強
driver包下的Conn的,多出來了一些狀態。當將使用完畢的鏈接放入鏈接池時,就得將這些狀態清除掉。
使用誰去清除呢?就是這個go 協程:connectionRestter
當connectionRestter碰到錯誤時,會將這個conn標記爲lastErr,鏈接使用者在使用鏈接時會先校驗conn的諸多狀態,好比出現lastErr,會返回給客戶端 badConnErr
數據庫鏈接池大大小到底設置爲多少,得根據業務流量已經數據庫所在機器的性能綜合考慮。
mysql鏈接數到配置在 my.cnf中,具體的參數是max_connections。
當業務流量異常猛烈時,極可能會出現這個問題:to many connections
對於操縱系統內核來講,當他接受到一個tcp請求就會在本地建立一個由文件系統管理的socket文件。在linux中咱們將它叫作文件句柄。
linux爲防止單一進程將系統資源所有耗費掉,會限制進程最大能打開的鏈接數爲1024,這意味着,哪怕經過改配置文件,將mysql能打開的鏈接池設置爲9999,事實上它能打開的文件數最多不會超過1024。
這個問題也好解決:
命令:設置單個進程能打開的最大鏈接數爲65535
ulimit -HSn 65535
經過命令: 查看進程被限制的使用各類資源的量
ulimit -a core file size: 進程崩潰是轉儲文件大小限制 man loaded memort 最大鎖定內存大小 open file 能打開的文件句柄數
這些變量定義在 /etc/security/limits.conf配置文件中。
狀況1: 客戶端主動斷開
若是是客戶端主動將鏈接close(), 那往合格鏈接中寫數據時會獲得ErrBadConn的錯誤,若是此時依然能夠重試,將會獲取新的鏈接。
代碼以下:
func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) { var res Result var err error for i := 0; i < maxBadConnRetries; i++ { res, err = db.exec(ctx, query, args, cachedOrNewConn) if err != driver.ErrBadConn { break } } if err == driver.ErrBadConn { return db.exec(ctx, query, args, alwaysNewConn) } return res, err }
狀況2: 服務端掛啦
由於這種數據庫鏈接底層使用的是tcp實現。(tcp自己是支持全雙工的,客戶端和服務端支持同時往對方發送數據)依賴諸如:校驗和、確認應答和序列號機制、超時重傳、鏈接管理(3次握手,4次揮手)、以及滑動窗口、流量控制、擁賽避免去實現整個數據交互的可靠性,協調。
這時客戶端拿着一條自認爲是正常的鏈接,往鏈接裏面寫數據。然鵝,另外一端端服務端已經掛了~,可是不幸的是,客戶端的tcp鏈接根本感知不到~~~。
可是當它去讀取服務端的返回數據時會遇到錯誤:unexceptBadConn EOF
設置鏈接的屬性: maxLifeTime
上面也說過了,當設置了這個屬性後,DB會開啓一條協程connectionCleaner,專門負責清理過時的鏈接。
這在必定程度上避免了服務端將鏈接斷掉後,客戶端無感知的狀況。
maxLifeTime的值到底設置多大?參考值,比數據庫的wait_timeout小一些就ok。
主動檢查鏈接的有效性。
好比在鏈接放回到空閒鏈接池前ping測試。在使用鏈接發送數據前進行連通性測試。