NSQ系列之nsqlookupd代碼分析四(詳解nsqlookupd中的RegitrationDB)

NSQ系列之nsqlookupd代碼分析四(詳解nsqlookupd中的RegitrationDB操做方法)

上一章咱們大體瞭解了nsqlookupdtcpServer中的IOLoop協議的處理邏輯,裏面有提到一個存儲nsqdPeerInfo以及topic channel數據信息的RegitrationDB的一些操做方法。今天咱們就來說解一下關於RegitrationDB的操做方法git

廢話很少說,直接上代碼吧(代碼位於nsq/nsqlookupd/regitration_db.go這個文件中)sql

type RegistrationDB struct {
	sync.RWMutex                               //讀寫鎖用於併發操做
	registrationMap map[Registration]Producers //定義一個一Regitration爲key producer指針的slice爲value的map
}

type Registration struct {
	Category string
	Key      string
	SubKey   string
}
type Registrations []Registration

//用於記錄client相關信息
type PeerInfo struct {
	lastUpdate       int64  //client 心跳包最後接收時間
	id               string //client remote address
	RemoteAddress    string `json:"remote_address"`
	Hostname         string `json:"hostname"`
	BroadcastAddress string `json:"broadcast_address"`
	TCPPort          int    `json:"tcp_port"`
	HTTPPort         int    `json:"http_port"`
	Version          string `json:"version"`
}

type Producer struct {
	//PeerInfo指針
	peerInfo     *PeerInfo
	tombstoned   bool
	tombstonedAt time.Time
}

type Producers []*Producer

//實現String接口,打印出producer信息
func (p *Producer) String() string {
	return fmt.Sprintf("%s [%d, %d]", p.peerInfo.BroadcastAddress, p.peerInfo.TCPPort, p.peerInfo.HTTPPort)
}

//producer標記爲tombstoned 並記錄當前時間
func (p *Producer) Tombstone() {
	p.tombstoned = true
	p.tombstonedAt = time.Now()
}

//判斷producer是不是tombstoned
func (p *Producer) IsTombstoned(lifetime time.Duration) bool {
	return p.tombstoned && time.Now().Sub(p.tombstonedAt) < lifetime
}

//初始化一個RegistrationDB
func NewRegistrationDB() *RegistrationDB {
	return &RegistrationDB{
		registrationMap: make(map[Registration]Producers),
	}
}

// add a registration key
//添加一個Registration key 若是不存在Map中則將其設置爲你一個空的Producer
func (r *RegistrationDB) AddRegistration(k Registration) {
	r.Lock()
	defer r.Unlock()
	_, ok := r.registrationMap[k]
	if !ok {
		r.registrationMap[k] = Producers{}
	}
}

// add a producer to a registration
//添加一個producer到registration中
func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
	r.Lock()
	defer r.Unlock()
	producers := r.registrationMap[k]
	found := false
	for _, producer := range producers {
		if producer.peerInfo.id == p.peerInfo.id {
			found = true
		}
	}
	if found == false {
		r.registrationMap[k] = append(producers, p)
	}
	return !found
}

// remove a producer from a registration
//移除registration中一個producer
func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) {
	r.Lock()
	defer r.Unlock()
	producers, ok := r.registrationMap[k]
	if !ok {
		return false, 0
	}
	removed := false
	//這裏用到裏從一個slice中刪除一個元素的方法
	cleaned := Producers{}
	for _, producer := range producers {
		if producer.peerInfo.id != id {
			cleaned = append(cleaned, producer)
		} else {
			removed = true
		}
	}
	// Note: this leaves keys in the DB even if they have empty lists
	r.registrationMap[k] = cleaned
	//返貨是否移除以及新的producers長度
	return removed, len(cleaned)
}

// remove a Registration and all it's producers
//刪除registration下全部的producers
func (r *RegistrationDB) RemoveRegistration(k Registration) {
	r.Lock()
	defer r.Unlock()
	delete(r.registrationMap, k)
}

func (r *RegistrationDB) needFilter(key string, subkey string) bool {
	return key == "*" || subkey == "*"
}

