Golang SQL鏈接池梳理

1、如何理解數據庫鏈接

數據庫鏈接池是由客戶端維護的存放數據庫鏈接的池子,鏈接被維護在池子裏面,誰用誰來取,目的是下降頻繁的建立和關閉鏈接的開銷。css

關於如何理解數據庫鏈接,你們能夠藉助這個TCP編程的Demo來理解。mysql

爲了便於理解,能夠MySQL-Server的鏈接池想象成就是這個簡單的Tcp-Serverlinux

func main() {
	// 1. 監聽端口 2.accept鏈接 3.開goroutine處理鏈接
	listen, err := net.Listen("tcp", "0.0.0.0:9090")
	if err != nil {
		fmt.Printf("error : %v", err)
		return
	}
	for{
		conn, err := listen.Accept()
		if err != nil {
			fmt.Printf("Fail listen.Accept : %v", err)
			continue
		}
		go ProcessConn(conn)
	}
}

// 處理網絡請求
func ProcessConn(conn net.Conn) {
	// defer conn.Close()
	for  {
		bt,err:= coder.Decode(conn)
		if err != nil {
			fmt.Printf("Fail to decode error [%v]", err)
			return
		}
		s := string(bt)
		fmt.Printf("Read from conn:[%v]\n",s)
	}
}

對於咱們如今看的sql包下的鏈接池,能夠簡化認爲它就是以下的tcp-clientgit

