NSQD是 nsq 的主要邏輯部分,請參考官方文檔。咱們直接看代碼。html
main 函數位於git
github.com/nsqio/nsq/apps/nsqd/nsqd.gogithub
func main() { prg := &program{} if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil { log.Fatal(err) } }
其中svc.Run 是一個service wrapper,來自於go-svc。它傳入Service 接口,而且作了四件事情Init, Start, NotifySignal和 Stop:api
// Run runs your Service. // // Run will block until one of the signals specified in sig is received. // If sig is empty syscall.SIGINT and syscall.SIGTERM are used by default. func Run(service Service, sig ...os.Signal) error { env := environment{} if err := service.Init(env); err != nil { return err } if err := service.Start(); err != nil { return err } if len(sig) == 0 { sig = []os.Signal{syscall.SIGINT, syscall.SIGTERM} } signalChan := make(chan os.Signal, 1) signalNotify(signalChan, sig...) <-signalChan return service.Stop() }
讓咱們來看下program 對於Service 這個接口的實現網絡
func (p *program) Init(env svc.Environment) error { if env.IsWindowsService() { dir := filepath.Dir(os.Args[0]) return os.Chdir(dir) } return nil } func (p *program) Start() error { //ztd: 初始化選項 opts := nsqd.NewOptions() //ztd: 設置接受的參數 flagSet := nsqdFlagSet(opts) flagSet.Parse(os.Args[1:]) rand.Seed(time.Now().UTC().UnixNano()) //ztd: 若是僅僅查詢版本信息 if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) { fmt.Println(version.String("nsqd")) os.Exit(0) } //ztd: 若是指定了conf 文件 var cfg config configFile := flagSet.Lookup("config").Value.String() if configFile != "" { _, err := toml.DecodeFile(configFile, &cfg) if err != nil { log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error()) } } cfg.Validate() //ztd: 合併命令行和配置文件中的配置 options.Resolve(opts, flagSet, cfg) nsqd := nsqd.New(opts) //ztd: metadata 中存儲了topic和 channel 信息 err := nsqd.LoadMetadata() if err != nil { log.Fatalf("ERROR: %s", err.Error()) } //ztd: 不是很明白爲何讀完了立刻又寫 err = nsqd.PersistMetadata() if err != nil { log.Fatalf("ERROR: failed to persist metadata - %s", err.Error()) } nsqd.Main() p.nsqd = nsqd return nil } func (p *program) Stop() error { if p.nsqd != nil { p.nsqd.Exit() } return nil }
func (n *NSQD) Main() { var httpListener net.Listener var httpsListener net.Listener ctx := &context{n} //ztd: 監聽某一端口 tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress) if err != nil { n.logf("FATAL: listen (%s) failed - %s", n.getOpts().TCPAddress, err) os.Exit(1) } n.Lock() n.tcpListener = tcpListener n.Unlock() tcpServer := &tcpServer{ctx: ctx} n.waitGroup.Wrap(func() { protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger) }) //ztd: 啓動https 服務 if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig) if err != nil { n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPSAddress, err) os.Exit(1) } n.Lock() n.httpsListener = httpsListener n.Unlock() httpsServer := newHTTPServer(ctx, true, true) n.waitGroup.Wrap(func() { http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.getOpts().Logger) }) } //ztd: 啓動http 服務 httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress) if err != nil { n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPAddress, err) os.Exit(1) } n.Lock() n.httpListener = httpListener n.Unlock() httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired) n.waitGroup.Wrap(func() { http_api.Serve(n.httpListener, httpServer, "HTTP", n.getOpts().Logger) }) n.waitGroup.Wrap(func() { n.queueScanLoop() }) n.waitGroup.Wrap(func() { n.lookupLoop() }) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(func() { n.statsdLoop() }) } }
拿出幾個片斷討論一下app
n.Lock() n.tcpListener = tcpListener n.Unlock()
開始看到這塊有點懵,原來mutex 還能夠這麼用,先看眼NSQD 的結構tcp
type NSQD struct { //64bit atomic vars need to be first for proper alignment on 32bit platforms clientIDSequence int64 sync.RWMutex ... }
原來NSQD 內嵌了一個sync.RWMtex, 使用的時候直接n.Lock().按照我原來的習慣,都是這樣寫:函數
var lock sync.Mutex lock.Lock()
NSQD 對sync.WaitGroup 作了巧妙的封裝, 使用起來是這個樣子:oop
n.waitGroup.Wrap(func() { protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger) })
來看一眼Wrap的實現:ui
type WaitGroupWrapper struct { sync.WaitGroup } func (w *WaitGroupWrapper) Wrap(cb func()) { w.Add(1) go func() { cb() w.Done() }() }
這段代碼看起來比較簡單,先給waitgroup +1,起了一個goroutine 運行 callback func,等函數運行結束之後對waitgroup -1. 不過waitgroup 是在哪裏wait 的呢?對全局搜了一下代碼,發現是在Exit方法裏面:
func (n *NSQD) Exit() { if n.tcpListener != nil { n.tcpListener.Close() } if n.httpListener != nil { n.httpListener.Close() } if n.httpsListener != nil { n.httpsListener.Close() } n.Lock() err := n.PersistMetadata() if err != nil { n.logf("ERROR: failed to persist metadata - %s", err) } n.logf("NSQ: closing topics") for _, topic := range n.topicMap { topic.Close() } n.Unlock() close(n.exitChan) n.waitGroup.Wait() n.dl.Unlock() }
Exit 方法關閉了各類資源以後調用了n.waitGroup.Wait(),等待全部資源釋放完畢.
回到剛纔這一句 protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) { l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr())) for { clientConn, err := listener.Accept() if err != nil { if nerr, ok := err.(net.Error); ok && nerr.Temporary() { l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err)) //ztd: 若是發生臨時的error讓出時間片。猜測這麼作的緣由多是當前網絡情況很差引發的一 些臨時error,若是立刻去accept,會獲得另一個error,不如讓出時間片,讓其餘goroutine 作事情。有一點延遲的意思。若是有更好的解釋,請糾正我。。。 runtime.Gosched() continue } // theres no direct way to detect this error because it is not exposed if !strings.Contains(err.Error(), "use of closed network connection") { l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err)) } //ztd: 在上面提到的Exit方法裏會關閉listener, break 跳出循環 break } go handler.Handle(clientConn) } l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr())) }
在後面的文章中,講繼續閱讀tcp handler,http handler,n.queueScanLoop() 和 n.lookupLoop()