剖析nsq消息隊列(二) 去中心化源碼解析

在上一篇帖子剖析nsq消息隊列(一) 簡介及去中心化實現原理中,我介紹了nsq的兩種使用方式,一種是直接鏈接,還有一種是經過nslookup來實現去中心化的方式使用,並大概說了一下實現原理,沒有什麼難理解的東西,這篇帖子我把nsq實現去中心化的源碼和其中的業物邏輯展現給你們看一下。html

nsqd和nsqlookupd的通訊實現

上一篇中在啓動nsqd時我用瞭如下命令,我指定了一個參數 --lookupd-tcp-addresssql

./nsqd -tcp-address ":8000"  -http-address ":8001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./a
複製代碼

--lookupd-tcp-address 用於指定nsqlookupdtcp監聽地址。json

nsqdnsqlookupd的通訊交流簡單來講就是下圖這樣 api

nsqd啓動後鏈接nsqlookupd,鏈接成功後,要發送一個魔法標識nsq.MagicV1,這個標識有啥魔法麼,固然不是,他只是用於標明,客戶端和服務端雙方使用的信息通訊版本,不能的版本有不一樣的處理方式,爲了後期作新的消息處理版本方便吧。 nsqlookupd 的代碼塊bash

func (p *tcpServer) Handle(clientConn net.Conn) {	
	// ...
	buf := make([]byte, 4)
	_, err := io.ReadFull(clientConn, buf)
	// ...
	protocolMagic := string(buf)
	// ...
	var prot protocol.Protocol
	switch protocolMagic {
	case " V1":
		prot = &LookupProtocolV1{ctx: p.ctx}
	default:
		// ...
		return
	}
	err = prot.IOLoop(clientConn)
	//...
}
複製代碼

這個時候的nsqd已經和nsqlookupd創建好了鏈接,可是這時,僅僅說明他倆鏈接成功。 nsqlookupd也並無把這個鏈接加到可用的nsqd列表裏。 創建鏈接完成後,nsqd會發送IDENTIFY命令,這個命令裏包含了nsq的基本信息 nsqd的代碼app

ci := make(map[string]interface{})
		ci["version"] = version.Binary
		ci["tcp_port"] = n.RealTCPAddr().Port
		ci["http_port"] = n.RealHTTPAddr().Port
		ci["hostname"] = hostname
		ci["broadcast_address"] = n.getOpts().BroadcastAddress

		cmd, err := nsq.Identify(ci)
		if err != nil {
			lp.Close()
			return
		}
		resp, err := lp.Command(cmd)
複製代碼

包含了nsqd 提供的tcphttp端口,主機名,版本等等,發送給nsqlookupd,nsqlookupd收到IDENTIFY命令後,解析信息而後加到nsqd的可用列表裏 nsqlookupd 的代碼塊tcp

func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	var err error
	if client.peerInfo != nil {
		return nil, protocol.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again")
	}
	var bodyLen int32
	err = binary.Read(reader, binary.BigEndian, &bodyLen)
	// ...
	body := make([]byte, bodyLen)
	_, err = io.ReadFull(reader, body)
	// ...	
	peerInfo := PeerInfo{id: client.RemoteAddr().String()}
	err = json.Unmarshal(body, &peerInfo)
	// ...
	client.peerInfo = &peerInfo
	// 把nsqd的鏈接加入到可用列表裏    
	if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) {
		p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "")
	}
	// ...
	return response, nil
}
複製代碼

而後每過15秒,會發送一個PING心跳命令給nsqlookupd,這樣保持存活狀態,nsqlookupd每次收到發過來的PING命令後,也會記下這個nsqd的最後更新時間,這樣作爲一個篩選條件,若是長時間沒有更新,就認爲這個節點有問題,不會把這個節點的信息加入到可用列表。 到此爲止,一個nsqd就把本身的信息註冊到nsqlookupd的可用列表了,咱們能夠啓動多個nsqd和多個nsqlookupd,爲nsqd 指定多個nsqlookupd,就如同我上一篇帖子寫的那樣oop

--lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200
複製代碼

nsqd和全部的nsqlookupd創建鏈接,註冊服務信息,並保持心跳,保證可用列表的更新.ui

nsqlookupd 掛掉的處理方式

上面咱們說了nsqd若是出現問題,nsqlookupdnsqd可用列表裏就會處理掉這個鏈接信息。如nsqlookupd掛了怎麼辦呢 this

目前的處理方式是這樣的, 不管是心跳,仍是其餘命令, nsqd會給全部的 nsqlookup發送信息,當 nsqd發現 nsqlookupd出現問題時,在每次發送命令時,會不斷的進行從新鏈接:

func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte, error) {
	initialState := lp.state
	if lp.state != stateConnected {
		err := lp.Connect()
		if err != nil {
			return nil, err
		}
		lp.state = stateConnected
		_, err = lp.Write(nsq.MagicV1)
		if err != nil {
			lp.Close()
			return nil, err
		}
		if initialState == stateDisconnected {
			lp.connectCallback(lp)
		}
		if lp.state != stateConnected {
			return nil, fmt.Errorf("lookupPeer connectCallback() failed")
		}
	}
	// ...
}
複製代碼