conn, err := net.Dial("tcp", ":9090")
	defer conn.Close()
	if err != nil {
		fmt.Printf("error : %v", err)
		return
	}

	// 將數據編碼併發送出去
	coder.Encode(conn,"hi server i am here");

	time.Sleep(time.Second*10

整體的思路能夠認爲,程序啓動的時候,根據咱們的配置,sql包中的DB會爲咱們提早建立幾條這樣的conn,而後維護起來,不close()掉,咱們想使用的時候問他拿便可。github

至於爲何是這個tcp的demo呢?由於數據庫鏈接的創建底層依賴的是tcp鏈接。基於tcp鏈接的基礎上實現客戶端和服務端數據的傳輸,再往上封裝一層mysql的握手、鑑權、交互協議對數據包進行解析、反解析,進而跑通整個流程。golang

2、鏈接池的工做原理

  • 鏈接池的創建
    • 後臺系統初始化時,鏈接池會根據系統的配置創建。
    • 可是在接受客戶端請求以前,並無真正的建立鏈接。
    • 在go語言中,先註冊驅動_ "github.com/go-sql-driver/mysql"
    • 初始化DB,調用Open函數,這時其實沒有真正的去獲取鏈接,而是去獲取DB操做的數據結構。
  • 鏈接池中鏈接的使用和管理
  • 鏈接池的關閉
    • 釋放鏈接
    • 關閉鏈接的請求隊列
    • connectionOpener(負責打開鏈接的協程)
    • connectionResetter(重製鏈接狀態的協程)
    • connectionCleaner(按期清理過時鏈接的協程)

3、database/sql包結構

image-20200719230058101

driver/driver.go :定義了實現數據庫驅動所須要的接口,這些接口由sql包和具體的驅動包來實現sql

driver/types.go:定義了數據類型別名和轉換數據庫

convert:rows的scan編程

sql.go: 關於SQL數據庫的一些通用的接口、類型。包括:鏈接池、數據類型、鏈接、事物、statement緩存

import "github.com/go-sql-driver/mysql」 // 具體的驅動包
import "database/sql"

// 初始化鏈接
func initDB() (err error) {
	db, err = sql.Open("mysql", "root:root@tcp(127.0.0.1:3306)/test")
	if err != nil {
		panic(err)
	}
	// todo 不要在這裏關閉它, 函數一結束,defer就執行了
	// defer db.Close()
	err = db.Ping()
	if err != nil {
		return err
	}
	return nil
}

4、三個重要的結構體

4.一、DB

/**
	DB是表明零個或多個基礎鏈接池的數據庫句柄。 對於多個goroutine併發使用是安全的。
	sql包會自動建立並釋放鏈接。 它還維護空閒鏈接的空閒池。 
	若是數據庫具備每一個鏈接狀態的概念,則能夠在事務(Tx)或鏈接(Conn)中可靠地觀察到這種狀態。
  調用DB.Begin以後,返回的Tx將綁定到單個鏈接。 
  在事務上調用Commit或Rollback後,該事務的鏈接將返回到DB的空閒鏈接池。
  池大小能夠經過SetMaxIdleConns控制。
*/
type DB struct {
	// Atomic access only. At top of struct to prevent mis-alignment
	// on 32-bit platforms. Of type time.Duration.
  // 統計使用:等待新的鏈接所須要的總時間
	waitDuration int64 // Total time waited for new connections.

  // 由具體的數據庫驅動實現的 connector
	connector driver.Connector
  
	// numClosed is an atomic counter which represents a total number of
	// closed connections. Stmt.openStmt checks it before cleaning closed
	// connections in Stmt.css.
  // 關閉的鏈接數
	numClosed uint64

	mu           sync.Mutex // protects following fields
  
  // 鏈接池,在go中,鏈接的封裝結構體是:driverConn
	freeConn     []*driverConn
  
  // 鏈接請求的map, key是自增的int64類型的數,用於惟一標示這個請求分配的
	connRequests map[uint64]chan connRequest
  
  // 相似於binlog中的next trx_ix ,下一個事物的id
	nextRequest  uint64 // Next key to use in connRequests.
  
  // 已經打開,或者等待打開的鏈接數
	numOpen      int    // number of opened and pending open connections
  
	// Used to signal the need for new connections
	// a goroutine running connectionOpener() reads on this chan and
	// maybeOpenNewConnections sends on the chan (one send per needed connection)
	// It is closed during db.Close(). The close tells the connectionOpener
	// goroutine to exit.
  // 他是個chan,用於通知connectionOpener()協程應該打開新的鏈接了。
	openerCh          chan struct{}
  
  // 他是個chan,用於通知connectionResetter協程:重製鏈接的狀態。
	resetterCh        chan *driverConn
  
	closed            bool
  
  // 依賴,key是鏈接、statement
	dep               map[finalCloser]depSet
	lastPut           map[*driverConn]string // stacktrace of last conn's put; debug only
  
  // 鏈接池的大小,0意味着使用默認的大小2, 小於0表示不使用鏈接池
	maxIdle           int    // zero means defaultMaxIdleConns; negative means 0
  // 最大打開的鏈接數,包含鏈接池中的鏈接和鏈接池以外的空閒鏈接, 0表示不作限制
	maxOpen           int    // <= 0 means unlimited
  
  // 鏈接被重用的時間,設置爲0表示一直能夠被重用。
	maxLifetime       time.Duration  // maximum amount of time a connection may be reused
  
  // 他是個chan,用於通知connectionCleaner協程去請求過時的鏈接
  // 當有設置最大存活時間時纔會生效
	cleanerCh         chan struct{}
  
  // 等待的鏈接總數,當maxIdle爲0時,waitCount也會一直爲
  // 由於maxIdle爲0,每個請求過來都會打開一條新的鏈接。
	waitCount         int64 // Total number of connections waited for.
  
  // 釋放鏈接時,由於鏈接池已滿而關閉的鏈接總數
  // 若是maxLifeTime沒有被設置,maxIdleClosed爲0
	maxIdleClosed     int64 // Total number of connections closed due to idle.
  
  // 由於超過了最大鏈接時間,而被關閉的鏈接總數
	maxLifetimeClosed int64 // Total number of connections closed due to max free limit.
  
  // 當DB被關閉時,關閉connection opener和session resetter這兩個協程
	stop func() // stop cancels the connection opener and the session resetter.
}

4.二、driverConn

鏈接的封裝結構體:driverConn

// driverConn wraps a driver.Conn with a mutex, to
// be held during all calls into the Conn. (including any calls onto
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
/**
	driverConn使用互斥鎖包裝Conn包裝
*/
type driverConn struct {
  // 持有對整個數據庫的抽象結構體
	db        *DB   		
	createdAt time.Time 

	sync.Mutex  // guards following
  
  // 對應於具體的鏈接,eg.mysqlConn
	ci          driver.Conn
  
  // 標記當前鏈接的狀態:當前鏈接是否已經關閉
	closed      bool
  // 標記當前鏈接的狀態:當前鏈接是否最終關閉,包裝 ci.Close has been called
	finalClosed bool // ci.Close has been called
  
  // 在這些鏈接上打開的statement
	openStmt    map[*driverStmt]bool
  
  // connectionResetter返回的結果
	lastErr     error // lastError captures the result of the session resetter.

	// guarded by db.mu
  // 鏈接是否被佔用了
	inUse      bool
  
  // 在歸還鏈接時須要運行的代碼。在noteUnusedDriverStatement中添加
	onPut      []func() // code (with db.mu held) run when conn is next returned
  
	dbmuClosed bool     // same as closed, but guarded by db.mu, for removeClosedStmtLocked
}

4.三、Conn

具體的鏈接: driver包下的Conn以下,是個接口,須要被具體的實現。

// Conn is assumed to be stateful.
type Conn interface {
	// Prepare returns a prepared statement, bound to this connection.
	Prepare(query string) (Stmt, error)

	// Close invalidates and potentially stops any current
	// prepared statements and transactions, marking this
	// connection as no longer in use.
	//
	// Because the sql package maintains a free pool of
	// connections and only calls Close when there's a surplus of
	// idle connections, it shouldn't be necessary for drivers to
	// do their own connection caching.
	Close() error

	// Begin starts and returns a new transaction.
	//
	// Deprecated: Drivers should implement ConnBeginTx instead (or additionally).
	Begin() (Tx, error)
}

5、流程梳理

5.一、先獲取DB實例

在golang中,要想獲取鏈接,通常咱們都得經過下面這段代碼獲取到DB的封裝結構體實例。

經過上面的三個結構體能夠看出 DB 、driverConn、Conn的關係以下:

因此咱們的代碼通常長成下面這樣,先獲取一個DB結構體的實例,DB結果體中有維護鏈接池、以及和建立鏈接,關閉鏈接協程通訊的channel,已經各類配置參數。

上圖中淺藍色部分的 freeConn就是空閒鏈接池,裏面的driver包下的Conn interface就是具體的鏈接。

/**
 * MySQL鏈接相關的邏輯
 */
type Conenctor struct {
	BaseInfo BaseInfo
	DB       *sql.DB
}

func (c *Conenctor) Open() {
	// 讀取配置
	c.loadConfig()
	dataSource := c.BaseInfo.RootUserName + ":" + c.BaseInfo.RootPassword + "@tcp(" + c.BaseInfo.Addr + ":" + c.BaseInfo.Port + ")/" + c.BaseInfo.DBName
	db, Err := sql.Open("mysql", dataSource)
	if Err != nil {
		common.Error("Fail to opendb dataSource:[%v] Err:[%v]", dataSource, Err.Error())
		return
	}
	db.SetMaxOpenConns(500)
	db.SetMaxIdleConns(200)
	c.DB = db
	Err = db.Ping()
	if Err != nil {
		fmt.Printf("Fail to Ping DB Err :[%v]", Err.Error())
		return
	}
}

5.二、流程梳理入口:

好比咱們本身寫代碼時,可能會搞這樣一個方法作增刪改

// 插入、更新、刪除
func (c *Conenctor) Exec(ctx context.Context, 
                         sqlText string,
                         params ...interface{}) (qr *QueryResults) {
	qr = &QueryResults{}
	result, err := c.DB.ExecContext(ctx, sqlText, params...)
	defer HandleException()
	if err != nil {
		qr.EffectRow = 0
		qr.Err = err
		common.Error("Fail to exec qurey sqlText:[%v] params:[%v] err:[%v]", sqlText, params, err)
		return
	}
	qr.EffectRow, _ = result.RowsAffected()
	qr.LastInsertId, _ = result.LastInsertId()
	return
}

主要是使用DB.ExecContext()執行SQL,獲取返回值。

ctx是業務代碼傳入的上線文,一般是作超時限制使用。

其實這裏並非嚴格意義上的去執行sql,它實際上是經過和MySQL-Server之間創建的鏈接將sql+params發往MySQL-Server去解析和執行。

進入DB.ExecContext()

主要邏輯以下:exec()方法的主要功能是:獲取鏈接,發送sql和參數。

  • 若是獲取一次失敗一次,當失敗的次數達到sql包預約義的常量maxBadConnRetries的狀況下,將會建立新的鏈接使用
  • 未超過maxBadConnRetries,被打上cachedOrNewConn,優先從空閒池中獲取鏈接
func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
   var res Result
   var err error
   for i := 0; i < maxBadConnRetries; i++ {
      res, err = db.exec(ctx, query, args, cachedOrNewConn)
      if err != driver.ErrBadConn {
         break
      }
   }
   if err == driver.ErrBadConn {
      return db.exec(ctx, query, args, alwaysNewConn)
   }
   return res, err
}

