上一章咱們大體瞭解了nsqlookupd
的tcpServer
中的IOLoop
協議的處理邏輯,裏面有提到一個存儲nsqd
的PeerInfo
以及topic
channel
數據信息的RegitrationDB
的一些操做方法。今天咱們就來說解一下關於RegitrationDB
的操做方法git
廢話很少說,直接上代碼吧(代碼位於nsq/nsqlookupd/regitration_db.go這個文件中)sql
type RegistrationDB struct { sync.RWMutex //讀寫鎖用於併發操做 registrationMap map[Registration]Producers //定義一個一Regitration爲key producer指針的slice爲value的map } type Registration struct { Category string Key string SubKey string } type Registrations []Registration //用於記錄client相關信息 type PeerInfo struct { lastUpdate int64 //client 心跳包最後接收時間 id string //client remote address RemoteAddress string `json:"remote_address"` Hostname string `json:"hostname"` BroadcastAddress string `json:"broadcast_address"` TCPPort int `json:"tcp_port"` HTTPPort int `json:"http_port"` Version string `json:"version"` } type Producer struct { //PeerInfo指針 peerInfo *PeerInfo tombstoned bool tombstonedAt time.Time } type Producers []*Producer //實現String接口,打印出producer信息 func (p *Producer) String() string { return fmt.Sprintf("%s [%d, %d]", p.peerInfo.BroadcastAddress, p.peerInfo.TCPPort, p.peerInfo.HTTPPort) } //producer標記爲tombstoned 並記錄當前時間 func (p *Producer) Tombstone() { p.tombstoned = true p.tombstonedAt = time.Now() } //判斷producer是不是tombstoned func (p *Producer) IsTombstoned(lifetime time.Duration) bool { return p.tombstoned && time.Now().Sub(p.tombstonedAt) < lifetime } //初始化一個RegistrationDB func NewRegistrationDB() *RegistrationDB { return &RegistrationDB{ registrationMap: make(map[Registration]Producers), } } // add a registration key //添加一個Registration key 若是不存在Map中則將其設置爲你一個空的Producer func (r *RegistrationDB) AddRegistration(k Registration) { r.Lock() defer r.Unlock() _, ok := r.registrationMap[k] if !ok { r.registrationMap[k] = Producers{} } } // add a producer to a registration //添加一個producer到registration中 func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool { r.Lock() defer r.Unlock() producers := r.registrationMap[k] found := false for _, producer := range producers { if producer.peerInfo.id == p.peerInfo.id { found = true } } if found == false { r.registrationMap[k] = append(producers, p) } return !found } // remove a producer from a registration //移除registration中一個producer func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) { r.Lock() defer r.Unlock() producers, ok := r.registrationMap[k] if !ok { return false, 0 } removed := false //這裏用到裏從一個slice中刪除一個元素的方法 cleaned := Producers{} for _, producer := range producers { if producer.peerInfo.id != id { cleaned = append(cleaned, producer) } else { removed = true } } // Note: this leaves keys in the DB even if they have empty lists r.registrationMap[k] = cleaned //返貨是否移除以及新的producers長度 return removed, len(cleaned) } // remove a Registration and all it's producers //刪除registration下全部的producers func (r *RegistrationDB) RemoveRegistration(k Registration) { r.Lock() defer r.Unlock() delete(r.registrationMap, k) } func (r *RegistrationDB) needFilter(key string, subkey string) bool { return key == "*" || subkey == "*" } //根據category key subkey查找Registrations //若是傳入的key 或 subkey爲*的話則獲取全部的registrationMap中全部的registration //若是key 或 subkey 不爲* 的話則 獲取具體的registration //這裏實現了相似 * 這個通配符的概念 func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations { r.RLock() defer r.RUnlock() if !r.needFilter(key, subkey) { k := Registration{category, key, subkey} if _, ok := r.registrationMap[k]; ok { return Registrations{k} } return Registrations{} } results := Registrations{} for k := range r.registrationMap { if !k.IsMatch(category, key, subkey) { continue } results = append(results, k) } return results } //根據category key subkey查找全部的Producer //同上面的FindRegistrations函數同樣,實現了*通配符的概念 func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers { r.RLock() defer r.RUnlock() if !r.needFilter(key, subkey) { k := Registration{category, key, subkey} return r.registrationMap[k] } results := Producers{} for k, producers := range r.registrationMap { if !k.IsMatch(category, key, subkey) { continue } for _, producer := range producers { found := false for _, p := range results { if producer.peerInfo.id == p.peerInfo.id { found = true } } if found == false { results = append(results, producer) } } } return results } //根據producer.peerInfo.id查找所屬的registration key func (r *RegistrationDB) LookupRegistrations(id string) Registrations { r.RLock() defer r.RUnlock() results := Registrations{} for k, producers := range r.registrationMap { for _, p := range producers { if p.peerInfo.id == id { results = append(results, k) break } } } return results } //依據Registration中的category key subkey,判斷是否與Registration匹配 func (k Registration) IsMatch(category string, key string, subkey string) bool { if category != k.Category { return false } if key != "*" && k.Key != key { return false } if subkey != "*" && k.SubKey != subkey { return false } return true } //根據category key subkey過濾Registrations func (rr Registrations) Filter(category string, key string, subkey string) Registrations { output := Registrations{} for _, k := range rr { if k.IsMatch(category, key, subkey) { output = append(output, k) } } return output } //獲取registrationMap中全部Registration的key func (rr Registrations) Keys() []string { keys := make([]string, len(rr)) for i, k := range rr { keys[i] = k.Key } return keys } //獲取registrationMap中全部Registration的subkey func (rr Registrations) SubKeys() []string { subkeys := make([]string, len(rr)) for i, k := range rr { subkeys[i] = k.SubKey } return subkeys } //過濾出全部可用的Producer func (pp Producers) FilterByActive(inactivityTimeout time.Duration, tombstoneLifetime time.Duration) Producers { now := time.Now() results := Producers{} for _, p := range pp { cur := time.Unix(0, atomic.LoadInt64(&p.peerInfo.lastUpdate)) if now.Sub(cur) > inactivityTimeout || p.IsTombstoned(tombstoneLifetime) { continue } results = append(results, p) } return results } //獲取Producers中全部的PeerInfo func (pp Producers) PeerInfo() []*PeerInfo { results := []*PeerInfo{} for _, p := range pp { results = append(results, p.peerInfo) } return results }
經過上面代碼的分析咱們不難看出registration_db.go
文件用map
的形式保存Producer
,並提供一系列增、刪、改、查的操做。同時使用RWMutex
作併發控制。json
到這裏咱們講解了nsqlookupd
中tcpServer
的所有代碼了,咱們瞭解了nsqlookupd
是用來發現並記錄nsqd
服務相關的remot address tcp
端口 http
端口等信息 以及 相應的topic
和channel
信息的功能,這樣好方便消費查詢相應的topic
和channel
的nsqd
服務連接信息,已實現對nsqd
進行拓撲管理的功能。併發
下一章咱們開始分析nsqlookupd
中的httpServer
相關的代碼app
PS:順便附送前面三章的傳送門tcp
NSQ系列之nsqlookupd代碼分析一(初探nsqlookup)函數
NSQ系列之nsqlookupd代碼分析二(初識nsqlookupd tcpServer)oop
NSQ系列之nsqlookupd代碼分析三(詳解tcpServer 中的IOLoop方法)this
PS:若是文中有錯誤,歡迎你們指正哦。atom