剖析nsq消息隊列(一) 簡介及去中心化實現原理

分佈式消息隊列nsq,簡單易用,去中心化的設計使nsq更健壯,nsq充分利用了go語言的goroutinechannel來實現的消息處理,代碼量也不大,讀不了多久就沒了。後期的文章我會把nsq的源碼分析給你們看。 主要的分析路線以下html

  • 分析nsq的總體框架結構,分析如何作到的無中心化分佈式拓撲結構,如何處理的單點故障。
  • 分析nsq是如何保證消息的可靠性,如何保證消息的處理,對於消息的持久化是如何處理和擴展的。
  • 分析nsq是如何作的消息的負載處理,即如何把合理的、不超過客戶端消費能力的狀況下,把消息分發到不一樣的客戶端。
  • 分析nsq提供的一些輔助組件。

這篇帖子,介紹nsq的主體結構,以及他是如何作到去中心化的分佈式拓撲結構,如何處理的單點故障。 幾個組件是須要先大概說一下 nsqd 消息隊列的主體,對消息的接收,處理和把消息分發到客戶端。 nsqlookupd nsq拓撲結構信息的管理者,有了他才能組成一個簡單易用的無中心化的分佈式拓撲網絡結構。 go-nsq nsq官方的go語言客戶端,基本上市面上的主流編程語言都有相應的客戶端在這裏 還有可視化的組件nsqadmin和一些工具像nsq_to_filensq_stat、等等,這些在後期的帖子裏會介紹git

使用方式

兩種方式一種是直接鏈接另外一種是經過nsqlookupd進行鏈接github

直連方式

nsqd是獨立運行的,咱們能夠直接使用部署幾個nsqd而後使用客戶端直連的方式使用算法

例子

目前資源有限,我就都在一臺機器上模擬了 啓動兩個nsqdsql

./nsqd -tcp-address ":8000"  -http-address ":8001" -data-path=./a
複製代碼
./nsqd -tcp-address ":7000"  -http-address ":7001" -data-path=./b
複製代碼

正常啓動會有相似下面的輸出編程

[nsqd] 2019/08/29 18:42:56.928345 INFO: nsqd v1.1.1-alpha (built w/go1.12.7)
[nsqd] 2019/08/29 18:42:56.928512 INFO: ID: 538
[nsqd] 2019/08/29 18:42:56.928856 INFO: NSQ: persisting topic/channel metadata to b/nsqd.dat
[nsqd] 2019/08/29 18:42:56.935797 INFO: TCP: listening on [::]:7000
[nsqd] 2019/08/29 18:42:56.935891 INFO: HTTP: listening on [::]:7001
複製代碼

簡單使用bash

func main() {
	adds := []string{"127.0.0.1:7000", "127.0.0.1:8000"}
	config := nsq.NewConfig()

	topicName := "testTopic1"
	c, _ := nsq.NewConsumer(topicName, "ch1", config)
	testHandler := &MyTestHandler{consumer: c}

	c.AddHandler(testHandler)
	if err := c.ConnectToNSQDs(adds); err != nil {
		panic(err)
	}
	stats := c.Stats()
	if stats.Connections == 0 {
		panic("stats report 0 connections (should be > 0)")
	}
	stop := make(chan os.Signal)
	signal.Notify(stop, os.Interrupt)
	fmt.Println("server is running....")
	<-stop
}

type MyTestHandler struct {
	consumer *nsq.Consumer
}

func (m MyTestHandler) HandleMessage(message *nsq.Message) error {
	fmt.Println(string(message.Body))
	return nil
}
複製代碼

方法 c.ConnectToNSQDs(adds),鏈接多個nsqd服務 而後運行多個客戶端實現 這時,咱們發送一個消息,網絡

curl -d 'hello world 2' 'http://127.0.0.1:7001/pub?topic=testTopic1'
複製代碼

nsqd會根據他的算法,把消息分配到一個客戶端 客戶端的輸入以下框架

2019/08/30 12:05:32 INF    1 [testTopic1/ch1] (127.0.0.1:7000) connecting to nsqd
2019/08/30 12:05:32 INF    1 [testTopic1/ch1] (127.0.0.1:8000) connecting to nsqd
server is running....
hello world 2
複製代碼

可是這種作的話,須要客戶端作一些額外的工做,須要頻繁的去檢查全部nsqd的狀態,若是發現出現問題須要客戶端主動去處理這些問題。curl

總結

我使用的客戶端庫是官方庫 go-nsq,使用直接連nsqd的方式,

  • 若是有nsqd出現問題,如今的處理方式,他會每隔一段時間執行一次重連操做。想去掉這個鏈接信息就要額外作一些處理了。
  • 若是對nsqd進行橫向擴充,只能是本身民額外的寫一些代碼調用ConnectToNSQDs或者ConnectToNSQD方法

去中心化鏈接方式 nsqlookupd

官方推薦使用鏈接nsqlookupd的方式,nsqlookupd用於作服務的註冊和發現,這樣能夠作到去中心化。

圖中咱們運行着多個nsqd和多個nsqlookupd的實例,客戶端去鏈接nsqlookupd來操做nsqd

例子