跟進exec() --> db.conn(ctx, strategy)

func (db *DB) exec(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (Result, error) {
  // 這個strategy就是上一步咱們告訴他是建立新鏈接,仍是優先從緩存池中獲取鏈接。
	dc, err := db.conn(ctx, strategy)
  ..
}

5.三、獲取鏈接

跟進conn()方法

conn方法的返回值是driverConn,也就是咱們上面說的數據庫鏈接,做用就是說,跟據傳遞進來的獲取策略,獲取數據庫鏈接,若是正常就返回獲取到的數據庫鏈接,異常就返回錯誤err

這張圖是conn獲取鏈接的流程圖,根據下面這段代碼畫出來的,註釋有寫在代碼上

// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
	db.mu.Lock()
  // 先監測db是否關閉了
	if db.closed {
		db.mu.Unlock()
    // DB都關閉了,直接返回DBClosed錯誤,不必再去獲取鏈接。
		return nil, errDBClosed
	}
  // 檢查用戶傳遞進來的Context是否過時了
	select {
	default:
  // 若是用戶那邊使用了ctx.Done(),毫無疑問,會進入這個case中,返回Ctx錯誤  
	case <-ctx.Done():
		db.mu.Unlock()
		return nil, ctx.Err()
	}
  // 鏈接被重用的時間,若是爲0,表示 理論上這個鏈接永不過時,一直能夠被使用
	lifetime := db.maxLifetime

  // 看一下空閒鏈接池(他是個slice)是不是還有空閒的鏈接
	numFree := len(db.freeConn)
  // 若是獲取策略是優先從鏈接池中獲取,而且鏈接池中確實存在空閒的鏈接,就從freeConn中取鏈接使用。
	if strategy == cachedOrNewConn && numFree > 0 {
    // 假設空閒池還剩下五條鏈接:【1,2,3,4,5】
    // 取出第一條 conn == 1
		conn := db.freeConn[0]
    // 切片的拷貝,實現remove掉第一個鏈接的目的。
		copy(db.freeConn, db.freeConn[1:])
    // 若是db.freeConn[1:]會致使freeConn變小,因此這裏是 db.freeConn = db.freeConn[:numFree-1]
		db.freeConn = db.freeConn[:numFree-1]
    // 這裏獲取的鏈接是driverConn,它實際上是對真實鏈接,driver.Conn的封裝。
    // 在driver.Conn的基礎上多一層封裝能夠實如今driver.Conn的基礎上,加持上狀態信息,以下
		conn.inUse = true
		db.mu.Unlock()
    // 檢查是否過時
		if conn.expired(lifetime) {
			conn.Close()
			return nil, driver.ErrBadConn
		}
		// Lock around reading lastErr to ensure the session resetter finished.
    // 加鎖處理,確保這個conn不曾被標記爲 lastErr狀態。
    // 一旦被標記爲這個狀態說明 ConnectionRestter協程在重置conn的狀態時發生了錯誤。也就是這個鏈接其實已經壞掉了,不可以使用。
		conn.Lock()
		err := conn.lastErr
		conn.Unlock()
    // 若是檢測到這種錯誤,driver.ErrBadConn 表示鏈接不可用,關閉鏈接,返回錯誤。
		if err == driver.ErrBadConn {
			conn.Close()
			return nil, driver.ErrBadConn
		}
		return conn, nil
	}

	// Out of free connections or we were asked not to use one. If we're not
	// allowed to open any more connections, make a request and wait.
  // db.maxOpen > 0 表示當前DB實例容許打開鏈接
  // db.numOpen >= db.maxOpen表示當前DB能打開的鏈接數,已經大於它能打開的最大鏈接數,就構建一個request,而後等待獲取鏈接
	if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
		// Make the connRequest channel. It's buffered so that the
		// connectionOpener doesn't block while waiting for the req to be read.
	
    // 構建connRequest這個channel,緩存大小是1
    // 用於告訴connectionOpener協程,須要打開一個新的鏈接。
		req := make(chan connRequest, 1)
    
    /**
      nextRequestKeyLocked函數以下:
     
      func (db *DB) nextRequestKeyLocked() uint64 {
				next := db.nextRequest
				db.nextRequest++
				return next
			}
			
			主要做用就是將nextRequest+1,
			至於這個nextRequest的做用咱們前面也說過了,它至關於binlog中的next_trx下一個事物的事物id。
			言外之意是這個nextRequest遞增的(由於這段代碼被加了lock)。
			看以下的代碼中,將這個自增後的nextRequest當返回值返回出去。
			而後緊接着將它做爲map的key
			
			至於這個map嘛:
		  在本文一開始的位置,咱們介紹了DB結構體有這樣一個屬性,鏈接請求的map, key是自增的int64類型的數,
      用於惟一標示這個請求分配的
      connRequests map[uint64]chan connRequest 
     */
		reqKey := db.nextRequestKeyLocked()
    // 將這個第n個請求對應channel緩存起來,開始等待有合適的機會分配給他鏈接
		db.connRequests[reqKey] = req
    // 等待數增長,解鎖
		db.waitCount++
		db.mu.Unlock()
    
		waitStart := time.Now()

		// Timeout the connection request with the context.
    // 進入下面的slice中
		select {
    // 若是客戶端傳入的上下文超時了,進入這個case
		case <-ctx.Done():
			// Remove the connection request and ensure no value has been sent
			// on it after removing.
      // 當上下文超時時,表示上層的客戶端代碼想斷開,意味着在這個方法收到這個信號後須要退出了
      // 這裏將db的connRequests中的reqKey清除,防止還給他分配一個鏈接。
			db.mu.Lock()
			delete(db.connRequests, reqKey)
			db.mu.Unlock()

			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
			// 這裏也會嘗試從req channel中獲取一下有沒有可用的鏈接
      // 若是有的話執行 db.putConn(ret.conn, ret.err, false) ,目的是釋放掉這個鏈接
			select {
			default:
			case ret, ok := <-req:
				if ok && ret.conn != nil {
          // 看到這裏只須要知道他是用來釋放鏈接的就ok,繼續往下看,稍後再殺回來
					db.putConn(ret.conn, ret.err, false)
				}
			}
      //返回ctx異常。
			return nil, ctx.Err()
    // 嘗試從 reqchannel 中取出鏈接
		case ret, ok := <-req:
			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
			// 處理錯誤
			if !ok {
				return nil, errDBClosed
			}
      // 檢測鏈接是否過時了,前面也提到過,DB實例有維護一個參數,maxLifeTime,0表示永不過時
			if ret.err == nil && ret.conn.expired(lifetime) {
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
      // 健壯性檢查
			if ret.conn == nil {
				return nil, ret.err
			}
      
			// Lock around reading lastErr to ensure the session resetter finished.
      // 檢查鏈接是否可用
			ret.conn.Lock()
			err := ret.conn.lastErr
			ret.conn.Unlock()
			if err == driver.ErrBadConn {
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
			return ret.conn, ret.err
		}
	}
  // 代碼能運行到這裏說明上面的if條件沒有被命中。
  // 換句話說,來到這裏說明具有以下條件
  // 1:當前DB實例的空閒鏈接池中已經沒有空閒鏈接了,獲取明確指定,不從空閒池中獲取鏈接,就想新建鏈接。
  // 2: 當前DB實例容許打開鏈接
  // 3: DB實例目前打開的鏈接數尚未到達它能打開的最大鏈接數的上限。
  
	// 記錄當前DB已經打開的鏈接數+1
	db.numOpen++ // optimistically
	db.mu.Unlock()
	ci, err := db.connector.Connect(ctx)
	if err != nil {
		db.mu.Lock()
		db.numOpen-- // correct for earlier optimism
		db.maybeOpenNewConnections()
		db.mu.Unlock()
		return nil, err
	}
	db.mu.Lock()
  // 構建一個鏈接實例,並返回
	dc := &driverConn{
		db:        db,
		createdAt: nowFunc(),
		ci:        ci,
		inUse:     true,
	}
	db.addDepLocked(dc, dc)
	db.mu.Unlock()
	return dc, nil
}

5.四、釋放鏈接

鏈接被是事後是須要被釋放的

釋放鏈接的邏輯封裝在DB實例中

db.putConn(ret.conn, ret.err, false)

釋放鏈接的流程圖以下:

流程圖根據以下的代碼畫出。

方法詳細信息以下:

func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
  // 釋放鏈接的操做加鎖
	db.mu.Lock()
  // debug的信息
	if !dc.inUse {
		if debugGetPut {
			fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
		}
		panic("sql: connection returned that was never out")
	}
	if debugGetPut {
		db.lastPut[dc] = stack()
	}
  // 標記driverConn處理不可用的狀態
	dc.inUse = false

	for _, fn := range dc.onPut {
		fn()
	}
	dc.onPut = nil

  // 本方法的入參中有參數err
  // 當會話獲取出這個鏈接後,發現這個鏈接過時了、或者被標記上來lastErr時,再調用這個putConn方法時,同時會將這個錯誤傳遞進來,而後在這裏判斷,當出現壞掉的鏈接時就不直接把這個鏈接放回空閒鏈接池了。
	if err == driver.ErrBadConn {
		// Don't reuse bad connections.
		// Since the conn is considered bad and is being discarded, treat it
		// as closed. Don't decrement the open count here, finalClose will
		// take care of that.
    // 這個方法的做用以下:
    // 他會去判斷當前DB維護的map的容量,也就是前面提到的那種狀況:當DB容許打開鏈接,可是如今的鏈接數已經達到當前DB容許打開的最大鏈接數上限了,那麼針對接下來想要獲取鏈接的請求的處理邏輯就是,構建一個req channel,放入connRequests這個map中,表示他們正在等待鏈接的創建。
    // 換句話說,這時系統時繁忙的,業務處於高峯,那麼問題來了,如今居然出現了一個壞掉的鏈接,那爲了把對業務線的影響降到最低,是否是得主動新建一個新的鏈接放到空閒鏈接池中呢?
    // 	db.maybeOpenNewConnections() 函數主要乾的就是這個事。
    // 	方法詳情以下
    /*
    	func (db *DB) maybeOpenNewConnections() {
					numRequests := len(db.connRequests)
					if db.maxOpen > 0 {
						numCanOpen := db.maxOpen - db.numOpen
					if numRequests > numCanOpen {
						numRequests = numCanOpen
					}
			}
			for numRequests > 0 {
						db.numOpen++ // optimistically
						numRequests--
						if db.closed {
							return
						}
				  // 它只是往這個	openerCh channel中寫入一個空的結構體,會有專門的協程負責建立鏈接
					db.openerCh <- struct{}{}
			}
		}
    */
		db.maybeOpenNewConnections()
    //  解鎖,關閉鏈接,返回
		db.mu.Unlock()
		dc.Close()
		return
	}
	if putConnHook != nil {
		putConnHook(db, dc)
	}
  // 若是DB已經關閉了,標記 resetSession爲 false
	if db.closed {
		// Connections do not need to be reset if they will be closed.
		// Prevents writing to resetterCh after the DB has closed.
    // 當DB都已經關了,意味着DB裏面的鏈接池都沒有了,那固然不須要關閉鏈接池中的鏈接了~
		resetSession = false
	}
  // 若是DB沒有關閉的話,進入if代碼塊
	if resetSession {
    // 將dricerConn中的Conn驗證轉換爲driver.SessionResetter
		if _, resetSession = dc.ci.(driver.SessionResetter); resetSession {
      // 在此處鎖定driverConn,以便在鏈接重置以前不會釋放。
      // 必須在將鏈接放入池以前獲取鎖,以防止在重置以前將其取出
			dc.Lock()
		}
	}
  // 真正將鏈接放回空閒鏈接池中
  // 知足connRequest或將driverConn放入空閒池並返回true或false
  /*
  	func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
  			// 檢測若是DB都關閉塊,直接返回flase
				if db.closed {
					return false
				}
				// 若是DB當前打開的鏈接數大於DB能打開的最大的鏈接數,返回false
				if db.maxOpen > 0 && db.numOpen > db.maxOpen {
					return false
				}
				//若是等待獲取鏈接的map中有存貨
		 if c := len(db.connRequests); c > 0 {
		 		
				var req chan connRequest
				var reqKey uint64
				// 取出map中的第一個key
				for reqKey, req = range db.connRequests {
					break
				}
				// 將這個key,value再map中刪除
				delete(db.connRequests, reqKey) // Remove from pending requests.
				// 從新標記這個鏈接是可用的狀態
				if err == nil {
					dc.inUse = true
				}
				// 將這個鏈接放入到 req channel中,給等待鏈接到會話使用
				req <- connRequest{
					conn: dc,
					err:  err,
				}
				return true
				
		// 來到這個if,說明此時沒有任何請求在等待獲取鏈接,而且沒有發生錯誤,DB也沒有關閉
		} else if err == nil && !db.closed {
				// 比較當前空閒鏈接池的大小(默認是2) 和 freeConn空閒鏈接數的數量
				// 意思是,若是空閒的鏈接超出了這個規定的閾值,空閒鏈接是須要被收回的。
				if db.maxIdleConnsLocked() > len(db.freeConn) {
				  // 收回
					db.freeConn = append(db.freeConn, dc)
					db.startCleanerLocked()
					return true
				}
				// 若是空閒鏈接還沒到閾值,保留這個鏈接看成空閒鏈接
				db.maxIdleClosed++
		}		
				// 收回空閒鏈接返回false
				return false
}
  */
  
  // 若是將鏈接成功放入了空閒鏈接池,或者將鏈接成功給了等待鏈接到會話使用,此處返回true
  // 收回空閒鏈接返回false
  // 代碼詳情就是在上面的這段註釋中
	added := db.putConnDBLocked(dc, nil)
	db.mu.Unlock()
	
  // 若是
	if !added {
    // 若是DB沒有關閉,進入if
		if resetSession {
			dc.Unlock()
		}
		dc.Close()
		return
	}
  // 從新校驗,若是鏈接關閉了,進入if
	if !resetSession {
		return
	}
  
  // 若是負責重置 conn狀態的線程阻塞住了,那麼標記這個driverConn爲lastErr
	select {
	default:
		// If the resetterCh is blocking then mark the connection
		// as bad and continue on.
		dc.lastErr = driver.ErrBadConn
		dc.Unlock()
	case db.resetterCh <- dc:
	}
}

