nsqlookupd服務爲nsqd提供服務發現,下面分析並借鑑一下它的服務發現源碼:
// PeerInfo holds the address information of a single node: its host name,
// addresses, ports and version.
type PeerInfo struct {
	lastUpdate       int64  // timestamp that is updated whenever the node pings
	id               string // unique ID of the node
	RemoteAddress    string `json:"remote_address"`
	Hostname         string `json:"hostname"`
	BroadcastAddress string `json:"broadcast_address"`
	TCPPort          int    `json:"tcp_port"`
	HTTPPort         int    `json:"http_port"`
	Version          string `json:"version"`
}

// Producer is a thin wrapper around a PeerInfo that adds tombstone state,
// which makes it convenient to track whether the node has been retired.
type Producer struct {
	peerInfo     *PeerInfo
	tombstoned   bool
	tombstonedAt time.Time
}
再來看看nsqlookupd是怎麼存儲和管理這些Producer的:
type RegistrationDB struct { sync.RWMutex registrationMap map[Registration]ProducerMap //topic或者channel的具體分類->ProducerMap } type Registration struct { Category string //服務分類topic仍是channel Key string //topic的名字 SubKey string //channel的名字或者"" } //其中key和subKey查找的時候支持通配符"*"的方式查找 type Registrations []Registration type ProducerMap map[string]*Producer //節點Id->具體的peer的封裝結構Producer
分別看看怎麼新增Registration和Producer:
//新增registration和producer都很簡單 func (r *RegistrationDB) AddRegistration(k Registration) { r.Lock() defer r.Unlock() _, ok := r.registrationMap[k] if !ok { //增長一個Registeration直接賦值一個map r.registrationMap[k] = make(map[string]*Producer) } } // add a producer to a registration func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool { r.Lock() defer r.Unlock() _, ok := r.registrationMap[k] if !ok { r.registrationMap[k] = make(map[string]*Producer) } //把對應的producer存到registration下面的producerMap裏面去 producers := r.registrationMap[k] _, found := producers[p.peerInfo.id] if found == false { producers[p.peerInfo.id] = p } return !found }
再看看查找(Find)相應的Registration和Producer的操作:
func (r *RegistrationDB) needFilter(key string, subkey string) bool { //有通配符的時候須要過濾 return key == "*" || subkey == "*" } func (k Registration) IsMatch(category string, key string, subkey string) bool { //分別比較category,key,subkey的值來判斷 if category != k.Category { return false } if key != "*" && k.Key != key { return false } if subkey != "*" && k.SubKey != subkey { return false } return true } func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations { r.RLock() defer r.RUnlock() if !r.needFilter(key, subkey) { //不須要過濾,就不須要k的IsMath操做,直接查找 k := Registration{category, key, subkey} if _, ok := r.registrationMap[k]; ok { return Registrations{k} } return Registrations{} } results := Registrations{} for k := range r.registrationMap { //須要匹配通配符的方式查找 if !k.IsMatch(category, key, subkey) { continue } results = append(results, k) } return results } func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers { r.RLock() defer r.RUnlock() //找ProducerMap,而後轉換爲Slice方式返回 if !r.needFilter(key, subkey) { k := Registration{category, key, subkey} return ProducerMap2Slice(r.registrationMap[k]) } results := make(map[string]*Producer) for k, producers := range r.registrationMap { if !k.IsMatch(category, key, subkey) { continue } for _, producer := range producers { _, found := results[producer.peerInfo.id] if found == false { results[producer.peerInfo.id] = producer } } } return ProducerMap2Slice(results) }
最後再看看外層如何調用這個RegistrationDB:
func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { //註冊前須要鑑權 if client.peerInfo == nil { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") } //找到topic和channel的name topic, channel, err := getTopicChan("REGISTER", params) if err != nil { return nil, err } //若是參數channel不爲"",則說明這個producer是一個channel類型,不然是一個topic類型 if channel != "" { key := Registration{"channel", topic, channel} if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) { p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "channel", topic, channel) } } //註冊完channel後還須要註冊topic對應的producer key := Registration{"topic", topic, ""} if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) { p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "topic", topic, "") } return []byte("OK"), nil } //服務取消註冊 func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { if client.peerInfo == nil { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") } topic, channel, err := getTopicChan("UNREGISTER", params) if err != nil { return nil, err } if channel != "" { 只刪除channel對應的producer key := Registration{"channel", topic, channel} removed, left := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id) if removed { p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, "channel", topic, channel) } // for ephemeral channels, remove the channel as well if it has no producers if left == 0 && strings.HasSuffix(channel, "#ephemeral") { p.ctx.nsqlookupd.DB.RemoveRegistration(key) } } else { //須要刪除topic全部對應的channel registrations := p.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*") for _, r := range registrations { if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, 
client.peerInfo.id); removed { p.ctx.nsqlookupd.logf(LOG_WARN, "client(%s) unexpected UNREGISTER category:%s key:%s subkey:%s", client, "channel", topic, r.SubKey) } } //刪除topic下的producer key := Registration{"topic", topic, ""} if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id); removed { p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, "topic", topic, "") } } return []byte("OK"), nil }
總結:服務發現通過在topic和channel下存放不同nsqd的peer節點信息,方便找出某類topic和channel下面有哪些peer節點。