近期一同事負責的線上模塊,老是時不時的返回一下 504,檢查發現,這個服務的內存使用異常的大,pprof分析後,發現有上萬個goroutine,排查分析以後,是沒有規範使用gorm包致使的,那麼具體是什麼緣由呢,會不會也像 《Go Http包解析:爲何須要response.Body.Close()》 文中同樣,由於沒有釋放鏈接致使的呢?mysql
首先咱們先來看一個示例,而後,猜想一下打印的結果git
package main import ( "log" "net/http" _ "net/http/pprof" "time" "github.com/jinzhu/gorm" _ "github.com/jinzhu/gorm/dialects/mysql" ) var ( db *gorm.DB ) type User struct { ID int64 `gorm:"column:id;primary_key" json:"id"` Name string `gorm:"column:name" json:"name"` } func (user *User) TableName() string { return "ranger_user" } func main() { go func() { log.Println(http.ListenAndServe(":6060", nil)) }() for true { GetUserList() time.Sleep(time.Second) } } func GetUserList() ([]*User, error) { users := make([]*User, 0) db := open() rows, err := db.Model(&User{}).Where("id > ?", 1).Rows() if err != nil { panic(err) } // 爲了試驗而寫的特殊邏輯 for rows.Next() { user := &User{} err = db.ScanRows(rows, user) return nil, err } return users, nil } func open() *gorm.DB { if db != nil { return db } var err error db, err = gorm.Open("mysql", "user:pass@(ip:port)/db?charset=utf8&parseTime=True&loc=Local") if err != nil { panic(err) } return db } 複製代碼
咱們先看一下上面的demo,貌似沒有什麼問題,咱們就運行一段時間看看github
有點尷尬,我就一簡單的查詢返回,怎麼會有那麼多goroutine?sql
繼續看一下都是哪些函數產生了goroutine數據庫
startWatcher.func1
是個什麼鬼json
func (mc *mysqlConn) startWatcher() { watcher := make(chan mysqlContext, 1) mc.watcher = watcher finished := make(chan struct{}) mc.finished = finished go func() { for { var ctx mysqlContext select { case ctx = <-watcher: case <-mc.closech: return } select { case <-ctx.Done(): mc.cancel(ctx.Err()) case <-finished: case <-mc.closech: return } } }() } 複製代碼
startWatcher
這個函數的調用者,只有 MySQLDriver.Open
會調用,也就是建立新的鏈接的時候,纔會去建立一個監控者的goroutinesegmentfault
根據 《Go Http包解析:爲何須要response.Body.Close()》 中的分析結果,能夠大膽猜想,有多是mysql每次去查詢的時候,獲取一個鏈接,沒有空閒的鏈接,則建立一個新的,查詢完成後釋放鏈接到鏈接池,以便下一個請求使用,而因爲沒有調用rows.Close(), 致使拿了鏈接以後,沒有再放回鏈接池複用,致使每一個請求過來都建立一個新的請求,從而致使產生了大量的goroutine去運行startWatcher.func1
監控新建立的鏈接 。因此咱們相似於 response.Close 同樣,進行一下 rows.Close() 是否是就ok了,接下來驗證一下緩存
對上面的測試代碼增長一行rows.Close()markdown
defer rows.Close() for rows.Next() { user := &User{} err = db.ScanRows(rows, user) return nil, err } 複製代碼
繼續觀察goroutine的變化網絡
goroutine 再也不上升,貌似問題就解決了
rows.Close()
的,不少狀況下並無出現goroutine的暴增,這是爲何照例,仍是先把可能用到的結構體提早放出來,混個眼熟
// Rows is the result of a query. Its cursor starts before the first row // of the result set. Use Next to advance from row to row. type Rows struct { dc *driverConn // owned; must call releaseConn when closed to release releaseConn func(error) // driverConn.releaseConn, 在query的時候,會傳遞過來 rowsi driver.Rows cancel func() // called when Rows is closed, may be nil. closeStmt *driverStmt // if non-nil, statement to Close on close // closemu prevents Rows from closing while there // is an active streaming result. It is held for read during non-close operations // and exclusively during close. // // closemu guards lasterr and closed. closemu sync.RWMutex closed bool lasterr error // non-nil only if closed is true // lastcols is only used in Scan, Next, and NextResultSet which are expected // not to be called concurrently. lastcols []driver.Value }s 複製代碼
創建鏈接、scope結構體、Model、Where 方法的邏輯就再也不贅述了,上一篇文章《GORM之ErrRecordNotFound採坑記錄》已經粗略講過了,直接進入Rows
函數的解析
// Rows return `*sql.Rows` with given conditions func (s *DB) Rows() (*sql.Rows, error) { return s.NewScope(s.Value).rows() } func (scope *Scope) rows() (*sql.Rows, error) { defer scope.trace(scope.db.nowFunc()) result := &RowsQueryResult{} // 設置 row_query_result,供 callback 函數使用 scope.InstanceSet("row_query_result", result) scope.callCallbacks(scope.db.parent.callbacks.rowQueries) return result.Rows, result.Error } 複製代碼
感受這裏很快就進入了callback
的回調
根據上一篇文章的經驗,rowQueries
所註冊的回調函數,能夠在 callback_row_query.go 中的 init() 函數中找到
func init() { DefaultCallback.RowQuery().Register("gorm:row_query", rowQueryCallback) } // queryCallback used to query data from database func rowQueryCallback(scope *Scope) { // 對應 上面函數裏面的 scope.InstanceSet("row_query_result", result) if result, ok := scope.InstanceGet("row_query_result"); ok { // 組裝出來對應的sql語句,eg: SELECT * FROM `ranger_user` WHERE (id > ?) scope.prepareQuerySQL() if str, ok := scope.Get("gorm:query_option"); ok { scope.SQL += addExtraSpaceIfExist(fmt.Sprint(str)) } if rowResult, ok := result.(*RowQueryResult); ok { rowResult.Row = scope.SQLDB().QueryRow(scope.SQL, scope.SQLVars...) } else if rowsResult, ok := result.(*RowsQueryResult); ok { // result 對應的結構體是 RowsQueryResult,因此執行到這裏,繼續跟進這個函數 rowsResult.Rows, rowsResult.Error = scope.SQLDB().Query(scope.SQL, scope.SQLVars...) } } } 複製代碼
上面能夠看到,rowQueryCallback
僅僅是組裝了一下sql,而後又去調用go 提供的sql包,來進行查詢
// Query executes a query that returns rows, typically a SELECT. // The args are for any placeholder parameters in the query. // query是sql語句,args則是sql中? 所表明的值 func (db *DB) Query(query string, args ...interface{}) (*Rows, error) { return db.QueryContext(context.Background(), query, args...) } // QueryContext executes a query that returns rows, typically a SELECT. // The args are for any placeholder parameters in the query. func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) { var rows *Rows var err error // maxBadConnRetries = 2 for i := 0; i < maxBadConnRetries; i++ { // cachedOrNewConn 則是告訴query 去使用緩存的鏈接或者建立一個新的鏈接 rows, err = db.query(ctx, query, args, cachedOrNewConn) if err != driver.ErrBadConn { break } } // 若是嘗試了maxBadConnRetries次後,鏈接仍是有問題的,則建立一個新的鏈接去執行sql if err == driver.ErrBadConn { return db.query(ctx, query, args, alwaysNewConn) } return rows, err } func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) { // 根據上面定的獲取鏈接的策略,來獲取一個有效的鏈接 dc, err := db.conn(ctx, strategy) if err != nil { return nil, err } // 使用獲取的鏈接,進行查詢 return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args) } 複製代碼
上面的邏輯理解不難,這裏有兩個變量,解釋一下
cachedOrNewConn: connReuseStrategy 類型,本質是uint8類型,值是1,這個標誌會傳遞給下面的db.conn
函數,告訴這個函數,返回鏈接的策略
1. 若是鏈接池中有空閒鏈接,返回一個空閒的
2. 若是鏈接池中沒有空的鏈接,且沒有超過最大建立的鏈接數,則建立一個新的返回
3. 若是鏈接池中沒有空的鏈接,且超過最大建立的鏈接數,則等待鏈接釋放後,返回這個空閒鏈接
複製代碼
alwaysNewConn:
// conn returns a newly-opened or cached *driverConn. func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) { db.mu.Lock() if db.closed { db.mu.Unlock() return nil, errDBClosed } // Check if the context is expired. // 校驗一下ctx是否過時了 select { default: case <-ctx.Done(): db.mu.Unlock() return nil, ctx.Err() } lifetime := db.maxLifetime // Prefer a free connection, if possible. numFree := len(db.freeConn) if strategy == cachedOrNewConn && numFree > 0 { // 若是選擇鏈接的策略是 cachedOrNewConn,而且有空閒的鏈接,則嘗試獲取鏈接池中的第一個鏈接 conn := db.freeConn[0] copy(db.freeConn, db.freeConn[1:]) db.freeConn = db.freeConn[:numFree-1] conn.inUse = true db.mu.Unlock() // 判斷當前鏈接的空閒時間是否超過了設定的最大空閒時間 if conn.expired(lifetime) { conn.Close() return nil, driver.ErrBadConn } // Lock around reading lastErr to ensure the session resetter finished. // 判斷鏈接的lastErr,確保鏈接是被重置過的 conn.Lock() err := conn.lastErr conn.Unlock() if err == driver.ErrBadConn { conn.Close() return nil, driver.ErrBadConn } return conn, nil } // Out of free connections or we were asked not to use one. If we're not // allowed to open any more connections, make a request and wait. // 走到這裏說明沒有獲取到空閒鏈接,判斷建立的鏈接數量是否超過最大容許的鏈接數量 if db.maxOpen > 0 && db.numOpen >= db.maxOpen { // Make the connRequest channel. It's buffered so that the // connectionOpener doesn't block while waiting for the req to be read. // 建立一個chan,用於接收釋放的空閒鏈接 req := make(chan connRequest, 1) // 建立一個key reqKey := db.nextRequestKeyLocked() // 將key 和chan綁定,便於根據key 定位所對應的chan db.connRequests[reqKey] = req db.waitCount++ db.mu.Unlock() waitStart := time.Now() // Timeout the connection request with the context. select { case <-ctx.Done(): // Remove the connection request and ensure no value has been sent // on it after removing. // 若是ctx失效了,則這個空閒鏈接也不須要了,刪除剛剛建立的key,防止這個鏈接被移除後再次爲這個key獲取鏈接 db.mu.Lock() delete(db.connRequests, reqKey) db.mu.Unlock() atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) select { default: case ret, ok := <-req: // 若是獲取到了空閒鏈接,則放回鏈接池裏面 if ok && ret.conn != nil { db.putConn(ret.conn, ret.err, false) } } return nil, ctx.Err() case ret, ok := <-req: // 此時拿到了空閒鏈接,且ctx沒有過時,則判斷鏈接是否有效 atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) if !ok { return nil, errDBClosed } // 判斷鏈接是否過時 if ret.err == nil && ret.conn.expired(lifetime) { ret.conn.Close() return nil, driver.ErrBadConn } if ret.conn == nil { return nil, ret.err } // Lock around reading lastErr to ensure the session resetter finished. // 判斷鏈接的lastErr,確保鏈接是被重置過的 ret.conn.Lock() err := ret.conn.lastErr ret.conn.Unlock() if err == driver.ErrBadConn { ret.conn.Close() return nil, driver.ErrBadConn } return ret.conn, ret.err } } // 上面兩個都不知足,則建立一個新的鏈接,也就是 獲取鏈接的策略是 alwaysNewConn 的時候 db.numOpen++ // optimistically db.mu.Unlock() ci, err := db.connector.Connect(ctx) if err != nil { db.mu.Lock() db.numOpen-- // correct for earlier optimism // 若是鏈接建立失敗,則再嘗試建立一次 db.maybeOpenNewConnections() db.mu.Unlock() return nil, err } db.mu.Lock() dc := &driverConn{ db: db, createdAt: nowFunc(), ci: ci, inUse: true, } // 關閉鏈接時會用到 db.addDepLocked(dc, dc) db.mu.Unlock() return dc, nil } 複製代碼
在上面的邏輯中,能夠看到,獲取鏈接的策略跟咱們上面解釋 cachedOrNewConn 和 alwaysNewConn 時是同樣的,可是,這裏面有兩個問題
釋放鏈接主要依靠 putconn
來完成的,在 conn
函數的下面代碼中
case ret, ok := <-req: // 若是獲取到了空閒鏈接,則放回鏈接池裏面 if ok && ret.conn != nil { db.putConn(ret.conn, ret.err, false) } } 複製代碼
也調用了,把獲取到但再也不須要的鏈接放回池子裏,下面看一下釋放鏈接的過程
// putConn adds a connection to the db's free pool. // err is optionally the last error that occurred on this connection. func (db *DB) putConn(dc *driverConn, err error, resetSession bool) { db.mu.Lock() // 釋放一個正在用的鏈接,panic if !dc.inUse { panic("sql: connection returned that was never out") } dc.inUse = false // 省略部分無關代碼... if err == driver.ErrBadConn { // Don't reuse bad connections. // Since the conn is considered bad and is being discarded, treat it // as closed. Don't decrement the open count here, finalClose will // take care of that. // maybeOpenNewConnections 這個函數又見到了,它到底幹了什麼 db.maybeOpenNewConnections() db.mu.Unlock() dc.Close() return } ... if db.closed { // Connections do not need to be reset if they will be closed. // Prevents writing to resetterCh after the DB has closed. resetSession = false } if resetSession { if _, resetSession = dc.ci.(driver.SessionResetter); resetSession { // Lock the driverConn here so it isn't released until // the connection is reset. // The lock must be taken before the connection is put into // the pool to prevent it from being taken out before it is reset. dc.Lock() } } // 把鏈接放回鏈接池中,也是這個函數的核心邏輯 added := db.putConnDBLocked(dc, nil) db.mu.Unlock() // 若是釋放鏈接失敗,則關閉鏈接 if !added { if resetSession { dc.Unlock() } dc.Close() return } if !resetSession { return } // 嘗試將鏈接放回resetterCh chan裏面,若是失敗,則標識鏈接異常 select { default: // If the resetterCh is blocking then mark the connection // as bad and continue on. dc.lastErr = driver.ErrBadConn dc.Unlock() case db.resetterCh <- dc: } } 複製代碼
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool { if db.closed { return false } // 已經超出最大的鏈接數量了,不須要再放回了 if db.maxOpen > 0 && db.numOpen > db.maxOpen { return false } // 若是有其餘等待獲取空閒鏈接的協程,則 if c := len(db.connRequests); c > 0 { var req chan connRequest var reqKey uint64 // connRequests 獲取一個 chan,並把這個鏈接返回到這個 chan裏面 for reqKey, req = range db.connRequests { break } delete(db.connRequests, reqKey) // Remove from pending requests. if err == nil { dc.inUse = true } req <- connRequest{ conn: dc, err: err, } return true } else if err == nil && !db.closed { // 若是沒有超出最大數量限制,則把這個鏈接放到 freeConn 這個slice裏面 if db.maxIdleConnsLocked() > len(db.freeConn) { db.freeConn = append(db.freeConn, dc) db.startCleanerLocked() return true } db.maxIdleClosed++ } return false } 複製代碼
梳理完釋放鏈接的邏輯,咱們能夠看出鏈接複用的大體流程
###maybeOpenNewConnections
這個函數,在上面的分析中已經出現了兩次了,先分析一下 這個函數到底作了什麼
func (db *DB) maybeOpenNewConnections() { // 計算須要建立的鏈接數,總共建立的有效鏈接數不能超過設置的最大鏈接數 numRequests := len(db.connRequests) if db.maxOpen > 0 { numCanOpen := db.maxOpen - db.numOpen if numRequests > numCanOpen { numRequests = numCanOpen } } for numRequests > 0 { db.numOpen++ // optimistically numRequests-- if db.closed { return } // 往 openerCh 這個chan裏面插入一條數據 db.openerCh <- struct{}{} } } 複製代碼
在前面的分析中,若是在獲取鏈接時,發現產生的鏈接數>= 最大容許的鏈接數,則在 db.connRequests 這個map中建立一個惟一的 key value,用於接收釋放的空閒鏈接,可是若是在釋放鏈接的過程當中,發現這個鏈接失效了,這個鏈接就沒法複用,這時候就會走到這個函數,嘗試建立一個新的鏈接,給其餘等待的請求使用
這裏就會發現一個問題: 爲何 db.openerCh <- struct{}{}
這樣一條簡單的命令就能建立一個鏈接,接下來就須要分析 db.openerCh 的接收方了
###connectionOpener
這個函數在db結構體建立的時候,就會開始執行了,一個常駐的goroutine
// Runs in a separate goroutine, opens new connections when requested. func (db *DB) connectionOpener(ctx context.Context) { for { select { case <-ctx.Done(): return case <-db.openerCh: // 這邊接收到數據後,就開始建立一個新的鏈接 db.openNewConnection(ctx) } } } 複製代碼
// Open one new connection func (db *DB) openNewConnection(ctx context.Context) { // maybeOpenNewConnctions has already executed db.numOpen++ before it sent // on db.openerCh. This function must execute db.numOpen-- if the // connection fails or is closed before returning. // 調用 sql driver 庫來建立一個鏈接 ci, err := db.connector.Connect(ctx) db.mu.Lock() defer db.mu.Unlock() // 若是db已經關閉,則關閉鏈接並返回 if db.closed { if err == nil { ci.Close() } db.numOpen-- return } if err != nil { // 建立鏈接失敗了,從新調用 maybeOpenNewConnections 再建立一次 db.numOpen-- db.putConnDBLocked(nil, err) db.maybeOpenNewConnections() return } dc := &driverConn{ db: db, createdAt: nowFunc(), ci: ci, } // 走到 putConnDBLocked,把鏈接交給等待的請求方或者鏈接池中 if db.putConnDBLocked(dc, err) { db.addDepLocked(dc, dc) } else { db.numOpen-- ci.Close() } } 複製代碼
這裏是鏈接數據庫的主要邏輯
func (t dsnConnector) Connect(_ context.Context) (driver.Conn, error) { return t.driver.Open(t.dsn) } func (d MySQLDriver) Open(dsn string) (driver.Conn, error) { var err error // New mysqlConn mc := &mysqlConn{ maxAllowedPacket: maxPacketSize, maxWriteSize: maxPacketSize - 1, closech: make(chan struct{}), } // 解析dsn mc.cfg, err = ParseDSN(dsn) if err != nil { return nil, err } mc.parseTime = mc.cfg.ParseTime // Connect to Server // 找到對應網絡鏈接類型(tcp...) 的鏈接函數,並建立鏈接 dialsLock.RLock() dial, ok := dials[mc.cfg.Net] dialsLock.RUnlock() if ok { mc.netConn, err = dial(mc.cfg.Addr) } else { nd := net.Dialer{Timeout: mc.cfg.Timeout} mc.netConn, err = nd.Dial(mc.cfg.Net, mc.cfg.Addr) } if err != nil { return nil, err } // Enable TCP Keepalives on TCP connections // 開啓Keepalives if tc, ok := mc.netConn.(*net.TCPConn); ok { if err := tc.SetKeepAlive(true); err != nil { // Don't send COM_QUIT before handshake. mc.netConn.Close() mc.netConn = nil return nil, err } } // Call startWatcher for context support (From Go 1.8) // 這裏調用startWatcher,開始對鏈接進行監控,及時釋放鏈接 if s, ok := interface{}(mc).(watcher); ok { s.startWatcher() } // 下面一些設置與分析無關,忽略... return mc, nil } 複製代碼
這個函數主要是對鏈接進行監控
func (mc *mysqlConn) startWatcher() { watcher := make(chan mysqlContext, 1) mc.watcher = watcher finished := make(chan struct{}) mc.finished = finished go func() { for { var ctx mysqlContext select { case ctx = <-watcher: case <-mc.closech: return } select { // ctx 過時的時候,關閉鏈接,這時候會關閉mc.closech case <-ctx.Done(): mc.cancel(ctx.Err()) case <-finished: // 關閉鏈接 case <-mc.closech: return } } }() } 複製代碼
建立鏈接的邏輯
至此,基本上鍊接建立及複用的流程大概清晰了,至此,對於咱們最開始遇到的問題也有了一個明確的解釋:
func (rs *Rows) Close() error { return rs.close(nil) } func (rs *Rows) close(err error) error { rs.closemu.Lock() defer rs.closemu.Unlock() // ... rs.closed = true // 相關字段的一些設置, 忽略 .... rs.releaseConn(err) return err } // 經過putConn 把鏈接釋放 func (dc *driverConn) releaseConn(err error) { dc.db.putConn(dc, err, true) } 複製代碼
rs.releaseConn 所對應的函數,能夠在 queryDC 這個方法裏面找到,這裏就直接列出來了
能夠看到,rows.Close() 最後就是經過 putConn
把當前的鏈接釋放以便複用
Next 爲scan方法準備下一條記錄,以便scan方法讀取,若是沒有下一行的話,或者準備下一條記錄的時候出錯了,就會返回false
func (rs *Rows) Next() bool { var doClose, ok bool withLock(rs.closemu.RLocker(), func() { // 準備下一條記錄 doClose, ok = rs.nextLocked() }) if doClose { // 若是 doClose 爲true,說明沒有記錄了,或者準備下一條記錄的時候,出錯了,此時關閉鏈接 rs.Close() } return ok } func (rs *Rows) nextLocked() (doClose, ok bool) { // 若是 已經關閉了,就不要讀取下一條了 if rs.closed { return false, false } // Lock the driver connection before calling the driver interface // rowsi to prevent a Tx from rolling back the connection at the same time. rs.dc.Lock() defer rs.dc.Unlock() if rs.lastcols == nil { rs.lastcols = make([]driver.Value, len(rs.rowsi.Columns())) } // 獲取下一條記錄,並放到lastcols裏面 rs.lasterr = rs.rowsi.Next(rs.lastcols) if rs.lasterr != nil { // Close the connection if there is a driver error. // 讀取出錯,返回true,以便後面關閉鏈接 if rs.lasterr != io.EOF { return true, false } nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet) if !ok { // 沒有獲取到記錄了,返回true,以便後面關閉鏈接 return true, false } // The driver is at the end of the current result set. // Test to see if there is another result set after the current one. // Only close Rows if there is no further result sets to read. if !nextResultSet.HasNextResultSet() { doClose = true } return doClose, false } return false, true } 複製代碼
Next() 的邏輯:
因此,也就是爲何一下的demo並不會出現問題同樣
for rows.Next() { user := &User{} err = db.ScanRows(rows, user) if err != nil { continue } } 複製代碼
走到這裏,開頭提出的問題應該已經有了明確的答案了: rows.Next() 在獲取到最後一條記錄以後,會調用 rows.Close() 將鏈接放回鏈接池或交給其餘等待的請求方,因此不須要手動調用 rows.Close(),
而出問題的demo中,因爲rows.Next() 沒有執行到最後一條記錄處,也沒有調用 rows.Close(), 因此在獲取到鏈接後一直沒有被放回進行復用,致使了每來一個請求建立一個新的鏈接,產生一個新的監控者 startWatcher.func1
, 最終致使了內存爆炸💥