概覽:mysql
爲何須要鏈接池git
鏈接失效問題github
database/sql 中的鏈接池sql
使用鏈接池管理Thrift連接數據庫
如下主要使用Golang做爲編程語言編程
我以爲使用鏈接池最大的一個好處就是減小鏈接的建立和關閉,增長系統負載能力,
以前就有遇到一個問題:TCP TIME_WAIT鏈接數過多致使服務不可用,由於未開啓數據庫鏈接池,再加上mysql併發較大,致使須要頻繁的建立連接,最終產生了上萬的TIME_WAIT的tcp連接,影響了系統性能。網絡
連接池中的的功能主要是管理一堆的連接,包括建立和關閉,因此本身在fatih/pool基礎上,改造了一下:https://github.com/silenceper/pool ,使得更加通用一些,增長的一些功能點以下:併發
鏈接對象不僅僅是net.Conn
,變爲了interface{}
(池中存儲本身想要的格式)app
增長了連接的最大空閒時間(保證了當鏈接空閒過久,連接失效的問題)less
主要是用到了channel
來管理鏈接,而且可以很好的利用管道的順序性,當須要使用的時候Get
一個鏈接,使用完畢以後Put
放回channel
中。
使用鏈接池以後就再也不是短鏈接,而是長鏈接了,就引起了一些問題:
由於網絡環境是複雜的,中間可能由於防火牆等緣由,致使長時間空閒的鏈接會斷開,因此能夠經過兩個方法來解決:
客戶端增長心跳,定時的給服務端發送請求
給鏈接池中的鏈接增長最大空閒時間,超時的鏈接再也不使用
在https://github.com/silenceper/pool就增長了一個這樣最大空閒時間的參數,在鏈接建立或者鏈接被從新返回鏈接池中時重置,給每一個鏈接都增長了一個鏈接的建立時間,在取出的時候對時間進行比較:https://github.com/silenceper/pool/blob/master/channel.go#L85
遠程服務端頗有可能重啓,那麼以前建立的連接就失效了。客戶端在使用的時候就須要判斷這些失效的鏈接並丟棄,在database/sql
中就判斷了這些失效的鏈接,使用這種錯誤表示var ErrBadConn = errors.New("driver: bad connection")
另外值得一提的就是在database/sql
對這種ErrBadConn
錯誤進行了重試,默認重試次數是兩次,因此可以保證即使是連接失效或者斷開了,本次的請求可以正常響應(繼續往下看就是分析了)。
鏈接失效的特徵
對鏈接進行read讀操做時,返回EOF
錯誤
對鏈接進行write操做時,返回write tcp 127.0.0.1:52089->127.0.0.1:8002: write: broken pipe
錯誤
在database/sql
中使用鏈接鏈接池很簡單,主要涉及下面這些配置:
db.SetMaxIdleConns(10) //鏈接池中最大空閒鏈接數 db.SetMaxOpenConns(20) //打開的最大鏈接數 db.SetConnMaxLifetime(300*time.Second)//鏈接的最大空閒時間(可選)
注:若是
MaxIdleConns
大於0而且MaxOpenConns
小於MaxIdleConns
,那麼會將MaxIdleConns
置爲MaxIdleConns
來看下db這個結構,以及字段相關說明:
type DB struct { //具體的數據庫實現的interface{}, //例如https://github.com/go-sql-driver/mysql 就註冊並並實現了driver.Open方法,主要是在裏面實現了一些鑑權的操做 driver driver.Driver //dsn鏈接 dsn string //在prepared statement中用到 numClosed uint64 mu sync.Mutex // protects following fields //可以使用的空閒的連接 freeConn []*driverConn //用來傳遞鏈接請求的管道 connRequests []chan connRequest //當前打開的鏈接數 numOpen int //當須要建立新的連接的時候,往這個管道中發送一個struct數據, //由於在Open數據庫的就啓用了一個goroutine執行connectionOpener方法讀取管道中的數據 openerCh chan struct{} //數據庫是否已經被關閉 closed bool //用來保證鎖被正確的關閉 dep map[finalCloser]depSet //stacktrace of last conn's put; debug only lastPut map[*driverConn]string //最大空閒鏈接 maxIdle int //最大打開的鏈接 maxOpen int //鏈接的最大空閒時間 maxLifetime time.Duration //定時清理空閒鏈接的管道 cleanerCh chan struct{} }
看一個查詢數據庫的例子:
rows, err := db.Query("select * from table1")
在調用db.Query
方法以下:
func (db *DB) Query(query string, args ...interface{}) (*Rows, error) { var rows *Rows var err error //這裏就作了對失效的連接的重試操做 for i := 0; i < maxBadConnRetries; i++ { rows, err = db.query(query, args, cachedOrNewConn) if err != driver.ErrBadConn { break } } if err == driver.ErrBadConn { return db.query(query, args, alwaysNewConn) } return rows, err }
在什麼狀況下會返回,能夠從這裏看到:
readPack,writePack
繼續跟進去就到了
func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
方法主要是建立tcp鏈接,並判斷了鏈接的生存時間lifetime,以及鏈接數的一些限制,若是超過的設定的最大打開連接數限制等待connRequest
管道中有鏈接產生(在putConn
釋放連接的時候就會往這個管道中寫入數據)
什麼時候釋放連接?
當咱們調用rows.Close()
的時候,就會把當前正在使用的連接從新放回freeConn
或者寫入到db.connRequests
管道中
//putConnDBLocked 方法 //若是有db.connRequests有在等待鏈接的話,就把當前鏈接給它用 if c := len(db.connRequests); c > 0 { req := db.connRequests[0] // This copy is O(n) but in practice faster than a linked list. // TODO: consider compacting it down less often and // moving the base instead? copy(db.connRequests, db.connRequests[1:]) db.connRequests = db.connRequests[:c-1] if err == nil { dc.inUse = true } req <- connRequest{ conn: dc, err: err, } return true } else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) { //沒人須要我這個連接,我就把他從新返回`freeConn`鏈接池中 db.freeConn = append(db.freeConn, dc) db.startCleanerLocked() return true }
這裏是使用鏈接池https://github.com/silenceper/pool,如何構建一個thrift連接
客戶端建立Thrift的代碼:
type Client struct { *user.UserClient } //建立Thrift客戶端連接的方法 factory := func() (interface{}, error) { protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() transportFactory := thrift.NewTTransportFactory() var transport thrift.TTransport var err error transport, err = thrift.NewTSocket(rpcConfig.Listen) if err != nil { panic(err) } transport = transportFactory.GetTransport(transport) //defer transport.Close() if err := transport.Open(); err != nil { panic(err) } rpcClient := user.NewUserClientFactory(transport, protocolFactory) //在鏈接池中直接放置Client對象 return &Client{UserClient: rpcClient}, nil } //關閉鏈接的方法 close := func(v interface{}) error { v.(*Client).Transport.Close() return nil } //建立了一個 初始化鏈接是 poolConfig := &pool.PoolConfig{ InitialCap: 10, MaxCap: 20, Factory: factory, Close: close, IdleTimeout: 300 * time.Second, } p, err := pool.NewChannelPool(poolConfig) if err != nil { panic(err) } //取得連接 conn, err := p.Get() if err != nil { return nil, err } v, ok := conn.(*Client) ...使用鏈接調用遠程方法 //將鏈接從新放回鏈接池中 p.Put(conn)
pool鏈接池代碼地址:https://github.com/silenceper...
原文地址:http://silenceper.com/blog/201611/%E8%81%8A%E8%81%8Atcp%E8%BF%9E%E6%8E%A5%E6%B1%A0/