nsq源碼分析之nsqlookup實現

nsqlookupd服務爲nsqd提供服務發現,下面分析並借鑑一下服務發現部分的源碼:

// PeerInfo holds the information kept for a single node: its identity and
// its address details (host name, addresses, ports) plus a version string.
// Only the exported fields carry JSON tags.
type PeerInfo struct {
	lastUpdate       int64                           // timestamp refreshed whenever the node pings
	id               string                          // unique ID of the node
	RemoteAddress    string `json:"remote_address"`  
	Hostname         string `json:"hostname"`        
	BroadcastAddress string `json:"broadcast_address"`
	TCPPort          int    `json:"tcp_port"`
	HTTPPort         int    `json:"http_port"`
	Version          string `json:"version"`
}

// Producer wraps a node's PeerInfo with tombstone state, which makes it easy to
// track whether the node should still be considered current.
type Producer struct {
	peerInfo     *PeerInfo // the node this producer entry stands for
	tombstoned   bool      // tombstone flag for this producer
	tombstonedAt time.Time // presumably the moment tombstoned was set; the setter is not shown here
}

再來看看nsqlookupd是怎麼存儲和管理這些Producer的:

// RegistrationDB stores, for every Registration, the producers registered
// under it. The embedded RWMutex is what its methods lock before reading or
// writing registrationMap.
type RegistrationDB struct {
	sync.RWMutex
	registrationMap map[Registration]ProducerMap // topic/channel registration -> producers registered under it
}

// Registration is the key under which producers are stored: a category plus a
// topic name and an optional channel name.
type Registration struct {
	Category string     // kind of registration: "topic" or "channel"
	Key      string     // topic name
	SubKey   string     // channel name, or "" for a topic registration
}

//其中Key和SubKey在查找時支持使用通配符"*"

// Registrations is a list of Registration keys, e.g. the result of a lookup.
type Registrations []Registration

// ProducerMap maps a node ID to the Producer that wraps that peer.
type ProducerMap map[string]*Producer

分別看看怎麼新增Registration和Producer:

// AddRegistration makes sure an entry for k exists in the registration map.
// An entry that is already present, together with its producers, is left
// untouched.
func (r *RegistrationDB) AddRegistration(k Registration) {
	r.Lock()
	defer r.Unlock()

	if _, exists := r.registrationMap[k]; exists {
		return
	}
	// First time this registration is seen: start it with an empty producer set.
	r.registrationMap[k] = make(map[string]*Producer)
}

// AddProducer attaches p to registration k, creating the registration on
// demand. It reports true when p was stored and false when a producer with the
// same peer id was already registered under k, in which case the existing
// entry is kept.
func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
	r.Lock()
	defer r.Unlock()

	peers, known := r.registrationMap[k]
	if !known {
		peers = make(map[string]*Producer)
		r.registrationMap[k] = peers
	}

	// Producers are keyed by the peer's id; the first one registered wins.
	id := p.peerInfo.id
	if _, duplicate := peers[id]; duplicate {
		return false
	}
	peers[id] = p
	return true
}

再看看查找(Find)相應Registration和Producer的操作:

// needFilter reports whether a lookup uses the "*" wildcard for key or subkey
// and therefore has to be answered by filtering rather than by a direct map
// lookup.
func (r *RegistrationDB) needFilter(key string, subkey string) bool {
	const wildcard = "*"
	if key == wildcard {
		return true
	}
	return subkey == wildcard
}

// IsMatch reports whether registration k satisfies the given query. The
// category must be equal; key and subkey must each either be equal to the
// corresponding field of k or be the wildcard "*".
func (k Registration) IsMatch(category string, key string, subkey string) bool {
	const wildcard = "*"

	categoryOK := k.Category == category
	keyOK := key == wildcard || k.Key == key
	subkeyOK := subkey == wildcard || k.SubKey == subkey

	return categoryOK && keyOK && subkeyOK
}

// FindRegistrations returns the registrations matching category, key and
// subkey. When key or subkey is the wildcard "*", every stored registration is
// tested with IsMatch; otherwise the exact registration is looked up directly.
// The result is always non-nil, and empty when nothing matches.
func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations {
	r.RLock()
	defer r.RUnlock()

	matches := Registrations{}

	if r.needFilter(key, subkey) {
		// Wildcard query: scan all registrations and keep the matching ones.
		for candidate := range r.registrationMap {
			if candidate.IsMatch(category, key, subkey) {
				matches = append(matches, candidate)
			}
		}
		return matches
	}

	// Exact query: a single map lookup answers it, no IsMatch needed.
	exact := Registration{Category: category, Key: key, SubKey: subkey}
	if _, present := r.registrationMap[exact]; present {
		matches = append(matches, exact)
	}
	return matches
}