//根據category key subkey查找Registrations
//若是傳入的key 或 subkey爲*的話則獲取全部的registrationMap中全部的registration
//若是key 或 subkey 不爲* 的話則 獲取具體的registration
//這裏實現了相似 * 這個通配符的概念
func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations {
	r.RLock()
	defer r.RUnlock()
	if !r.needFilter(key, subkey) {
		k := Registration{category, key, subkey}
		if _, ok := r.registrationMap[k]; ok {
			return Registrations{k}
		}
		return Registrations{}
	}
	results := Registrations{}
	for k := range r.registrationMap {
		if !k.IsMatch(category, key, subkey) {
			continue
		}
		results = append(results, k)
	}
	return results
}

//根據category key subkey查找全部的Producer
//同上面的FindRegistrations函數同樣,實現了*通配符的概念
func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers {
	r.RLock()
	defer r.RUnlock()
	if !r.needFilter(key, subkey) {
		k := Registration{category, key, subkey}
		return r.registrationMap[k]
	}

	results := Producers{}
	for k, producers := range r.registrationMap {
		if !k.IsMatch(category, key, subkey) {
			continue
		}
		for _, producer := range producers {
			found := false
			for _, p := range results {
				if producer.peerInfo.id == p.peerInfo.id {
					found = true
				}
			}
			if found == false {
				results = append(results, producer)
			}
		}
	}
	return results
}

//根據producer.peerInfo.id查找所屬的registration key
func (r *RegistrationDB) LookupRegistrations(id string) Registrations {
	r.RLock()
	defer r.RUnlock()
	results := Registrations{}
	for k, producers := range r.registrationMap {
		for _, p := range producers {
			if p.peerInfo.id == id {
				results = append(results, k)
				break
			}
		}
	}
	return results
}

//依據Registration中的category key subkey,判斷是否與Registration匹配
func (k Registration) IsMatch(category string, key string, subkey string) bool {
	if category != k.Category {
		return false
	}
	if key != "*" && k.Key != key {
		return false
	}
	if subkey != "*" && k.SubKey != subkey {
		return false
	}
	return true
}

//根據category key subkey過濾Registrations
func (rr Registrations) Filter(category string, key string, subkey string) Registrations {
	output := Registrations{}
	for _, k := range rr {
		if k.IsMatch(category, key, subkey) {
			output = append(output, k)
		}
	}
	return output
}

//獲取registrationMap中全部Registration的key
func (rr Registrations) Keys() []string {
	keys := make([]string, len(rr))
	for i, k := range rr {
		keys[i] = k.Key
	}
	return keys
}

//獲取registrationMap中全部Registration的subkey
func (rr Registrations) SubKeys() []string {
	subkeys := make([]string, len(rr))
	for i, k := range rr {
		subkeys[i] = k.SubKey
	}
	return subkeys
}

//過濾出全部可用的Producer
func (pp Producers) FilterByActive(inactivityTimeout time.Duration, tombstoneLifetime time.Duration) Producers {
	now := time.Now()
	results := Producers{}
	for _, p := range pp {
		cur := time.Unix(0, atomic.LoadInt64(&p.peerInfo.lastUpdate))
		if now.Sub(cur) > inactivityTimeout || p.IsTombstoned(tombstoneLifetime) {
			continue
		}
		results = append(results, p)
	}
	return results
}

//獲取Producers中全部的PeerInfo
func (pp Producers) PeerInfo() []*PeerInfo {
	results := []*PeerInfo{}
	for _, p := range pp {
		results = append(results, p.peerInfo)
	}
	return results
}

經過上面代碼的分析咱們不難看出registration_db.go文件用map的形式保存Producer,並提供一系列增、刪、改、查的操做。同時使用RWMutex作併發控制。json

到這裏咱們講解了nsqlookupdtcpServer的所有代碼了,咱們瞭解了nsqlookupd是用來發現並記錄nsqd服務相關的remot address tcp端口 http 端口等信息 以及 相應的topicchannel信息的功能,這樣好方便消費查詢相應的topicchannelnsqd服務連接信息,已實現對nsqd進行拓撲管理的功能。併發

下一章咱們開始分析nsqlookupd中的httpServer相關的代碼app

PS:順便附送前面三章的傳送門tcp

NSQ系列之nsqlookupd代碼分析一(初探nsqlookup)函數

NSQ系列之nsqlookupd代碼分析二(初識nsqlookupd tcpServer)oop

NSQ系列之nsqlookupd代碼分析三(詳解tcpServer 中的IOLoop方法)this

PS:若是文中有錯誤,歡迎你們指正哦。atom

相關文章
相關標籤/搜索