上一篇初識了 nsq
三個模塊(nsqd, nsqlookupd, nsqadmin
)的 demo
演示,本篇則從源碼開始,一步一步去解析 nsqd
的執行流程和邏輯處理,學習別人優秀的項目架構,以期學以至用。sql
nsqd
執行入口在 nsq/apps/nsqd/main.go
能夠找到執行入口文件,以下:api
nsqd
執行主邏輯源碼2.1 經過第三方 svc
包進行優雅的後臺進程管理,svc.Run() -> svc.Init() -> svc.Start()
,啓動 nsqd
實例;架構
func main() { prg := &program{} if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil { logFatal("%s", err) } } func (p *program) Init(env svc.Environment) error { if env.IsWindowsService() { dir := filepath.Dir(os.Args[0]) return os.Chdir(dir) } return nil } func (p *program) Start() error { opts := nsqd.NewOptions() flagSet := nsqdFlagSet(opts) flagSet.Parse(os.Args[1:]) ... }
2.2 初始化配置項(opts, cfg
),加載歷史數據(nsqd.LoadMetadata
)、持久化最新數據(nsqd.PersistMetadata
),而後開啓協程,進入 nsqd.Main()
主函數;併發
options.Resolve(opts, flagSet, cfg) nsqd, err := nsqd.New(opts) if err != nil { logFatal("failed to instantiate nsqd - %s", err) } p.nsqd = nsqd err = p.nsqd.LoadMetadata() if err != nil { logFatal("failed to load metadata - %s", err) } err = p.nsqd.PersistMetadata() if err != nil { logFatal("failed to persist metadata - %s", err) } go func() { err := p.nsqd.Main() if err != nil { p.Stop() os.Exit(1) } }()
2.3 初始化 tcpServer, httpServer, httpsServer
,而後循環監控隊列信息(n.queueScanLoop
)、節點信息管理(n.lookupLoop
)、統計信息(n.statsdLoop
)輸出;app
tcpServer := &tcpServer{ctx: ctx} n.waitGroup.Wrap(func() { exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf)) }) httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)) }) if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { httpsServer := newHTTPServer(ctx, true, true) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)) }) } n.waitGroup.Wrap(n.queueScanLoop) n.waitGroup.Wrap(n.lookupLoop) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(n.statsdLoop) }
2.4 分別處理 tcp/http
請求,開啓 handler
協程進行併發處理,其中 newHTTPServer
註冊路由採用了 Decorate
裝飾器模式(後面會進一步解析);tcp
http-Decorate
路由分發:分佈式
router := httprouter.New() router.HandleMethodNotAllowed = true router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf) router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf) router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.logf) s := &httpServer{ ctx: ctx, tlsEnabled: tlsEnabled, tlsRequired: tlsRequired, router: router, } router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText)) router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1)) // v1 negotiate router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1)) router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1)) router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1)) // only v1 router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1)) router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
tcp-handler
處理:函數
for { clientConn, err := listener.Accept() if err != nil { if nerr, ok := err.(net.Error); ok && nerr.Temporary() { logf(lg.WARN, "temporary Accept() failure - %s", err) runtime.Gosched() continue } // theres no direct way to detect this error because it is not exposed if !strings.Contains(err.Error(), "use of closed network connection") { return fmt.Errorf("listener.Accept() error - %s", err) } break } go handler.Handle(clientConn) }
2.5 tcp
解析 V2
協議,走內部協議封裝的 prot.IOLoop(conn)
進行處理;oop
var prot protocol.Protocol switch protocolMagic { case " V2": prot = &protocolV2{ctx: p.ctx} default: protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL")) clientConn.Close() p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) return } err = prot.IOLoop(clientConn) if err != nil { p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) return }
2.6 經過內部協議進行 p.Exec
(執行命令)、p.Send
(發送結果),保證每一個 nsqd
節點都能正確的進行消息生成與消費,一旦上述過程有 error
都會被捕獲處理,確保分佈式投遞的可靠性。學習
params := bytes.Split(line, separatorBytes) p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params) var response []byte response, err = p.Exec(client, params) if err != nil { ctx := "" if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil { ctx = " - " + parentErr.Error() } p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx) sendErr := p.Send(client, frameTypeError, []byte(err.Error())) if sendErr != nil { p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx) break } // errors of type FatalClientErr should forceably close the connection if _, ok := err.(*protocol.FatalClientErr); ok { break } continue } if response != nil { err = p.Send(client, frameTypeResponse, response) if err != nil { err = fmt.Errorf("failed to send response - %s", err) break } }
nsqd
流程圖小結上述流程小結示意圖以下:
【小結】從源碼能夠看到,代碼邏輯清晰明瞭,利用 Go
協程高效併發處理分佈式多節點 nsqd
的消息生產與消費,裏面有不少細節有待下一步仔細剖析,學以至用。