// FindProducers returns the producers registered under the registrations that
// match category, key and subkey, converted to a slice by ProducerMap2Slice.
// Without a wildcard the producers of the exact registration are returned; with
// a "*" key or subkey the producers of all matching registrations are merged,
// keeping one entry per peer id.
func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers {
	r.RLock()
	defer r.RUnlock()

	if wildcard := r.needFilter(key, subkey); !wildcard {
		exact := Registration{Category: category, Key: key, SubKey: subkey}
		return ProducerMap2Slice(r.registrationMap[exact])
	}

	// Merge by peer id so a node registered under several matching
	// registrations shows up only once.
	merged := make(map[string]*Producer)
	for reg, peers := range r.registrationMap {
		if !reg.IsMatch(category, key, subkey) {
			continue
		}
		for _, peer := range peers {
			id := peer.peerInfo.id
			if _, seen := merged[id]; !seen {
				merged[id] = peer
			}
		}
	}
	return ProducerMap2Slice(merged)
}

最後再看看外層如何調用這個RegistrationDB:

// REGISTER records the client as a producer for the topic named in params and,
// when a channel is named as well, for that channel too.
//
// It returns "OK" on success, a fatal E_INVALID client error when the client
// has no peer info, or whatever error getTopicChan returns for params. A log
// line is written only for registrations that were newly added.
func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	// A client without peer info is rejected and told to IDENTIFY first.
	if client.peerInfo == nil {
		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
	}

	// Extract the topic and channel names from the command parameters.
	topic, channel, err := getTopicChan("REGISTER", params)
	if err != nil {
		return nil, err
	}

	// A non-empty channel means the client also produces for that channel.
	if channel != "" {
		channelReg := Registration{Category: "channel", Key: topic, SubKey: channel}
		added := p.ctx.nsqlookupd.DB.AddProducer(channelReg, &Producer{peerInfo: client.peerInfo})
		if added {
			p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s",
				client, "channel", topic, channel)
		}
	}

	// The topic itself is always registered, with or without a channel.
	topicReg := Registration{Category: "topic", Key: topic, SubKey: ""}
	added := p.ctx.nsqlookupd.DB.AddProducer(topicReg, &Producer{peerInfo: client.peerInfo})
	if added {
		p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s",
			client, "topic", topic, "")
	}

	return []byte("OK"), nil
}

// UNREGISTER removes the client from the registrations named in params.
//
// With a channel, only the client's entry under that channel registration is
// removed; if the channel name ends in "#ephemeral" and no producers are left,
// the channel registration itself is removed too. Without a channel, the client
// is removed from every channel registration of the topic and then from the
// topic registration.
//
// It returns "OK" on success, a fatal E_INVALID client error when the client
// has no peer info, or whatever error getTopicChan returns for params.
func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	// A client without peer info is rejected and told to IDENTIFY first.
	if client.peerInfo == nil {
		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
	}

	topic, channel, err := getTopicChan("UNREGISTER", params)
	if err != nil {
		return nil, err
	}

	if channel != "" {
		// Only remove the producer registered under this channel.
		// RemoveProducer is not shown here; its two results are used as
		// "was removed" and "number of producers left".
		key := Registration{"channel", topic, channel}
		removed, left := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id)
		if removed {
			p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
				client, "channel", topic, channel)
		}
		// for ephemeral channels, remove the channel as well if it has no producers
		if left == 0 && strings.HasSuffix(channel, "#ephemeral") {
			p.ctx.nsqlookupd.DB.RemoveRegistration(key)
		}
	} else {
		// No channel given: drop the client from every channel of the topic.
		// Anything actually removed here is logged as unexpected (LOG_WARN).
		registrations := p.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*")
		for _, r := range registrations {
			if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed {
				p.ctx.nsqlookupd.logf(LOG_WARN, "client(%s) unexpected UNREGISTER category:%s key:%s subkey:%s",
					client, "channel", topic, r.SubKey)
			}
		}
		// Then remove the producer registered under the topic itself.
		key := Registration{"topic", topic, ""}
		if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id); removed {
			p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
				client, "topic", topic, "")
		}
	}

	return []byte("OK"), nil
}

總結:服務發現通過在topic和channel下存放不同nsqd的peer節點信息,方便找出某個topic或channel下面有哪些peer節點。

相關文章
相關標籤/搜索