在上一篇帖子剖析nsq消息隊列(一) 簡介及去中心化實現原理中,我介紹了nsq的兩種使用方式,一種是直接鏈接,還有一種是經過nslookup來實現去中心化的方式使用,並大概說了一下實現原理,沒有什麼難理解的東西,這篇帖子我把nsq
實現去中心化的源碼和其中的業物邏輯展現給你們看一下。html
上一篇中在啓動nsqd
時我用瞭如下命令,我指定了一個參數 --lookupd-tcp-address
sql
./nsqd -tcp-address ":8000" -http-address ":8001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./a
--lookupd-tcp-address
用於指定nsqlookupd
的tcp
監聽地址。json
nsqd
和 nsqlookupd
的通訊交流簡單來講就是下圖這樣
api
nsqd
啓動後鏈接nsqlookupd
,鏈接成功後,要發送一個魔法標識nsq.MagicV1
,這個標識有啥魔法麼,固然不是,他只是用於標明,客戶端和服務端雙方使用的信息通訊版本,不能的版本有不一樣的處理方式,爲了後期作新的消息處理版本方便吧。
nsqlookupd
的代碼塊app
func (p *tcpServer) Handle(clientConn net.Conn) { // ... buf := make([]byte, 4) _, err := io.ReadFull(clientConn, buf) // ... protocolMagic := string(buf) // ... var prot protocol.Protocol switch protocolMagic { case " V1": prot = &LookupProtocolV1{ctx: p.ctx} default: // ... return } err = prot.IOLoop(clientConn) //... }
這個時候的nsqd
已經和nsqlookupd
創建好了鏈接,可是這時,僅僅說明他倆鏈接成功。
nsqlookupd
也並無把這個鏈接加到可用的nsqd
列表裏。
創建鏈接完成後,nsqd
會發送IDENTIFY
命令,這個命令裏包含了nsq的基本信息
nsqd
的代碼tcp
ci := make(map[string]interface{}) ci["version"] = version.Binary ci["tcp_port"] = n.RealTCPAddr().Port ci["http_port"] = n.RealHTTPAddr().Port ci["hostname"] = hostname ci["broadcast_address"] = n.getOpts().BroadcastAddress cmd, err := nsq.Identify(ci) if err != nil { lp.Close() return } resp, err := lp.Command(cmd)
包含了nsqd
提供的tcp
和http
端口,主機名,版本等等,發送給nsqlookupd
,nsqlookupd
收到IDENTIFY
命令後,解析信息而後加到nsqd
的可用列表裏
nsqlookupd
的代碼塊oop
func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { var err error if client.peerInfo != nil { return nil, protocol.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again") } var bodyLen int32 err = binary.Read(reader, binary.BigEndian, &bodyLen) // ... body := make([]byte, bodyLen) _, err = io.ReadFull(reader, body) // ... peerInfo := PeerInfo{id: client.RemoteAddr().String()} err = json.Unmarshal(body, &peerInfo) // ... client.peerInfo = &peerInfo // 把nsqd的鏈接加入到可用列表裏 if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) { p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "") } // ... return response, nil }
而後每過15秒,會發送一個PING
心跳命令給nsqlookupd
,這樣保持存活狀態,nsqlookupd
每次收到發過來的PING
命令後,也會記下這個nsqd
的最後更新時間,這樣作爲一個篩選條件,若是長時間沒有更新,就認爲這個節點有問題,不會把這個節點的信息加入到可用列表。
到此爲止,一個nsqd
就把本身的信息註冊到nsqlookupd
的可用列表了,咱們能夠啓動多個nsqd
和多個nsqlookupd
,爲nsqd
指定多個nsqlookupd
,就如同我上一篇帖子寫的那樣this
--lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200
nsqd
和全部的nsqlookupd
創建鏈接,註冊服務信息,並保持心跳,保證可用列表的更新.url
上面咱們說了nsqd
若是出現問題,nsqlookupd
的nsqd
可用列表裏就會處理掉這個鏈接信息。如nsqlookupd
掛了怎麼辦呢
目前的處理方式是這樣的,
不管是心跳,仍是其餘命令,nsqd
會給全部的nsqlookup
發送信息,當nsqd
發現nsqlookupd
出現問題時,在每次發送命令時,會不斷的進行從新鏈接:3d
func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte, error) { initialState := lp.state if lp.state != stateConnected { err := lp.Connect() if err != nil { return nil, err } lp.state = stateConnected _, err = lp.Write(nsq.MagicV1) if err != nil { lp.Close() return nil, err } if initialState == stateDisconnected { lp.connectCallback(lp) } if lp.state != stateConnected { return nil, fmt.Errorf("lookupPeer connectCallback() failed") } } // ... }
若是鏈接成功,會再次調用connectCallback
方法,進行IDENTIFY
命令的調用等。
上一篇帖子裏介紹了,客戶端如何鏈接nsqlookupd
來進行通訊
adds := []string{"127.0.0.1:7201", "127.0.0.1:8201"} config := nsq.NewConfig() config.MaxInFlight = 1000 config.MaxBackoffDuration = 5 * time.Second config.DialTimeout = 10 * time.Second topicName := "testTopic1" c, _ := nsq.NewConsumer(topicName, "ch1", config) testHandler := &MyTestHandler{consumer: c} c.AddHandler(testHandler) if err := c.ConnectToNSQLookupds(adds); err != nil { panic(err) }
須要注意adds
裏地址的端口,是nsqlookupd
的http
端口
這裏我還使用上一篇帖子中的圖,給你們詳細分析
調用方法c.ConnectToNSQLookupds(adds)
,他的實現是訪問nsqlookupd
的http端口http://127.0.0.1:7201/lookup?topic=testTopic1
獲得提供consumer
訂閱的topic
全部的producers
節點信息, url返回的數據信息以下。
{ "channels": [ "nsq_to_file", "ch1" ], "producers": [ { "remote_address": "127.0.0.1:58606", "hostname": "li-peng-mc-macbook.local", "broadcast_address": "li-peng-mc-macbook.local", "tcp_port": 8000, "http_port": 8001, "version": "1.1.1-alpha" }, { "remote_address": "127.0.0.1:58627", "hostname": "li-peng-mc-macbook.local", "broadcast_address": "li-peng-mc-macbook.local", "tcp_port": 7000, "http_port": 7001, "version": "1.1.1-alpha" } ] }
方法queryLookupd
就是進行的上圖的操做
topic
的 nsqd
列表func (r *Consumer) queryLookupd() { retries := 0 retry: endpoint := r.nextLookupdEndpoint() // ... err := apiRequestNegotiateV1("GET", endpoint, nil, &data) if err != nil { // ... } var nsqdAddrs []string for _, producer := range data.Producers { broadcastAddress := producer.BroadcastAddress port := producer.TCPPort joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port)) nsqdAddrs = append(nsqdAddrs, joined) } // 進行鏈接 for _, addr := range nsqdAddrs { err = r.ConnectToNSQD(addr) if err != nil && err != ErrAlreadyConnected { r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err) continue } } }
有新的nsqd加入,是如何處理的呢?
在調用ConnectToNSQLookupd
時會啓動一個協程go r.lookupdLoop()
調用方法lookupdLoop
的定時循環訪問 queryLookupd
更新 nsqd
的可用列表
// poll all known lookup servers every LookupdPollInterval func (r *Consumer) lookupdLoop() { // ... var ticker *time.Ticker select { case <-time.After(jitter): case <-r.exitChan: goto exit } // 設置Interval 來循環訪問 queryLookupd ticker = time.NewTicker(r.config.LookupdPollInterval) for { select { case <-ticker.C: r.queryLookupd() case <-r.lookupdRecheckChan: r.queryLookupd() case <-r.exitChan: goto exit } } exit: // ... }
當有nsqd
出現故障時怎麼辦?當前的處理方式是
nsqdlookupd
會把這個故障節點從可用列表中去除,客戶端從接口獲得的可用列表永遠都是可用的。nsqlookup
進行了鏈接,若是是則case r.lookupdRecheckChan <- 1
去刷新可用列表queryLookupd
,若是不是,而後啓動一個協程去定時作重試鏈接,若是故障恢復,鏈接成功,會從新加入到可用列表.func (r *Consumer) onConnClose(c *Conn) { // ... // remove this connections RDY count from the consumer's total delete(r.connections, c.String()) left := len(r.connections) // ... r.mtx.RLock() numLookupd := len(r.lookupdHTTPAddrs) reconnect := indexOf(c.String(), r.nsqdTCPAddrs) >= 0 // 若是使用的是nslookup則去刷新可用列表 if numLookupd > 0 { // trigger a poll of the lookupd select { case r.lookupdRecheckChan <- 1: default: } } else if reconnect { // ... }(c.String()) } }