NSQLookupd Main方法分析

上一篇  http://my.oschina.net/astute/blog/296955  已經分析了 nsqlookupd 啓動時的命令行解析,最終構造了 NSQLookupd 結構體,這是一個很重要的結構體。

type NSQLookupd struct {
     options      *nsqlookupdOptions     // 命令行參數
     tcpAddr      *net.TCPAddr     // tcp 端口
     httpAddr     *net.TCPAddr     // http 端口
     tcpListener  net.Listener
     httpListener net.Listener
     waitGroup    util.WaitGroupWrapper
     DB           *RegistrationDB
}


這篇文章分析下 daemon.Main() 執行過程


一、建立 context,內部只有一個指向NSQLookupd的指針

context := &Context{l}


二、建立 listener;Listen 方法支持面向流的監聽,包括 unix domain stream socket。

tcpListener, err := net.Listen("tcp", l.tcpAddr.String())


三、建立 tcpServer struct,內部只含有一個 context 指針,爲何須要對 NSQLookupd

tcpServer 實現了 Handle(net.Conn) 方法,代表實現了 TCPHandler 接口

tcpServer := &tcpServer{context: context}


四、分析 l.waitGroup.Wrap(func() { util.TCPServer(tcpListener, tcpServer) })

waitGroup 是結構體 util.WaitGroupWrapper 實例,此結構體繼承自 sync.WaitGroup;WaitGroup 等待一組協程結束。主協程調用 Add 方法來設置等待協程的數目;func() { util.TCPServer(tcpListener, tcpServer) } 這是一個匿名函數。

總結:主協程調用 Wrap 方法,調用 Add,設置等待的協程數目爲1,啓動新的協程去調用 匿名函數,主協程最後會在 exitChan 通道上等待消息,若是收到中斷信號,關閉 tcpListener, httpListner,最後在 waitGroup 上等待協程結束。須要 waitGroup 的目的,就是爲了 主協程 和 監聽協程之間的狀態同步。


五、監聽協程執行的邏輯以下,簡化版本,忽略日誌和錯誤處理。監聽協程循環 Accept,而後分配新的協程調用 handler 處理接收的 clientConn。處理客戶端鏈接的協程,我把它叫作 工做協程。

func TCPServer(listener net.Listener, handler TCPHandler) {

     for {
          clientConn, err := listener.Accept()
          go handler.Handle(clientConn)
     }

}


六、工做協程的處理邏輯,最終調用的是 tcpServer 的 Handle 方法,這是最重要的一個方法,重點分析

func (p *tcpServer) Handle(clientConn net.Conn)

buf := make([]byte, 4)
_, err := io.ReadFull(clientConn, buf)

讀取客戶端發送過來的前4個字節,裏面含有協議的版本號。看到了沒有,徹底阻塞式編程,沒有屎同樣的事件驅動!

判斷版本號後,最終調用 LookupProtocolV1 的 IOLoop 方法


七、方法分析 func (p *LookupProtocolV1) IOLoop(conn net.Conn) error

client := NewClientV1(conn)     // ClientV1 struct 繼承 net.Conn,封裝一個 PeerInfo

reader := bufio.NewReader(client) // 建立一個帶緩衝的 reader

for {     // 此部分代碼有省略
    line, err = reader.ReadString('\n’)     // 按行來處理

    line = strings.TrimSpace(line)
    params := strings.Split(line, " ")

    response, err := p.Exec(client, reader, params)  // 執行命令

    if response != nil {
        _, err = util.SendResponse(client, response)     // 發送 response
    }
}

IO 出錯,或者執行命令出錯,會退出循環。


八、命令分派

func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error)

switch params[0] {
case "PING":
    return p.PING(client, params)
// .........
}

根據第一個參數作命令區分,調用響應的指令


九、命令執行,以 ping 命令爲例,看看命令如何執行

func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, error)

atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano()) // 更新客戶端的狀態

return []byte("OK"), nil      // 返回 「OK」,經由第 7 步的 util.SendResponse 發送給 client

至此 tcpServer 的處理流程就介紹完了。


十、和 2,3 步對應的,還會建立 httpListener, httpServer, 主要負責 topic,channel 相關的操做

httpListener, err := net.Listen("tcp", l.httpAddr.String())
httpServer := &httpServer{context: context}

處理 httpServer 的協程, 會調用 func() { util.HTTPServer(httpListener, httpServer, "HTTP") }


十一、分析 util.HTTPServer 函數

關鍵是以下兩句
server := &http.Server{
    Handler: handler,
}
err := server.Serve(listener)

默認的 http handler 就是 httpServer 自己


十二、分析 httpServer 的 ServeHTTP 方法

err := s.v1Router(w, req)     // 路由請求


1三、分析 v1Router

switch req.URL.Path {
case "/ping」:
//......
}

按照請求路徑分派


1四、以 "/topics」 爲例,看如何處理請求

執行函數 util.NegotiateAPIResponseWrapper(w, req,
               func() (interface{}, error) { return s.doTopics(req) })


內部調用傳入的匿名函數,也就是執行 s.doTopics(req),會把全部的 topic 對應的 producer 信息組成 map[string]interface{} 返回給客戶端 
相關文章
相關標籤/搜索