咱們要先啓動nsqlookupd,爲了演示方便,我啓動兩個nsqlookupd實例, 三個nsqd實例

./nsqlookupd -tcp-address ":8200" -http-address ":8201"
複製代碼
./nsqlookupd -tcp-address ":7200" -http-address ":7201"
複製代碼

爲了演示橫向擴充,先啓動兩個,客戶端鏈接後,再啓動第三個。

./nsqd -tcp-address ":8000"  -http-address ":8001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./a
複製代碼
./nsqd -tcp-address ":7000"  -http-address ":7001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200  -data-path=./b
複製代碼

--lookupd-tcp-address 用於指定lookup的鏈接地址

客戶端簡單代碼

package main

import (
	"fmt"
	"os"
	"os/signal"
	"time"

	"github.com/nsqio/go-nsq"
)

func main() {
	adds := []string{"127.0.0.1:7201", "127.0.0.1:8201"}
	config := nsq.NewConfig()
	config.MaxInFlight = 1000
	config.MaxBackoffDuration = 5 * time.Second
	config.DialTimeout = 10 * time.Second

	topicName := "testTopic1"
	c, _ := nsq.NewConsumer(topicName, "ch1", config)
	testHandler := &MyTestHandler{consumer: c}

	c.AddHandler(testHandler)
	if err := c.ConnectToNSQLookupds(adds); err != nil {
		panic(err)
	}
	stats := c.Stats()
	if stats.Connections == 0 {
		panic("stats report 0 connections (should be > 0)")
	}
	stop := make(chan os.Signal)
	signal.Notify(stop, os.Interrupt)
	fmt.Println("server is running....")
	<-stop
}

type MyTestHandler struct {
	consumer *nsq.Consumer
}

func (m MyTestHandler) HandleMessage(message *nsq.Message) error {
	fmt.Println(string(message.Body))
	return nil
}

複製代碼

方法ConnectToNSQLookupds就是用於鏈接nsqlookupd的,可是須要注意的是,鏈接的是http端口72018201,庫go-nsq 是經過請求其中一個nsqlookupd的 http 方法http://127.0.0.1:7201/lookup?topic=testTopic1 來獲得全部提供topic=testTopic1nsqd 列表信息,而後對全部的nsqd進行鏈接,

2019/08/30 13:47:26 INF    1 [testTopic1/ch1] querying nsqlookupd http://127.0.0.1:7201/lookup?topic=testTopic1
2019/08/30 13:47:26 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:7000) connecting to nsqd
2019/08/30 13:47:26 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) connecting to nsqd

複製代碼

目前咱們已經鏈接了兩個。 咱們演示一下橫向擴充,啓動第三個nsqd

./nsqd -tcp-address ":6000"  -http-address ":6001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200  -data-path=./c
複製代碼

這裏會有一個問題,當我啓動了一個親的nsqd可是他的topic是空的,咱們需指定這新的nsqd處理哪些topic。 咱們能夠用nsqadmin查看全部的topic

./nsqadmin  --lookupd-http-address=127.0.0.1:8201 --lookupd-http-address=127.0.0.1:7201
複製代碼

而後去你的nsqd上去建topic

curl -X POST 'http://127.0.0.1:6001/topic/create?topic=testTopic1'
複製代碼

固然也能夠本身寫一些自動化的角本 查看客戶端的日誌輸出

2019/08/30 14:56:01 INF    1 [testTopic1/ch1] querying nsqlookupd http://127.0.0.1:7201/lookup?topic=testTopic1
2019/08/30 14:56:01 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:6000) connecting to nsqd

複製代碼

已經連上咱們的新nsqd

我手動關閉一個nsqd實例 客戶端的日誌輸出已經斷開了鏈接

2019/08/30 15:04:20 ERR    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) IO error - EOF
2019/08/30 15:04:20 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) beginning close
2019/08/30 15:04:20 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) readLoop exiting
2019/08/30 15:04:20 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) breaking out of writeLoop
2019/08/30 15:04:20 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) writeLoop exiting
2019/08/30 15:04:20 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) finished draining, cleanup exiting
2019/08/30 15:04:20 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) clean close complete
2019/08/30 15:04:20 WRN    1 [testTopic1/ch1] there are 2 connections left alive

複製代碼

而且nsqdnsqlookupd也斷開了鏈接,客戶端再次從nsqlookupd取全部的nsqd的地址時獲得的老是可用的地址。

去中心化實現原理

nsqlookupd用於管理整個網絡拓撲結構,nsqd用他實現服務的註冊,客戶端使用他獲得全部的nsqd服務節點信息,而後全部的consumer端鏈接 實現原理以下,

  • nsqd把本身的服務信息廣播給一個或者多個nsqlookupd
  • 客戶端鏈接一個或者多個nsqlookupd,經過nsqlookupd獲得全部的nsqd的鏈接信息,進行鏈接消費,
  • 若是某個nsqd出現問題,down機了,會和nsqlookupd斷開,這樣客戶端nsqlookupd獲得的nsqd的列表永遠是可用的。客戶端鏈接的是全部的nsqd,一個出問題了就用其餘的鏈接,因此也不會受影響。
相關文章
相關標籤/搜索