nsqlookupd:高性能消息中間件 NSQ 解析

摘要:本篇將會結合源碼介紹 nsqlookupd 的實現細節。

本篇將會結合源碼介紹 nsqlookupd 的實現細節。nsqlookupd 主要流程與nsqd 執行邏輯類似,區別在於具體運行的任務不一樣。sql

nsqlookupd是nsq管理集羣拓撲信息以及用於註冊和發現nsqd服務。因此,也能夠把nsqlookupd理解爲註冊發現服務。當nsq集羣中有多個nsqlookupd服務時,由於每一個nsqd都會向全部的nsqlookupd上報本地信息,所以nsqlookupd具備最終一致性。api

入口函數

在 nsq/apps/nsqlookupd/main.go 能夠找到執行入口文件。app

// 位於apps/nsqlookupd/main.go:45
func main() {
  prg := &program{}
  if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
    logFatal("%s", err)
  }
}

func (p *program) Init(env svc.Environment) error {
  if env.IsWindowsService() {
    dir := filepath.Dir(os.Args[0])
    return os.Chdir(dir)
  }
  return nil
}

func (p *program) Start() error {
  opts := nsqlookupd.NewOptions()

  flagSet := nsqlookupdFlagSet(opts)
  ...
}

一樣,經過第三方 svc 包進行優雅的後臺進程管理,svc.Run() -> svc.Init() -> svc.Start(),啓動 nsqlookupd 實例。tcp

// 位於 apps/nsqlookupd/main.go:80
options.Resolve(opts, flagSet, cfg)
  nsqlookupd, err := nsqlookupd.New(opts)
  if err != nil {
    logFatal("failed to instantiate nsqlookupd", err)
  }
  p.nsqlookupd = nsqlookupd

  go func() {
    err := p.nsqlookupd.Main()
    if err != nil {
      p.Stop()
      os.Exit(1)
    }
  }()

初始化配置參數(優先級:flagSet-命令行參數 > cfg-配置文件 > opts-默認值),開啓協程,進入 nsqlookupd.Main() 主函數。函數

監聽請求

咱們來看下 nsqlookupd 是如何監聽請求的,代碼實現以下:oop

// 位於 nsqlookupd/nsqlookupd.go:53
func (l *NSQLookupd) Main() error {
  ctx := &Context{l}

  exitCh := make(chan error)
  var once sync.Once
  exitFunc := func(err error) {
    once.Do(func() {
      if err != nil {
        l.logf(LOG_FATAL, "%s", err)
      }
      exitCh <- err
    })
  }

  tcpServer := &tcpServer{ctx: ctx}
  l.waitGroup.Wrap(func() {
    exitFunc(protocol.TCPServer(l.tcpListener, tcpServer, l.logf))
  })
  httpServer := newHTTPServer(ctx)
  l.waitGroup.Wrap(func() {
    exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf))
  })

  err := <-exitCh
  return err
}

開啓 goroutine 執行 tcpServer, httpServer,分別監聽 nsqd, nsqadmin 的客戶端請求。性能

處理請求

// 位於 internal/protocol/tcp_server.go:17
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
  logf(lg.INFO, "TCP: listening on %s", listener.Addr())

  for {
    clientConn, err := listener.Accept()
    if err != nil {
      if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
        logf(lg.WARN, "temporary Accept() failure - %s", err)
        runtime.Gosched()
        continue
      }
      // theres no direct way to detect this error because it is not exposed
      if !strings.Contains(err.Error(), "use of closed network connection") {
        return fmt.Errorf("listener.Accept() error - %s", err)
      }
      break
    }
    go handler.Handle(clientConn)
  }

  logf(lg.INFO, "TCP: closing %s", listener.Addr())

  return nil
}

TCPServer 循環監聽客戶端請求,創建長鏈接進行通訊,並開啓 handler 處理每個客戶端 conn。this

裝飾 http 路由

httpServer 經過 http_api.Decorate 裝飾器實現對各 http 路由進行 handler 裝飾,如加 log 日誌、V1 協議版本號的統一格式輸出等;spa

func newHTTPServer(ctx *Context) *httpServer {
  log := http_api.Log(ctx.nsqlookupd.logf)

  router := httprouter.New()
  router.HandleMethodNotAllowed = true
  router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.logf)
  router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.logf)
  router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.logf)
  s := &httpServer{
    ctx:    ctx,
    router: router,
  }

  router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
  router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))

  // v1 negotiate
  router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api.V1))
  router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1))
  router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.V1))
  router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.V1))
}

處理客戶端命令

tcp 解析 V1 協議,內部協議封裝的 prot.IOLoop(conn) 進行循環處理客戶端命令,直到客戶端命令所有解析處理完畢才關閉鏈接。命令行

var prot protocol.Protocol
  switch protocolMagic {
  case "  V1":
    prot = &LookupProtocolV1{ctx: p.ctx}
  default:
    protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL"))
    clientConn.Close()
    p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
      clientConn.RemoteAddr(), protocolMagic)
    return
  }

  err = prot.IOLoop(clientConn)

執行命令

經過內部協議進行 p.Exec(執行命令)、p.SendResponse(返回結果),保證每一個 nsqd 節點都能正確的進行服務註冊(register)與註銷(unregister),並進行心跳檢測(ping)節點的可用性,確保客戶端取到的 nsqd 節點列表都是最新可用的。

for {
    line, err = reader.ReadString('\n')
    if err != nil {
      break
    }

    line = strings.TrimSpace(line)
    params := strings.Split(line, " ")

    var response []byte
    response, err = p.Exec(client, reader, params)
    if err != nil {
      ctx := ""
      if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
        ctx = " - " + parentErr.Error()
      }
      _, sendErr := protocol.SendResponse(client, []byte(err.Error()))
      if sendErr != nil {
        p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
        break
      }
      continue
    }

    if response != nil {
      _, err = protocol.SendResponse(client, response)
      if err != nil {
        break
      }
    }
  }

  conn.Close()

nsqlookupd 服務同時開啓 tcp 和 http 兩個監聽服務,nsqd 會做爲客戶端,連上 nsqlookupd 的 tcp 服務,並上報本身的 topic 和 channel 信息,以及經過心跳機制判斷 nsqd 狀態;還有個 http 服務提供給 nsqadmin 獲取集羣信息。

小結

本文主要介紹 nsqlookupd 的實現,nsqlookupd 一樣是一個守護進程,負責管理拓撲信息。客戶端經過查詢 nsqlookupd 來發現指定話題( topic )的生產者,而且 nsqd 節點廣播話題(topic)和通道( channel )信息。有兩個接口: TCP 接口, nsqd 用它來廣播。 HTTP 接口,客戶端用它來發現和管理。

下一篇文章,將會繼續介紹 nsq 中其餘模塊實現的細節。

本文分享自華爲雲社區《高性能消息中間件 NSQ 解析-nsqlookupd 實現細節介紹》,原文做者:aoho 。

 

點擊關注,第一時間瞭解華爲雲新鮮技術~

相關文章
相關標籤/搜索