golang操做mongo使用的包是"gopkg.in/mgo.v2",coding過程當中須要併發讀寫mongo數據庫,簡單觀摩了下源碼,記錄下本身的一些理解,若有錯誤,敬請斧正。 golang
通常來講,咱們直接這樣建立一個session:mongodb
Session, err = mgo.Dial(URL)數據庫
if err != nil {數組
log.Println(err)緩存
}安全
來看看Dial這個函數作了什麼:服務器
func Dial(url string) (*Session, error) {網絡
session, err := DialWithTimeout(url, 10*time.Second)session
if err == nil {數據結構
session.SetSyncTimeout(1 * time.Minute)
session.SetSocketTimeout(1 * time.Minute)
}
return session, err
}
調用DialWithTimeout函數設置默認的超時時間是10秒。該函數中調用了DialWithInfo這個函數,而DialWithInfo函數中比較重要是是調用了newSession,看看這個函數作了什麼操做:
func newSession(consistency Mode, cluster *mongoCluster, timeout time.Duration) (session *Session) {
cluster.Acquire()
session = &Session{
cluster_: cluster,
syncTimeout: timeout,
sockTimeout: timeout,
poolLimit: 4096,
}
debugf("New session %p on cluster %p", session, cluster)
session.SetMode(consistency, true)
session.SetSafe(&Safe{})
session.queryConfig.prefetch = defaultPrefetch
return session
}
返回的session設置了一些默認的參數,暫時先忽略,直接看看Session的數據結構:
type Session struct {
m sync.RWMutex
cluster_ *mongoCluster
slaveSocket *mongoSocket
masterSocket *mongoSocket
slaveOk bool
consistency Mode
queryConfig query
safeOp *queryOp
syncTimeout time.Duration
sockTimeout time.Duration
defaultdb string
sourcedb string
dialCred *Credential
creds []Credential
poolLimit int
bypassValidation bool
}
m是mgo.Session的併發鎖,所以全部的Session實例都是線程安全的。slaveSocket,masterSocket表明了該Session到mongodb主節點和從節點的一個物理鏈接的緩存。而Session的策略老是優先使用緩存的鏈接。是否緩存鏈接,由consistency也就是該Session的模式決定。假設在併發程序中,使用同一個Session實例,不使用Copy,而該Session實例的模式又剛好會緩存鏈接,那麼,全部的經過該Session實例的操做,都會經過同一條鏈接到達mongodb。雖然mongodb自己的網絡模型是非阻塞通訊,請求能夠經過一條鏈路,非阻塞地處理,可是會影響效率。
其次mgo.Session緩存的一主一從鏈接,實例自己不負責維護。也就是說,當slaveSocket,masterSocket任意其一,鏈接斷開,Session本身不會重置緩存,該Session的使用者若是不主動重置緩存,調用者獲得的將永遠是EOF。這種狀況在主從切換時就會發生,在網絡抖動時也會發生。
mgo的DB句柄須要你作一個copy操做:
// Copy works just like New, but preserves the exact authentication
// information from the original session.
func (s *Session) Copy() *Session {
s.m.Lock()
scopy := copySession(s, true)
s.m.Unlock()
scopy.Refresh()
return scopy
}
copySession將源Session淺拷貝到臨時Session中,這樣源Session的配置就拷貝到了臨時Session中。關鍵的Refresh,將源Session淺拷貝到臨時Session的鏈接緩存指針,也就是slaveSocket,masterSocket置爲空,這樣臨時Session就不存在緩存鏈接,而轉爲去嘗試獲取一個空閒的鏈接。
mgo自身維護了一套到mongodb集羣的鏈接池。這套鏈接池機制以mongodb數據庫服務器爲最小單位,每一個mongodb都會在mgo內部,對應一個mongoServer結構體的實例,一個實例表明着mgo持有的到該數據庫的鏈接。看看這個鏈接池的定義:
type mongoServer struct {
sync.RWMutex
Addr string
ResolvedAddr string
tcpaddr *net.TCPAddr
unusedSockets []*mongoSocket
liveSockets []*mongoSocket
closed bool
abended bool
sync chan bool
dial dialer
pingValue time.Duration
pingIndex int
pingCount uint32
pingWindow [6]time.Duration
info *mongoServerInfo
}
info表明了該實例對應的數據庫服務器在集羣中的信息——是否master,ReplicaSetName等。而兩個Slice,就是傳說中的鏈接池。unusedSockets存儲當前空閒的鏈接,liveSockets存儲當前活躍中的鏈接,Session緩存的鏈接就同時存放在liveSockets切片中,而臨時Session獲取到的鏈接就位於unusedSockets切片中。
每一個mongoServer都會隸屬於一個mongoCluster結構,至關於mgo在內部,模擬出了mongo數據庫集羣的模型。
type mongoCluster struct {
sync.RWMutex
serverSynced sync.Cond
userSeeds []string
dynaSeeds []string
servers mongoServers
masters mongoServers
references int
syncing bool
direct bool
failFast bool
syncCount uint
setName string
cachedIndex map[string]bool
sync chan bool
dial dialer
}
mongoCluster持有一系列mongoServer的實例,以主從結構分散到兩個數組中。 每一個Session都會存儲本身對應的,要操做的mongoCluster的引用。
前面的描述能夠總結成下面這張圖:
那麼咱們在使用的時候就能夠建立一個Session,而後clone操做,用clone獲得的copysession完成操做,結束後關閉這個copysession就能夠了。