Our gift system uses mgo, the Go MongoDB driver. We hit a few pitfalls along the way; here is a summary so you can avoid stepping in them too.
The mgo documentation says connection reuse is enabled, but observation and experiment show it does not actually bound the number of connections: reuse only kicks in when an idle connection exists. Under high concurrency, when no connection is free, new connections keep getting created, so in the end the programmer still has to cap the maximum number of connections himself.
Enough talk; here is the code:
GlobalMgoSession, err := mgo.Dial(host)

func (m *MongoBaseDao) Get(tablename string, id string, result interface{}) interface{} {
    session := GlobalMgoSession.Clone()
    defer session.Close()

    collection := session.DB(globalMgoDbName).C(tablename)
    err := collection.FindId(bson.ObjectIdHex(id)).One(result)
    if err != nil {
        logkit.Logger.Error("mongo_base method:Get " + err.Error())
    }
    return result
}
When the Go program's main entry point starts, we create one global session. Each incoming request then clones that session's settings and connection for its own use, and calls session.Close() afterwards to release the connection.
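For reference, a minimal self-contained sketch of this pattern; the host, database, and collection names here are placeholders, not the ones from our system:

package main

import (
    "log"

    "gopkg.in/mgo.v2"
    "gopkg.in/mgo.v2/bson"
)

var GlobalMgoSession *mgo.Session

func main() {
    var err error
    GlobalMgoSession, err = mgo.Dial("localhost:27017") // placeholder host
    if err != nil {
        log.Fatal("mongo dial: ", err)
    }
    defer GlobalMgoSession.Close()

    // Per request: clone the global session and release it when done.
    session := GlobalMgoSession.Clone()
    defer session.Close()

    var doc bson.M
    if err := session.DB("test").C("gifts").Find(bson.M{}).One(&doc); err != nil {
        log.Println("find:", err)
    }
}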
// Clone works just like Copy, but also reuses the same socket as the original
// session, in case it had already reserved one due to its consistency
// guarantees. This behavior ensures that writes performed in the old session
// are necessarily observed when using the new session, as long as it was a
// strong or monotonic session. That said, it also means that long operations
// may cause other goroutines using the original session to wait.
func (s *Session) Clone() *Session {
    s.m.Lock()
    scopy := copySession(s, true)
    s.m.Unlock()
    return scopy
}
// Close terminates the session. It's a runtime error to use a session
// after it has been closed.
func (s *Session) Close() {
    s.m.Lock()
    if s.cluster_ != nil {
        debugf("Closing session %p", s)
        s.unsetSocket() // release the socket held by this session and set it to nil
        s.cluster_.Release()
        s.cluster_ = nil
    }
    s.m.Unlock()
}
The comment on Clone says it reuses the original session's socket. But when concurrent requests surge and other goroutines have not yet released their connections, what does the current goroutine do?
func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
    // Read-only lock to check for previously reserved socket.
    s.m.RLock()

    // If there is a slave socket reserved and its use is acceptable, take it as long
    // as there isn't a master socket which would be preferred by the read preference mode.
    if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
        socket := s.slaveSocket
        socket.Acquire()
        s.m.RUnlock()
        logkit.Logger.Info("sgp_test 1 acquireSocket slave is ok!")
        return socket, nil
    }
    if s.masterSocket != nil {
        socket := s.masterSocket
        socket.Acquire()
        s.m.RUnlock()
        logkit.Logger.Info("sgp_test 1 acquireSocket master is ok!")
        return socket, nil
    }
    s.m.RUnlock()

    // No go. We may have to request a new socket and change the session,
    // so try again but with an exclusive lock now.
    s.m.Lock()
    defer s.m.Unlock()

    if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
        s.slaveSocket.Acquire()
        logkit.Logger.Info("sgp_test 2 acquireSocket slave is ok!")
        return s.slaveSocket, nil
    }
    if s.masterSocket != nil {
        s.masterSocket.Acquire()
        logkit.Logger.Info("sgp_test 2 acquireSocket master is ok!")
        return s.masterSocket, nil
    }

    // Still not good. We need a new socket.
    sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
    ......
    logkit.Logger.Info("sgp_test 3 acquireSocket cluster AcquireSocket is ok!")
    return sock, nil
}
With this debug logging added to the source, the logs tell the whole story:
Mar 25 09:46:40 dev02.pandatv.com bikini[12607]: [info] sgp_test 1 acquireSocket master is ok!
Mar 25 09:46:40 dev02.pandatv.com bikini[12607]: [info] sgp_test 1 acquireSocket master is ok!
Mar 25 09:46:41 dev02.pandatv.com bikini[12607]: [info] sgp_test 1 acquireSocket slave is ok!
Mar 25 09:46:41 dev02.pandatv.com bikini[12607]: [info] sgp_test 3 acquireSocket cluster AcquireSocket is ok!
Mar 25 09:46:41 dev02.pandatv.com bikini[12607]: [info] sgp_test 3 acquireSocket cluster AcquireSocket is ok!
Mar 25 09:46:41 dev02.pandatv.com bikini[12607]: [info] sgp_test 3 acquireSocket cluster AcquireSocket is ok!
AcquireSocket keeps creating new connections:
$ netstat -nat|grep -i 27017|wc -l
400
If sessions are not closed, the count climbs to a terrifying 4096 and chokes off other requests. So whenever you Clone or Copy a session, always defer its Close.
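To make the failure mode concrete, here is a contrived sketch (my own illustration, not code from our system) of what skipping Close looks like under load:

// leakyHandler simulates a request handler that forgets to close its clone.
func leakyHandler() {
    session := GlobalMgoSession.Clone()
    // BUG: missing `defer session.Close()` - the clone's socket is never
    // released back to the pool, so concurrent requests hit case 3 above
    // and keep dialing new sockets toward the 4096 default cap.
    var doc bson.M
    session.DB("test").C("gifts").Find(bson.M{}).One(&doc)
}

// Under load: for i := 0; i < 1000; i++ { go leakyHandler() }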
Enabling the pool-limit parameter caps the total number of connections. Once the limit is reached, the current goroutine sleeps and retries until a connection becomes available. Under high concurrency the locking here is imperfect, so a few extra connections may still get created.
src/gopkg.in/mgo.v2/cluster.go
s, abended, err := server.AcquireSocket(poolLimit, socketTimeout)
if err == errPoolLimit {
    if !warnedLimit {
        warnedLimit = true
        logkit.Logger.Error("sgp_test WARNING: Per-server connection limit reached. " + err.Error())
        log("WARNING: Per-server connection limit reached.")
    }
    time.Sleep(100 * time.Millisecond)
    continue
}
session.go:
// SetPoolLimit sets the maximum number of sockets in use in a single server
// before this session will block waiting for a socket to be available.
// The default limit is 4096.
//
// This limit must be set to cover more than any expected workload of the
// application. It is a bad practice and an unsupported use case to use the
// database driver to define the concurrency limit of an application. Prevent
// such concurrency "at the door" instead, by properly restricting the amount
// of used resources and number of goroutines before they are created.
func (s *Session) SetPoolLimit(limit int) {
    s.m.Lock()
    s.poolLimit = limit
    s.m.Unlock()
}
How to set the pool limit:
1. In the connection string, add:
   [host]:[port]?maxPoolSize=10
2. In code:
   dao.GlobalMgoSession.SetPoolLimit(10)
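Putting the two options together, a minimal sketch (the address and limit are illustrative, and either line alone is enough; the URL option is the one used in the config above):

session, err := mgo.Dial("localhost:27017?maxPoolSize=10") // option 1: in the connection string
if err != nil {
    log.Fatal(err)
}
defer session.Close()

session.SetPoolLimit(10) // option 2: explicitly, after dialing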
Run the load test again:
$ netstat -nat|grep -i 27017|wc -l
15
Conclusion:
After each Clone, calling session.Close when the operation finishes unsets the socket and decrements its reference count. Without an upper limit, any goroutine whose request finds no idle connection creates a new socket, up to the default maximum of 4096. Since MongoDB's server-side connection ceiling is typically around 10,000, a single port can only host one or two such processes before the connections blow up. Too many connections are inefficient on the client, and cost the server even more in memory and CPU, so a custom pool limit is necessary. Note, though, that with a pool limit enabled, if poolLimit goroutines run too long or loop forever without releasing their sockets, you are in trouble just the same.
mgo's underlying socket pool only provides reuse within the poolLimit cap; anything beyond that you have to optimize yourself.
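One mitigation worth considering for that last pitfall (my suggestion, not part of the original workflow): set socket and sync timeouts on the session so a stuck operation cannot pin a pooled socket indefinitely. Both setters exist in mgo.v2 (needs the `time` import):

session, err := mgo.Dial("localhost:27017?maxPoolSize=10")
if err != nil {
    log.Fatal(err)
}
defer session.Close()

session.SetSocketTimeout(5 * time.Second) // a hung query errors out instead of holding its socket forever
session.SetSyncTimeout(5 * time.Second)   // don't block forever waiting for a usable server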