5.五、connectionOpener

5.5.一、是什麼?

這個connectionOpener是一個工做協程,它會去嘗試消費指定的channel,負責建立數據庫鏈接,其實在前面閱讀獲取鏈接的邏輯時,有這樣的兩種狀況會阻塞等待connectionOpener來新建立鏈接:

第一種:當獲取鏈接的策略是優先從cache鏈接池中獲取出來,可是空閒鏈接池已經沒有空閒的鏈接了,首先這時DB容許打開鏈接,可是DB能打開的鏈接數已經達到了它能打開的鏈接數的上線,因此得等待有空閒鏈接出現,或者等有鏈接被釋放後,DB能當前打開的鏈接數小於了它能打開的鏈接數的最大值,這時它會被阻塞等待去嘗試建立鏈接。

第二種:獲取鏈接的策略再也不是優先從空閒緩衝池中獲取鏈接,直接明瞭的想獲取最一條新鏈接,一樣的此時DB已經打開的鏈接數大於它能打開鏈接數的上線,它會被阻塞等待建立鏈接。

image-20200731221533203

5.5.二、何時開啓的?
func OpenDB(c driver.Connector) *DB {
	ctx, cancel := context.WithCancel(context.Background())
	db := &DB{
		connector:    c,
		openerCh:     make(chan struct{}, connectionRequestQueueSize),
		resetterCh:   make(chan *driverConn, 50),
		lastPut:      make(map[*driverConn]string),
		connRequests: make(map[uint64]chan connRequest),
		stop:         cancel,
	}
	// 能夠看到他是在DB被實例化時開啓的。
	go db.connectionOpener(ctx)
	go db.connectionResetter(ctx)

	return db
}
5.5.三、代碼詳情

