聊聊TCP鏈接池

概覽:mysql

  • 爲何須要鏈接池git

  • 鏈接失效問題github

  • database/sql 中的鏈接池sql

  • 使用鏈接池管理Thrift連接數據庫


如下主要使用Golang做爲編程語言編程

爲何須要鏈接池

我以爲使用鏈接池最大的一個好處就是減小鏈接的建立和關閉,增長系統負載能力
以前就有遇到一個問題:TCP TIME_WAIT鏈接數過多致使服務不可用,由於未開啓數據庫鏈接池,再加上mysql併發較大,致使須要頻繁的建立連接,最終產生了上萬的TIME_WAIT的tcp連接,影響了系統性能。網絡

連接池中的的功能主要是管理一堆的連接,包括建立和關閉,因此本身在fatih/pool基礎上,改造了一下:https://github.com/silenceper/pool ,使得更加通用一些,增長的一些功能點以下:併發

  • 鏈接對象不僅僅是net.Conn,變爲了interface{}(池中存儲本身想要的格式)app

  • 增長了連接的最大空閒時間(保證了當鏈接空閒過久,連接失效的問題)less

主要是用到了channel來管理鏈接,而且可以很好的利用管道的順序性,當須要使用的時候Get一個鏈接,使用完畢以後Put放回channel中。

鏈接失效問題

使用鏈接池以後就再也不是短鏈接,而是長鏈接了,就引起了一些問題:

一、長時間空閒,鏈接斷開?

由於網絡環境是複雜的,中間可能由於防火牆等緣由,致使長時間空閒的鏈接會斷開,因此能夠經過兩個方法來解決:

  • 客戶端增長心跳,定時的給服務端發送請求

  • 給鏈接池中的鏈接增長最大空閒時間,超時的鏈接再也不使用

https://github.com/silenceper/pool就增長了一個這樣最大空閒時間的參數,在鏈接建立或者鏈接被從新返回鏈接池中時重置,給每一個鏈接都增長了一個鏈接的建立時間,在取出的時候對時間進行比較:https://github.com/silenceper/pool/blob/master/channel.go#L85

二、當服務端重啓以後,鏈接失效?

遠程服務端頗有可能重啓,那麼以前建立的連接就失效了。客戶端在使用的時候就須要判斷這些失效的鏈接並丟棄,在database/sql中就判斷了這些失效的鏈接,使用這種錯誤表示var ErrBadConn = errors.New("driver: bad connection")

另外值得一提的就是在database/sql對這種ErrBadConn錯誤進行了重試,默認重試次數是兩次,因此可以保證即使是連接失效或者斷開了,本次的請求可以正常響應(繼續往下看就是分析了)。

鏈接失效的特徵

  • 對鏈接進行read讀操做時,返回EOF錯誤

  • 對鏈接進行write操做時,返回write tcp 127.0.0.1:52089->127.0.0.1:8002: write: broken pipe錯誤

database/sql 中的鏈接池

database/sql中使用鏈接鏈接池很簡單,主要涉及下面這些配置:

db.SetMaxIdleConns(10) //鏈接池中最大空閒鏈接數
    db.SetMaxOpenConns(20) //打開的最大鏈接數
    db.SetConnMaxLifetime(300*time.Second)//鏈接的最大空閒時間(可選)

注:若是MaxIdleConns大於0而且MaxOpenConns小於MaxIdleConns ,那麼會將MaxIdleConns置爲MaxIdleConns

來看下db這個結構,以及字段相關說明:

type DB struct {
    //具體的數據庫實現的interface{},
    //例如https://github.com/go-sql-driver/mysql 就註冊並並實現了driver.Open方法,主要是在裏面實現了一些鑑權的操做
    driver driver.Driver  
    //dsn鏈接
    dsn    string
    //在prepared statement中用到
    numClosed uint64

    mu           sync.Mutex // protects following fields
    //可以使用的空閒的連接
    freeConn     []*driverConn
    //用來傳遞鏈接請求的管道
    connRequests []chan connRequest
    //當前打開的鏈接數
    numOpen      int    
    //當須要建立新的連接的時候,往這個管道中發送一個struct數據,
    //由於在Open數據庫的就啓用了一個goroutine執行connectionOpener方法讀取管道中的數據
    openerCh    chan struct{}
    //數據庫是否已經被關閉
    closed      bool
    //用來保證鎖被正確的關閉
    dep         map[finalCloser]depSet
    //stacktrace of last conn's put; debug only
    lastPut     map[*driverConn]string 
    //最大空閒鏈接
    maxIdle     int                  
    //最大打開的鏈接
    maxOpen     int                    
    //鏈接的最大空閒時間
    maxLifetime time.Duration          
    //定時清理空閒鏈接的管道
    cleanerCh   chan struct{}
}