若是鏈接成功,會再次調用connectCallback方法,進行IDENTIFY命令的調用等。

客戶端和nsqlookupd、nsqd的通訊實現

上一篇帖子裏介紹了,客戶端如何鏈接nsqlookupd來進行通訊

adds := []string{"127.0.0.1:7201", "127.0.0.1:8201"}
	config := nsq.NewConfig()
	config.MaxInFlight = 1000
	config.MaxBackoffDuration = 5 * time.Second
	config.DialTimeout = 10 * time.Second

	topicName := "testTopic1"
	c, _ := nsq.NewConsumer(topicName, "ch1", config)
	testHandler := &MyTestHandler{consumer: c}

	c.AddHandler(testHandler)
	if err := c.ConnectToNSQLookupds(adds); err != nil {
		panic(err)
	}
複製代碼

須要注意adds裏地址的端口,是nsqlookupdhttp端口 這裏我還使用上一篇帖子中的圖,給你們詳細分析

調用方法 c.ConnectToNSQLookupds(adds),他的實現是訪問 nsqlookupd的http端口 http://127.0.0.1:7201/lookup?topic=testTopic1獲得提供 consumer訂閱的 topic全部的 producers節點信息, url返回的數據信息以下。

{
  "channels": [
    "nsq_to_file",
    "ch1"
  ],
  "producers": [
    {
      "remote_address": "127.0.0.1:58606",
      "hostname": "li-peng-mc-macbook.local",
      "broadcast_address": "li-peng-mc-macbook.local",
      "tcp_port": 8000,
      "http_port": 8001,
      "version": "1.1.1-alpha"
    },
    {
      "remote_address": "127.0.0.1:58627",
      "hostname": "li-peng-mc-macbook.local",
      "broadcast_address": "li-peng-mc-macbook.local",
      "tcp_port": 7000,
      "http_port": 7001,
      "version": "1.1.1-alpha"
    }
  ]
}
複製代碼

方法 queryLookupd就是進行的上圖的操做

  • 獲得提供訂閱的topicnsqd列表
  • 進行鏈接
func (r *Consumer) queryLookupd() {
	retries := 0
retry:
	endpoint := r.nextLookupdEndpoint()

	// ...	
	err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
	if err != nil {
		// ...
	}
	var nsqdAddrs []string
	for _, producer := range data.Producers {
		broadcastAddress := producer.BroadcastAddress
		port := producer.TCPPort
		joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
		nsqdAddrs = append(nsqdAddrs, joined)
	}
	// 進行鏈接
	for _, addr := range nsqdAddrs {
		err = r.ConnectToNSQD(addr)
		if err != nil && err != ErrAlreadyConnected {
			r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
			continue
		}
	}
}
複製代碼

如何刷新nsqd的可用列表

有新的nsqd加入,是如何處理的呢? 在調用ConnectToNSQLookupd時會啓動一個協程go r.lookupdLoop() 調用方法lookupdLoop的定時循環訪問 queryLookupd 更新 nsqd的可用列表

// poll all known lookup servers every LookupdPollInterval
func (r *Consumer) lookupdLoop() {
	// ...
	var ticker *time.Ticker
	select {
	case <-time.After(jitter):
	case <-r.exitChan:
		goto exit
	}
	// 設置Interval 來循環訪問 queryLookupd
	ticker = time.NewTicker(r.config.LookupdPollInterval)
	for {
		select {
		case <-ticker.C:
			r.queryLookupd()
		case <-r.lookupdRecheckChan:
			r.queryLookupd()
		case <-r.exitChan:
			goto exit
		}
	}

exit:
	// ...
}
複製代碼

處理 nsqd 的單點故障

當有 nsqd出現故障時怎麼辦?當前的處理方式是

  • nsqdlookupd會把這個故障節點從可用列表中去除,客戶端從接口獲得的可用列表永遠都是可用的。
  • 客戶端會把這個故障節點從可用節點上移除,而後要去判斷是否使用了nsqlookup進行了鏈接,若是是則case r.lookupdRecheckChan <- 1 去刷新可用列表queryLookupd,若是不是,而後啓動一個協程去定時作重試鏈接,若是故障恢復,鏈接成功,會從新加入到可用列表. 客戶端實現的代碼
func (r *Consumer) onConnClose(c *Conn) {
	// ...
	// remove this connections RDY count from the consumer's total delete(r.connections, c.String()) left := len(r.connections) // ... r.mtx.RLock() numLookupd := len(r.lookupdHTTPAddrs) reconnect := indexOf(c.String(), r.nsqdTCPAddrs) >= 0 // 若是使用的是nslookup則去刷新可用列表 if numLookupd > 0 { // trigger a poll of the lookupd select { case r.lookupdRecheckChan <- 1: default: } } else if reconnect { // ... }(c.String()) } } 複製代碼
相關文章
相關標籤/搜索