能夠看到它一直嘗試從db的openerCh中獲取內容,並且只要獲取到了內容,就會調用方法打開鏈接。

// Runs in a separate goroutine, opens new connections when requested.
func (db *DB) connectionOpener(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
    // here  
		case <-db.openerCh:
			db.openNewConnection(ctx)
		}
	}
}
5.5.四、誰往openerCh中投放消息?

往channl中投放消息的邏輯在db的mayBeOpenNewConnections中

func (db *DB) maybeOpenNewConnections() {
  // 經過檢查這個map的長度來決定是否往opennerCh中投放消息
	numRequests := len(db.connRequests)
	if db.maxOpen > 0 {
		numCanOpen := db.maxOpen - db.numOpen
		if numRequests > numCanOpen {
			numRequests = numCanOpen
		}
	}
	for numRequests > 0 {
		db.numOpen++ // optimistically
		numRequests--
		if db.closed {
			return
		}
    // 一旦執行了這一步,connectionOpener 就會監聽到去建立鏈接。
		db.openerCh <- struct{}{}
	}
}
5.5.五、注意點:

在DB結構體中有這樣一個屬性

// 鏈接池的大小,0意味着使用默認的大小2, 小於0表示不使用鏈接池
	maxIdle           int    // zero means defaultMaxIdleConns; negative means 0

