nsqlookupd
是守護進程負責管理拓撲信息。客戶端經過查詢 nsqlookupd
來發現指定話題(topic
)的生產者,而且提供 nsqd
節點廣播話題(topic
)和通道(channel
)信息。sql
nsqlookupd
有兩個接口:TCP
接口,nsqd
用它來廣播。HTTP
接口,客戶端用它來發現和管理。數據庫
本系列的代碼分析均是基於nsq v0.3.5的代碼進行的分析,若有不對之處歡迎你們指正指導。api
代碼文件路徑爲nsq/nsqlookupd/nsqlookupd.go
app
type NSQLookupd struct { sync.RWMutex //讀寫鎖 opts *Options //nsqlookupd 配置信息 定義文件路徑爲nsq/nsqlookupd/options.go tcpListener net.Listener httpListener net.Listener waitGroup util.WaitGroupWrapper //WaitGroup 典型應用 用於開啓兩個goroutine,一個監聽HTTP 一個監聽TCP DB *RegistrationDB //product 註冊數據庫 具體分析後面章節再講 } //初始化NSQLookupd實例 func New(opts *Options) *NSQLookupd { n := &NSQLookupd{ opts: opts, DB: NewRegistrationDB(), //初始化DB實例 } n.logf(version.String("nsqlookupd")) return n } func (l *NSQLookupd) Main() { ctx := &Context{l} //初始化Context實例將NSQLookupd指針放入Context實例中 Context結構請參考文件nsq/nsqlookupd/context.go Context用於nsqlookupd中的tcpServer 和 httpServer中 tcpListener, err := net.Listen("tcp", l.opts.TCPAddress) //開啓TCP監聽 if err != nil { l.logf("FATAL: listen (%s) failed - %s", l.opts.TCPAddress, err) os.Exit(1) } l.Lock() l.tcpListener = tcpListener l.Unlock() tcpServer := &tcpServer{ctx: ctx} //建立一個tcpServer tcpServer 實現了nsq/internal/protocol包中的TCPHandler接口 l.waitGroup.Wrap(func() { //protocol.TCPServer方法的過程就是tcpListener accept tcp的鏈接 //而後經過tcpServer中的Handle分析報文,而後處理相關的協議 protocol.TCPServer(tcpListener, tcpServer, l.opts.Logger) }) //把tcpServer加入到waitGroup httpListener, err := net.Listen("tcp", l.opts.HTTPAddress) //開啓HTTP監聽 if err != nil { l.logf("FATAL: listen (%s) failed - %s", l.opts.HTTPAddress, err) os.Exit(1) } l.Lock() l.httpListener = httpListener l.Unlock() httpServer := newHTTPServer(ctx) //建立一個httpServer l.waitGroup.Wrap(func() { http_api.Serve(httpListener, httpServer, "HTTP", l.opts.Logger) }) //把httpServer加入到waitGroup } //NSQLookupd退出 func (l *NSQLookupd) Exit() { if l.tcpListener != nil { l.tcpListener.Close() //關閉tcpListener } if l.httpListener != nil { l.httpListener.Close() //關閉httpListener } l.waitGroup.Wait() }
這一章節的代碼就先分析到這裏了,下一章節要分析的是nsqlookup中的tcpServer.tcp