NSQ是一個實時分佈式消息平臺, 旨在大規模運行, 天天處理數十億條消息, 被許多互聯網公司所使用;sql
其中 nsqd 是一個守護進程, 負責接收, 排隊, 投遞消息給客戶端;
它能夠獨立運行, 不過一般它是由 nsqlookupd 實例所在集羣配置的(它在這能聲明 topics 和 channels, 以便你們能找到);
它在 2 個 TCP 端口監聽, 一個給客戶端, 另外一個是 HTTP API; 同時, 它也能在第三個端口監聽 HTTPS數據庫
nsq
大概分nsqd
nsqlookupd
nsqadmin
三個部分json
nsqlookupd
是守護進程負責管理拓撲信息; 客戶端經過查詢nsqlookupd
來發現指定話題(topic)的生產者, 而且 nsqd 節點廣播話題(topic)和通道(channel)信息, 具備如下功能api
nsq
服務中只有一個nsqlookupd
服務, 固然也能夠在集羣中部署多個nsqlookupd
, 但它們之間是沒有關聯的nsqlookupd
崩潰, 也會不影響正在運行的nsqd
服務nsqd
和naqadmin
信息交互的中間件http
查詢服務, 給客戶端定時更新nsqd
的地址目錄nsqd
是一個守護進程, 負責接收, 排隊, 投遞消息給客戶端緩存
topic
, 同一個channel
的消費者使用負載均衡策略(不是輪詢)channel
存在, 即便沒有該channel
的消費者, 也會將生產者的message
緩存到隊列中(注意消息的過時處理)message
至少會被消費一次, 即便nsqd
退出, 也會將隊列中的消息暫存磁盤上(結束進程等意外狀況除外)nsqd
中每一個channel
隊列在內存中緩存的message
數量, 一旦超出, message
將被緩存到磁盤中topic
, channel
一旦創建, 將會一直存在, 要及時在管理臺或者用代碼清除無效的topic
和channel
, 避免資源的浪費是一套 WEB UI, 用來聚集集羣的實時統計, 並執行不一樣的管理任務併發
本文以及後面的分析都是基於 1.0.0 版本代碼, 爲了增長可讀性, 我把註釋放在了函數外, 基本都覆蓋到, 本文中就不囉嗦講如何使用了, 查閱文檔便可app
package nsqlookupd
// 鎖
// 配置選項
// tcpListener 如上文所說 tcp http 端口監聽
// httpListener
// waitGroup 線程同步
// 數據庫
type NSQLookupd struct {
sync.RWMutex
opts *Options
tcpListener net.Listener
httpListener net.Listener
waitGroup util.WaitGroupWrapper
DB *RegistrationDB
}
// 若是沒有指定 Logger, 就new一個
// new NSQLookupd, 待會看一下 `NewRegistrationDB` 作了什麼事情
// 解析 log level
func New(opts *Options) *NSQLookupd {
if opts.Logger == nil {
opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
}
n := &NSQLookupd{
opts: opts,
DB: NewRegistrationDB(),
}
var err error
opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel, opts.Verbose)
if err != nil {
n.logf(LOG_FATAL, "%s", err)
os.Exit(1)
}
n.logf(LOG_INFO, version.String("nsqlookupd"))
return n
}
// 建立 context, 其實 ctx 就是 NSQLookupd, 不明白爲何畫蛇添足, 想要引入原生的 Context struct?
// 建立 tcpListener, 這裏用到了鎖, 說明該場景有併發
// 根據 ctx 建立 tcpServer
// waitGroup 線程同步後, 建立 TCPServer
// 重複以上步驟,建立 HTTPServer
func (l *NSQLookupd) Main() {
ctx := &Context{l}
tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
if err != nil {
l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.TCPAddress, err)
os.Exit(1)
}
l.Lock()
l.tcpListener = tcpListener
l.Unlock()
tcpServer := &tcpServer{ctx: ctx}
l.waitGroup.Wrap(func() {
protocol.TCPServer(tcpListener, tcpServer, l.logf)
})
httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
if err != nil {
l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.HTTPAddress, err)
os.Exit(1)
}
l.Lock()
l.httpListener = httpListener
l.Unlock()
httpServer := newHTTPServer(ctx)
l.waitGroup.Wrap(func() {
http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
})
}
// 獲取 TCP 地址, 繼續鎖, 說明地址可能會修改
func (l *NSQLookupd) RealTCPAddr() *net.TCPAddr {
l.RLock()
defer l.RUnlock()
return l.tcpListener.Addr().(*net.TCPAddr)
}
// 獲取 HTTP 地址
func (l *NSQLookupd) RealHTTPAddr() *net.TCPAddr {
l.RLock()
defer l.RUnlock()
return l.httpListener.Addr().(*net.TCPAddr)
}
// 關閉 tcpListener httpListener, 等待線程同步後結束
func (l *NSQLookupd) Exit() {
if l.tcpListener != nil {
l.tcpListener.Close()
}
if l.httpListener != nil {
l.httpListener.Close()
}
l.waitGroup.Wait()複製代碼
OK, 至此 nsqlookupd.go
已經分析完畢, 若是想知道以上代碼如何單獨使用, 能夠看測試nsqlookupd_test.go
呀 😂, 在以上代碼中, 咱們看到了 db
部分, 接下來看看負載均衡
package nsqlookupd
// 鎖
// 以 Registration 爲 key 儲存 Producers, 即生產者
type RegistrationDB struct {
sync.RWMutex
registrationMap map[Registration]Producers
}
type Registration struct {
Category string
Key string
SubKey string
}
type Registrations []Registration
// *節點信息*
// 上次更新時間
// 標識符
// 地址
// 主機名
// 廣播地址
// tcp 地址
// http 地址
// 版本號
type PeerInfo struct {
lastUpdate int64
id string
RemoteAddress string `json:"remote_address"`
Hostname string `json:"hostname"`
BroadcastAddress string `json:"broadcast_address"`
TCPPort int `json:"tcp_port"`
HTTPPort int `json:"http_port"`
Version string `json:"version"`
}
// *生產者*
// 節點信息
// 是否刪除
// 刪除時間
type Producer struct {
peerInfo *PeerInfo
tombstoned bool
tombstonedAt time.Time
}
type Producers []*Producer
// 轉換爲字符串
func (p *Producer) String() string {
return fmt.Sprintf("%s [%d, %d]", p.peerInfo.BroadcastAddress, p.peerInfo.TCPPort, p.peerInfo.HTTPPort)
}
// 刪除
func (p *Producer) Tombstone() {
p.tombstoned = true
p.tombstonedAt = time.Now()
}
// 是否刪除
func (p *Producer) IsTombstoned(lifetime time.Duration) bool {
return p.tombstoned && time.Now().Sub(p.tombstonedAt) < lifetime
}
// 建立 RegistrationDB
func NewRegistrationDB() *RegistrationDB {
return &RegistrationDB{
registrationMap: make(map[Registration]Producers),
}
}
// 增長一個註冊表 key
func (r *RegistrationDB) AddRegistration(k Registration) {
r.Lock()
defer r.Unlock()
_, ok := r.registrationMap[k]
if !ok {
r.registrationMap[k] = Producers{}
}
}
// 添加一個 producer 到 registration
// 取出 producers, 並遍歷,
// 若是不存在, 就添加進去
// 若是存在, 返回 false
func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
r.Lock()
defer r.Unlock()
producers := r.registrationMap[k]
found := false
for _, producer := range producers {
if producer.peerInfo.id == p.peerInfo.id {
found = true
}
}
if found == false {
r.registrationMap[k] = append(producers, p)
}
return !found
}
// 根據 id 從 registration 中刪除 producer
// 若是不存在, 返回 false
// 建立一個新的 Producers, 遍歷原來的 Producers,
// 若是 id 不相同就添加進去, 即刪除成功 簡單粗暴 哈哈哈哈哈哈
func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) {
r.Lock()
defer r.Unlock()
producers, ok := r.registrationMap[k]
if !ok {
return false, 0
}
removed := false
cleaned := Producers{}
for _, producer := range producers {
if producer.peerInfo.id != id {
cleaned = append(cleaned, producer)
} else {
removed = true
}
}
// Note: this leaves keys in the DB even if they have empty lists
r.registrationMap[k] = cleaned
return removed, len(cleaned)
}
// 刪除一個 registration
func (r *RegistrationDB) RemoveRegistration(k Registration) {
r.Lock()
defer r.Unlock()
delete(r.registrationMap, k)
}
// 須要過濾
func (r *RegistrationDB) needFilter(key string, subkey string) bool {
return key == "*" || subkey == "*"
}
// 根據 category, key, subkey 查找 Registrations
// 若是 key == '*' 或者 subkey == '*', 則只查找一個
// 不然 遍歷 registrationMap, 返回全部條件符合的 registration
func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations {
r.RLock()
defer r.RUnlock()
if !r.needFilter(key, subkey) {
k := Registration{category, key, subkey}
if _, ok := r.registrationMap[k]; ok {
return Registrations{k}
}
return Registrations{}
}
results := Registrations{}
for k := range r.registrationMap {
if !k.IsMatch(category, key, subkey) {
continue
}
results = append(results, k)
}
return results
}
// 根據 category, key, subkey 查找 Producers
// 同上 沒什麼好說的, 多了個根據 id 去重, 略囉嗦
func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers {
r.RLock()
defer r.RUnlock()
if !r.needFilter(key, subkey) {
k := Registration{category, key, subkey}
return r.registrationMap[k]
}
results := Producers{}
for k, producers := range r.registrationMap {
if !k.IsMatch(category, key, subkey) {
continue
}
for _, producer := range producers {
found := false
for _, p := range results {
if producer.peerInfo.id == p.peerInfo.id {
found = true
}
}
if found == false {
results = append(results, producer)
}
}
}
return results
}
// 根據 id 查找 Registrations
// 依然遍歷 沒什麼好說的
func (r *RegistrationDB) LookupRegistrations(id string) Registrations {
r.RLock()
defer r.RUnlock()
results := Registrations{}
for k, producers := range r.registrationMap {
for _, p := range producers {
if p.peerInfo.id == id {
results = append(results, k)
break
}
}
}
return results
}
// 是否匹配
func (k Registration) IsMatch(category string, key string, subkey string) bool {
if category != k.Category {
return false
}
if key != "*" && k.Key != key {
return false
}
if subkey != "*" && k.SubKey != subkey {
return false
}
return true
}
// 過濾
func (rr Registrations) Filter(category string, key string, subkey string) Registrations {
output := Registrations{}
for _, k := range rr {
if k.IsMatch(category, key, subkey) {
output = append(output, k)
}
}
return output
}
// keys
func (rr Registrations) Keys() []string {
keys := make([]string, len(rr))
for i, k := range rr {
keys[i] = k.Key
}
return keys
}
// subkeys
func (rr Registrations) SubKeys() []string {
subkeys := make([]string, len(rr))
for i, k := range rr {
subkeys[i] = k.SubKey
}
return subkeys
}
// 根據時間過濾
func (pp Producers) FilterByActive(inactivityTimeout time.Duration, tombstoneLifetime time.Duration) Producers {
now := time.Now()
results := Producers{}
for _, p := range pp {
cur := time.Unix(0, atomic.LoadInt64(&p.peerInfo.lastUpdate))
if now.Sub(cur) > inactivityTimeout || p.IsTombstoned(tombstoneLifetime) {
continue
}
results = append(results, p)
}
return results
}
// 節點信息
func (pp Producers) PeerInfo() []*PeerInfo {
results := []*PeerInfo{}
for _, p := range pp {
results = append(results, p.peerInfo)
}
return results
}複製代碼
好了, 能夠看出 RegistrationDB
以 map
結構包含了全部節點信息; 名爲db
, 實則最多算個cache
罷了 2333333; 印證了上文中的 客戶端經過查詢 nsqlookupd 來發現指定話題(topic)的生產者
;tcp
好了, 第一篇暫時結束, 接下來的敬請期待分佈式