1: 啓動類: main.goios
利用:flag.NewFlagSet 解析傳遞的參數tcp
'註冊系統的信號量oop
exitChan := make(chan int)spa
signalChan := make(chan os.Signal, 1)code
go func() {server
<-signalChanit
exitChan <- 1io
}()cli
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)List
調用optios.go 作默認的參數 NewNSQDOptions
啓動server的服務: nsqd := NewNSQD(opts)
加載硬盤上的數據:nsqd.LoadMetadata()
err := nsqd.PersistMetadata()
nsqd.Main()
執行lookup 服務的初始化工做
n.waitGroup.Wrap(func() { n.lookupLoop() })
n.waitGroup.Wrap(func() { util.TCPServer(n.tcpListener, tcpServer) }) 開啓tcp的監聽服務
n.waitGroup.Wrap(func() { util.HTTPServer(n.httpListener, httpServer) }) 開始http的監聽服務
main.go 的流程走完
接下來看看有客戶端鏈接之後的服務:
tcp.go
(p *tcpServer) Handle(clientConn net.Conn)
一系列的校驗。。。
調用:ProtocolV2. IOLoop
client := NewClientV2(clientID, conn, p.context) 初始化客戶端
go p.messagePump(client) 客戶select服務
response, err := p.Exec(client, params) 執行具體的客戶端操做 func (p *ProtocolV2) Exec(client *ClientV2, params [][]byte) ([]byte, error) { switch { case bytes.Equal(params[0], []byte("FIN")): return p.FIN(client, params) case bytes.Equal(params[0], []byte("RDY")): return p.RDY(client, params) case bytes.Equal(params[0], []byte("REQ")): return p.REQ(client, params) case bytes.Equal(params[0], []byte("PUB")): return p.PUB(client, params) case bytes.Equal(params[0], []byte("MPUB")): return p.MPUB(client, params) case bytes.Equal(params[0], []byte("NOP")): return p.NOP(client, params) case bytes.Equal(params[0], []byte("TOUCH")): return p.TOUCH(client, params) case bytes.Equal(params[0], []byte("IDENTIFY")): return p.IDENTIFY(client, params) case bytes.Equal(params[0], []byte("SUB")): return p.SUB(client, params) case bytes.Equal(params[0], []byte("CLS")): return p.CLS(client, params) } return nil, util.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) }