labix.org/mgo是golang經常使用的mongo driver
,筆者的項目中重度依賴,不過項目年久失修,已經不維護。因此結論是用官方包。git
最近在使用中,我發現了一個問題,服務對mongo的長鏈接一直緩慢增加,形似mongo鏈接泄漏。github
查看了下mgo源碼,發現mgo內部維護了鏈接池,而默認鏈接池大小socketsPerServer
是4096,鏈接池的鏈接新增是惰性的,不會在初始化時創建全部鏈接,而是在有新請求且當前無剩餘可用鏈接時,創建新鏈接,使用結束後就放入鏈接池中供之後使用,期待鏈接池所維護的最多鏈接數是4096個。鏈接池大小限制代碼以下,請注意mongoServer.unusedSockets
(空閒的鏈接)和mongoServer.liveSockets
(生效過的鏈接)的含義:golang
// 鏈接池大小:
var socketsPerServer = 4096
// 鏈接池大小設置:
func SetPoolLimitPerServer(limit int) {
socketsPerServer = limit
}
// 鏈接池管理結構
type mongoServer struct {
...
unusedSockets []*mongoSocket // 空閒的鏈接
liveSockets []*mongoSocket // 生效過的鏈接,這裏記錄全部創建的鏈接,當鏈接使用結束會複製一份進unusedSockets,可是不會從liveSockets刪除
...
}
複製代碼
如上代碼,socketsPerServer
是能夠經過方法SetPoolLimitPerServer
改寫的,因而我在下面的代碼中,OpenDB裏調用SetPoolLimitPerServer
將鏈接池上限改爲了20,可是經過併發測試發現,鏈接池的限制並不許確,實際運行下來最後創建的鏈接依然會超過20。mongodb
主要代碼以下,我插入200條數據,爲了防止數據庫壓力,經過筆者寫的goworker以穩定的併發90寫入,其餘會進行排隊。數據庫
最後經過mgo提供的GetStats()接口(或者在服務端經過db.serverStatus()也能看到鏈接數量connections.current
),可以看到池裏的鏈接爲SocketsAlive
59個,遠遠超過了20,這是爲何呢?併發
// 測試mgo鏈接泄露問題
func main() {
db := OpenDB(xl, fmgo.Config{Config: mgo3.Config{Host: "127.0.0.1:27017", DB: "test_black", Coll: "connectiontest", Mode: "strong", SyncTimeoutInS: 5}})
defer db.Close()
mgo.SetStats(true)
// 初始化 goworker,併發不超過90
worker := goworker.New(goworker.WorkerConfig{
ConcurrencyNum: 90,
})
// 總共插入200條數據
for i := 0; i < 200; i++ {
var j = i
worker.Add(func() {
db.Insert(mgoDBInfo{Name: strconv.Itoa(j) + "_name"})
})
}
worker.IsDone()
stats := mgo.GetStats()
fmt.Printf("%+v", stats)
// 輸出:{Clusters:0 MasterConns:59 SlaveConns:0 SentOps:459 ReceivedOps:259 ReceivedDocs:259 SocketsAlive:59 SocketsInUse:0 SocketRefs:0}
time.Sleep(100 * time.Second)
}
複製代碼
問題出如今當有新的請求時,mgo
的鏈接池管理邏輯,當須要新建一個鏈接時,併發狀況下會有問題,精簡代碼以下app
func (server *mongoServer) AcquireSocket(limit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
for {
server.Lock()
n := len(server.unusedSockets)
// 判斷當前正在使用的鏈接,是否到鏈接池上限,若是到了上限就退出等待
if len(server.liveSockets)-n >= limit {
server.Unlock()
return nil, false, errSocketLimit
}
if n > 0{
// 拿一個可用的 unusedSockets
} else {
server.Unlock()
// bug here
// 這裏 unlock->connect->lock,是由於若是不unlock,新建鏈接時間太長,致使阻塞全部併發AcquireSocket的請求
// 這樣作在併發時就會有一個bug,若是鏈接池大小是20,而同時併發的進入30個AcquireSocket,全部的請求都會走到下面的Connect(),並正常的拿到鏈接,加入liveSockets,返回成功,致使鏈接池裏有30個鏈接
socket, err = server.Connect()
if err == nil {
server.Lock()
if server.closed {
server.UnLock()
return nil, abended, errServerClosed
}
server.liveSockets = append(server.liveSockets, socket)
server.Unlock()
}
}
return
}
panic("unreachable")
}
複製代碼
從代碼邏輯看,mgo
對鏈接池的理解和通常的理解不一樣:socket
mgo
的鏈接池管理裏,從上面的方法裏用if len(server.liveSockets)-n >= limit
判斷鏈接池是否已經滿能夠看出, mgo
認爲當前正在用的鏈接數 <= 鏈接池大小 就能夠了,沒有使用的鏈接不該該受鏈接池大小控制。(注意,第一部分已經強調,liveSockets是歷史創建的全部鏈接數量,不是正在使用socket的數量)很明顯,mgo
的定義是不對的,這樣會致使真實創建的鏈接(正在使用的+池裏未使用的)>鏈接池限制。測試
針對這一點,咱們在拿到新鏈接並加鎖後,判斷一下當前創建的鏈接是否已經超限,超限就關閉當前鏈接並等待,解決這個問題。ui
這個解決方案,會放棄新建的鏈接,對資源是有必定的浪費的,由於畢竟新建鏈接是耗時的。可是一旦創建後,就致使了鏈接泄漏,因此是不得已而爲之。
func (server *mongoServer) AcquireSocket(limit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
socket, err = server.Connect()
if err == nil {
server.Lock()
if server.closed {
server.UnLock()
return nil, abended, errServerClosed
}
// fix bug start
// +1 是要算上當前新建的這個鏈接
if limit > 0 && len(server.liveSockets)-n+1 > limit {
server.Unlock()
socket.Release()
socket.Close()
return nil, false, errSocketLimit
}
// fix bug end
server.liveSockets = append(server.liveSockets, socket)
server.Unlock()
}
}
複製代碼