表示空閒鏈接池默認的大小,若是它爲0,表示都沒有緩存池,也就意味着會爲全部想獲取鏈接的請求建立新的conn,這是也就不會有這個opnerCh,更不會有connectionOpener

5.六、connectionCleaner

5.6.一、是什麼?有啥用?

它一樣以一條協程的形式存在,用於定時清理數據庫鏈接池中過時的鏈接

func (db *DB) startCleanerLocked() {
	if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil {
		db.cleanerCh = make(chan struct{}, 1)
		go db.connectionCleaner(db.maxLifetime)
	}
}
5.6.二、注意點

一樣的,DB中存在一個參數:maxLifetime

它表示數據庫鏈接最大的生命時長,若是將它設置爲0,表示這個鏈接永不過時,既然全部的鏈接永不過時,就不會存在connectionCleaner去定時根據maxLifetime 來定時清理鏈接。

它的調用時機是:須要將鏈接放回到鏈接池時調用。

5.七、connectionRestter

5.7.一、做用

咱們使用獲取的鏈接的封裝結構體是driverConn,其實它是會driver包下的Conn鏈接的又一層封裝,目的是加強

driver包下的Conn的,多出來了一些狀態。當將使用完畢的鏈接放入鏈接池時,就得將這些狀態清除掉。