看一個查詢數據庫的例子:

rows, err := db.Query("select * from table1")

在調用db.Query方法以下:

func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
    var rows *Rows
    var err error
    //這裏就作了對失效的連接的重試操做
    for i := 0; i < maxBadConnRetries; i++ {
        rows, err = db.query(query, args, cachedOrNewConn)
        if err != driver.ErrBadConn {
            break
        }
    }
    if err == driver.ErrBadConn {
        return db.query(query, args, alwaysNewConn)
    }
    return rows, err
}

在什麼狀況下會返回,能夠從這裏看到:
readPackwritePack

繼續跟進去就到了

func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {

方法主要是建立tcp鏈接,並判斷了鏈接的生存時間lifetime,以及鏈接數的一些限制,若是超過的設定的最大打開連接數限制等待connRequest管道中有鏈接產生(在putConn釋放連接的時候就會往這個管道中寫入數據)

什麼時候釋放連接?

當咱們調用rows.Close()的時候,就會把當前正在使用的連接從新放回freeConn或者寫入到db.connRequests管道中

//putConnDBLocked 方法
    
    //若是有db.connRequests有在等待鏈接的話,就把當前鏈接給它用
    if c := len(db.connRequests); c > 0 {
        req := db.connRequests[0]
        // This copy is O(n) but in practice faster than a linked list.
        // TODO: consider compacting it down less often and
        // moving the base instead?
        copy(db.connRequests, db.connRequests[1:])
        db.connRequests = db.connRequests[:c-1]
        if err == nil {
            dc.inUse = true
        }
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    } else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) {
    //沒人須要我這個連接,我就把他從新返回`freeConn`鏈接池中
        db.freeConn = append(db.freeConn, dc)
        db.startCleanerLocked()
        return true
    }

使用鏈接池管理Thrift連接

這裏是使用鏈接池https://github.com/silenceper/pool,如何構建一個thrift連接

客戶端建立Thrift的代碼:

type Client struct {
    *user.UserClient
}


//建立Thrift客戶端連接的方法
factory := func() (interface{}, error) {
    protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()
    transportFactory := thrift.NewTTransportFactory()

    var transport thrift.TTransport
    var err error
    transport, err = thrift.NewTSocket(rpcConfig.Listen)
    if err != nil {
        panic(err)
    }
    transport = transportFactory.GetTransport(transport)
    //defer transport.Close()
    if err := transport.Open(); err != nil {
        panic(err)
    }
    rpcClient := user.NewUserClientFactory(transport, protocolFactory)
    //在鏈接池中直接放置Client對象
    return &Client{UserClient: rpcClient}, nil
}
//關閉鏈接的方法
close := func(v interface{}) error {
    v.(*Client).Transport.Close()
    return nil
}

//建立了一個 初始化鏈接是
poolConfig := &pool.PoolConfig{
    InitialCap: 10,
    MaxCap:     20,
    Factory:     factory,
    Close:       close,
    IdleTimeout: 300 * time.Second,
}
p, err := pool.NewChannelPool(poolConfig)
if err != nil {
    panic(err)
}

//取得連接
conn, err := p.Get()
if err != nil {
    return nil, err
}
v, ok := conn.(*Client)

...使用鏈接調用遠程方法

//將鏈接從新放回鏈接池中
p.Put(conn)

pool鏈接池代碼地址:https://github.com/silenceper...

原文地址:http://silenceper.com/blog/201611/%E8%81%8A%E8%81%8Atcp%E8%BF%9E%E6%8E%A5%E6%B1%A0/

相關文章
相關標籤/搜索