NSQ 源碼閱讀(二) NSQD 入口函數

前言

NSQD是 nsq 的主要邏輯部分,請參考官方文檔。咱們直接看代碼。html

入口函數

main 函數位於git

github.com/nsqio/nsq/apps/nsqd/nsqd.gogithub

func main() {
    prg := &program{}
    if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
        log.Fatal(err)
    }
}

其中svc.Run 是一個service wrapper,來自於go-svc。它傳入Service 接口,而且作了四件事情Init, Start, NotifySignal和 Stop:api

// Run runs your Service.
//
// Run will block until one of the signals specified in sig is received.
// If sig is empty syscall.SIGINT and syscall.SIGTERM are used by default.
func Run(service Service, sig ...os.Signal) error {
    env := environment{}
    if err := service.Init(env); err != nil {
        return err
    }

    if err := service.Start(); err != nil {
        return err
    }

    if len(sig) == 0 {
        sig = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
    }

    signalChan := make(chan os.Signal, 1)
    signalNotify(signalChan, sig...)
    <-signalChan

    return service.Stop()
}

讓咱們來看下program 對於Service 這個接口的實現網絡

func (p *program) Init(env svc.Environment) error {
    if env.IsWindowsService() {
        dir := filepath.Dir(os.Args[0])
        return os.Chdir(dir)
    }
    return nil
}
func (p *program) Start() error {
    //ztd: 初始化選項
    opts := nsqd.NewOptions()
    //ztd: 設置接受的參數
    flagSet := nsqdFlagSet(opts)
    flagSet.Parse(os.Args[1:])

    rand.Seed(time.Now().UTC().UnixNano())
    //ztd: 若是僅僅查詢版本信息
    if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
        fmt.Println(version.String("nsqd"))
        os.Exit(0)
    }
    //ztd: 若是指定了conf 文件
    var cfg config
    configFile := flagSet.Lookup("config").Value.String()
    if configFile != "" {
        _, err := toml.DecodeFile(configFile, &cfg)
        if err != nil {
            log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
        }
    }
    cfg.Validate()
    //ztd: 合併命令行和配置文件中的配置
    options.Resolve(opts, flagSet, cfg)
    nsqd := nsqd.New(opts)
    //ztd: metadata 中存儲了topic和 channel 信息
    err := nsqd.LoadMetadata()
    if err != nil {
        log.Fatalf("ERROR: %s", err.Error())
    }
    //ztd: 不是很明白爲何讀完了立刻又寫
    err = nsqd.PersistMetadata()
    if err != nil {
        log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
    }
    nsqd.Main()

    p.nsqd = nsqd
    return nil
}

func (p *program) Stop() error {
    if p.nsqd != nil {
        p.nsqd.Exit()
    }
    return nil
}

nsqd.Main()

func (n *NSQD) Main() {
    var httpListener net.Listener
    var httpsListener net.Listener

    ctx := &context{n}
    //ztd: 監聽某一端口
    tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
    if err != nil {
        n.logf("FATAL: listen (%s) failed - %s", n.getOpts().TCPAddress, err)
        os.Exit(1)
    }

    n.Lock()
    n.tcpListener = tcpListener
    n.Unlock()
    tcpServer := &tcpServer{ctx: ctx}
    n.waitGroup.Wrap(func() {
        protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
    })
    //ztd: 啓動https 服務
    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
        httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
        if err != nil {
            n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
            os.Exit(1)
        }
        n.Lock()
        n.httpsListener = httpsListener
        n.Unlock()
        httpsServer := newHTTPServer(ctx, true, true)
        n.waitGroup.Wrap(func() {
            http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.getOpts().Logger)
        })
    }
    //ztd: 啓動http 服務
    httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
    if err != nil {
        n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
        os.Exit(1)
    }
    n.Lock()
    n.httpListener = httpListener
    n.Unlock()
    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
    n.waitGroup.Wrap(func() {
        http_api.Serve(n.httpListener, httpServer, "HTTP", n.getOpts().Logger)
    })

    n.waitGroup.Wrap(func() { n.queueScanLoop() })
    n.waitGroup.Wrap(func() { n.lookupLoop() })
    if n.getOpts().StatsdAddress != "" {
        n.waitGroup.Wrap(func() { n.statsdLoop() })
    }
}

拿出幾個片斷討論一下app

n.Lock()
    n.tcpListener = tcpListener
    n.Unlock()

開始看到這塊有點懵,原來mutex 還能夠這麼用,先看眼NSQD 的結構tcp

type NSQD struct {
      //64bit atomic vars need to be first for proper alignment on 32bit platforms
     clientIDSequence int64
     sync.RWMutex
     ...
     }

原來NSQD 內嵌了一個sync.RWMtex, 使用的時候直接n.Lock().按照我原來的習慣,都是這樣寫:函數

var lock sync.Mutex
    lock.Lock()

NSQD 對sync.WaitGroup 作了巧妙的封裝, 使用起來是這個樣子:oop

n.waitGroup.Wrap(func() {
        protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
    })

來看一眼Wrap的實現:ui

type WaitGroupWrapper struct {
    sync.WaitGroup
}

func (w *WaitGroupWrapper) Wrap(cb func()) {
    w.Add(1)
    go func() {
        cb()
        w.Done()
    }()
}

這段代碼看起來比較簡單,先給waitgroup +1,起了一個goroutine 運行 callback func,等函數運行結束之後對waitgroup -1. 不過waitgroup 是在哪裏wait 的呢?對全局搜了一下代碼,發現是在Exit方法裏面:

func (n *NSQD) Exit() {
    if n.tcpListener != nil {
        n.tcpListener.Close()
    }

    if n.httpListener != nil {
        n.httpListener.Close()
    }

    if n.httpsListener != nil {
        n.httpsListener.Close()
    }

    n.Lock()
    err := n.PersistMetadata()
    if err != nil {
        n.logf("ERROR: failed to persist metadata - %s", err)
    }
    n.logf("NSQ: closing topics")
    for _, topic := range n.topicMap {
        topic.Close()
    }
    n.Unlock()

    close(n.exitChan)
    n.waitGroup.Wait()

    n.dl.Unlock()
}

Exit 方法關閉了各類資源以後調用了n.waitGroup.Wait(),等待全部資源釋放完畢.

回到剛纔這一句 protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)

func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
    l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))

    for {
        clientConn, err := listener.Accept()
        if err != nil {
            if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
                //ztd: 若是發生臨時的error讓出時間片。猜測這麼作的緣由多是當前網絡情況很差引發的一 些臨時error,若是立刻去accept,會獲得另一個error,不如讓出時間片,讓其餘goroutine 作事情。有一點延遲的意思。若是有更好的解釋,請糾正我。。。
                runtime.Gosched()
                continue
            }
            // theres no direct way to detect this error because it is not exposed
            if !strings.Contains(err.Error(), "use of closed network connection") {
                l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
            }
            //ztd: 在上面提到的Exit方法裏會關閉listener, break 跳出循環
            break
        }
        go handler.Handle(clientConn)
    }

    l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}

在後面的文章中,講繼續閱讀tcp handler,http handler,n.queueScanLoop() 和 n.lookupLoop()

相關文章
相關標籤/搜索