使用誰去清除呢?就是這個go 協程:connectionRestter

當connectionRestter碰到錯誤時,會將這個conn標記爲lastErr,鏈接使用者在使用鏈接時會先校驗conn的諸多狀態,好比出現lastErr,會返回給客戶端 badConnErr

6、MySQL鏈接池所受的限制

數據庫鏈接池大大小到底設置爲多少,得根據業務流量已經數據庫所在機器的性能綜合考慮。

mysql鏈接數到配置在 my.cnf中,具體的參數是max_connections。

當業務流量異常猛烈時,極可能會出現這個問題:to many connections

對於操縱系統內核來講,當他接受到一個tcp請求就會在本地建立一個由文件系統管理的socket文件。在linux中咱們將它叫作文件句柄。

linux爲防止單一進程將系統資源所有耗費掉,會限制進程最大能打開的鏈接數爲1024,這意味着,哪怕經過改配置文件,將mysql能打開的鏈接池設置爲9999,事實上它能打開的文件數最多不會超過1024。

這個問題也好解決:

命令:設置單個進程能打開的最大鏈接數爲65535

ulimit -HSn 65535

經過命令: 查看進程被限制的使用各類資源的量

ulimit -a 

core file size: 進程崩潰是轉儲文件大小限制
man loaded memort 最大鎖定內存大小
open file 能打開的文件句柄數

