golang-nsq系列(二)--nsqd源碼解析

上一篇初識了 nsq 三個模塊(nsqd, nsqlookupd, nsqadmin)的 demo演示,本篇則從源碼開始,一步一步去解析 nsqd 的執行流程和邏輯處理,學習別人優秀的項目架構,以期學以至用。sql

1. nsqd 執行入口

nsq/apps/nsqd/main.go 能夠找到執行入口文件,以下:api

nsqd-path

2. nsqd 執行主邏輯源碼

2.1 經過第三方 svc 包進行優雅的後臺進程管理,svc.Run() -> svc.Init() -> svc.Start(),啓動 nsqd 實例;架構

func main() {
  prg := &program{}
  if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
    logFatal("%s", err)
  }
}

func (p *program) Init(env svc.Environment) error {
  if env.IsWindowsService() {
    dir := filepath.Dir(os.Args[0])
    return os.Chdir(dir)
  }
  return nil
}

func (p *program) Start() error {
  opts := nsqd.NewOptions()

  flagSet := nsqdFlagSet(opts)
  flagSet.Parse(os.Args[1:])
  ...
}



2.2 初始化配置項(opts, cfg),加載歷史數據(nsqd.LoadMetadata)、持久化最新數據(nsqd.PersistMetadata),而後開啓協程,進入 nsqd.Main() 主函數;併發

options.Resolve(opts, flagSet, cfg)
  nsqd, err := nsqd.New(opts)
  if err != nil {
    logFatal("failed to instantiate nsqd - %s", err)
  }
  p.nsqd = nsqd

  err = p.nsqd.LoadMetadata()
  if err != nil {
    logFatal("failed to load metadata - %s", err)
  }
  err = p.nsqd.PersistMetadata()
  if err != nil {
    logFatal("failed to persist metadata - %s", err)
  }

  go func() {
    err := p.nsqd.Main()
    if err != nil {
      p.Stop()
      os.Exit(1)
    }
  }()



2.3 初始化 tcpServer, httpServer, httpsServer,而後循環監控隊列信息(n.queueScanLoop)、節點信息管理(n.lookupLoop)、統計信息(n.statsdLoop)輸出;app

tcpServer := &tcpServer{ctx: ctx}
  n.waitGroup.Wrap(func() {
    exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf))
  })
  httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
  n.waitGroup.Wrap(func() {
    exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
  })
  if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
    httpsServer := newHTTPServer(ctx, true, true)
    n.waitGroup.Wrap(func() {
      exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
    })
  }

  n.waitGroup.Wrap(n.queueScanLoop)
  n.waitGroup.Wrap(n.lookupLoop)
  if n.getOpts().StatsdAddress != "" {
    n.waitGroup.Wrap(n.statsdLoop)
  }



2.4 分別處理 tcp/http 請求,開啓 handler 協程進行併發處理,其中 newHTTPServer 註冊路由採用了 Decorate 裝飾器模式(後面會進一步解析);tcp

http-Decorate路由分發:分佈式

router := httprouter.New()
  router.HandleMethodNotAllowed = true
  router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf)
  router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf)
  router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.logf)
  s := &httpServer{
    ctx:         ctx,
    tlsEnabled:  tlsEnabled,
    tlsRequired: tlsRequired,
    router:      router,
  }

  router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
  router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))

  // v1 negotiate
  router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
  router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1))
  router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1))

  // only v1
  router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
  router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))

tcp-handler 處理:函數

for {
    clientConn, err := listener.Accept()
    if err != nil {
      if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
        logf(lg.WARN, "temporary Accept() failure - %s", err)
        runtime.Gosched()
        continue
      }
      // theres no direct way to detect this error because it is not exposed
      if !strings.Contains(err.Error(), "use of closed network connection") {
        return fmt.Errorf("listener.Accept() error - %s", err)
      }
      break
    }
    go handler.Handle(clientConn)
  }



2.5 tcp 解析 V2 協議,走內部協議封裝的 prot.IOLoop(conn) 進行處理;oop

var prot protocol.Protocol
  switch protocolMagic {
  case "  V2":
    prot = &protocolV2{ctx: p.ctx}
  default:
    protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
    clientConn.Close()
    p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
      clientConn.RemoteAddr(), protocolMagic)
    return
  }

  err = prot.IOLoop(clientConn)
  if err != nil {
    p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
    return
  }



2.6 經過內部協議進行 p.Exec(執行命令)、p.Send(發送結果),保證每一個 nsqd 節點都能正確的進行消息生成與消費,一旦上述過程有 error 都會被捕獲處理,確保分佈式投遞的可靠性。學習

params := bytes.Split(line, separatorBytes)

    p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)

    var response []byte
    response, err = p.Exec(client, params)
    if err != nil {
      ctx := ""
      if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
        ctx = " - " + parentErr.Error()
      }
      p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

      sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
      if sendErr != nil {
        p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
        break
      }

      // errors of type FatalClientErr should forceably close the connection
      if _, ok := err.(*protocol.FatalClientErr); ok {
        break
      }
      continue
    }

    if response != nil {
      err = p.Send(client, frameTypeResponse, response)
      if err != nil {
        err = fmt.Errorf("failed to send response - %s", err)
        break
      }
    }

3. nsqd 流程圖小結

上述流程小結示意圖以下:

nsqd-logic

【小結】從源碼能夠看到,代碼邏輯清晰明瞭,利用 Go 協程高效併發處理分佈式多節點 nsqd 的消息生產與消費,裏面有不少細節有待下一步仔細剖析,學以至用。

相關文章
相關標籤/搜索