這些變量定義在 /etc/security/limits.conf配置文件中。

7、關於失效的鏈接

狀況1: 客戶端主動斷開

若是是客戶端主動將鏈接close(), 那往合格鏈接中寫數據時會獲得ErrBadConn的錯誤,若是此時依然能夠重試,將會獲取新的鏈接。

代碼以下:

func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
	var res Result
	var err error
	for i := 0; i < maxBadConnRetries; i++ {
		res, err = db.exec(ctx, query, args, cachedOrNewConn)
		if err != driver.ErrBadConn {
			break
		}
	}
	if err == driver.ErrBadConn {
		return db.exec(ctx, query, args, alwaysNewConn)
	}
	return res, err
}

狀況2: 服務端掛啦

由於這種數據庫鏈接底層使用的是tcp實現。(tcp自己是支持全雙工的,客戶端和服務端支持同時往對方發送數據)依賴諸如:校驗和、確認應答和序列號機制、超時重傳、鏈接管理(3次握手,4次揮手)、以及滑動窗口、流量控制、擁賽避免去實現整個數據交互的可靠性,協調。

這時客戶端拿着一條自認爲是正常的鏈接,往鏈接裏面寫數據。然鵝,另外一端端服務端已經掛了~,可是不幸的是,客戶端的tcp鏈接根本感知不到~~~。

可是當它去讀取服務端的返回數據時會遇到錯誤:unexceptBadConn EOF

8、鏈接的有效性

  • 思路1:

設置鏈接的屬性: maxLifeTime

上面也說過了,當設置了這個屬性後,DB會開啓一條協程connectionCleaner,專門負責清理過時的鏈接。

這在必定程度上避免了服務端將鏈接斷掉後,客戶端無感知的狀況。

maxLifeTime的值到底設置多大?參考值,比數據庫的wait_timeout小一些就ok。

  • 思路2:

主動檢查鏈接的有效性。

好比在鏈接放回到空閒鏈接池前ping測試。在使用鏈接發送數據前進行連通性測試。

相關文章
相關標籤/搜索