最近一直在尋找一個高性能,高可用的消息隊列作內部服務之間的通信。一開始想到用zeromq,但在查找資料的過程當中,意外的發現了Nsq這個由golang開發的消息隊列,畢竟是golang原汁原味的東西,功能齊全,關鍵是性能還不錯。其中支持動態拓展,消除單點故障等特性, 均可以很好的知足個人需求node
下面上一張Nsq與其餘mq的對比圖,看上去的確強大。下面簡單記錄一下Nsq的使用方法
圖片來自golang2017開發者大會git
在使用Nsq服務以前,仍是有必要了解一下Nsq的幾個核心組件
整個Nsq服務包含三個主要部分github
先看看官方的原話是怎麼說:
nsqlookupd是守護進程負責管理拓撲信息。客戶端經過查詢 nsqlookupd 來發現指定話題(topic)的生產者,而且 nsqd 節點廣播話題(topic)和通道(channel)信息golang
簡單的說nsqlookupd就是中心管理服務,它使用tcp(默認端口4160)管理nsqd服務,使用http(默認端口4161)管理nsqadmin服務。同時爲客戶端提供查詢功能sql
總的來講,nsqlookupd具備如下功能或特性shell
官方原話:是一套 WEB UI,用來聚集集羣的實時統計,並執行不一樣的管理任務緩存
總的來講,nsqadmin具備如下功能或特性服務器
nsqadmin默認的訪問地址是http://127.0.0.1:4171/ 負載均衡
官方原話:nsqd 是一個守護進程,負責接收,排隊,投遞消息給客戶端tcp
簡單的說,真正幹活的就是這個服務,它主要負責message的收發,隊列的維護。nsqd會默認監聽一個tcp端口(4150)和一個http端口(4151)以及一個可選的https端口
總的來講,nsqd 具備如下功能或特性
這是官方的圖,第一個channel(meteics)由於有多個消費者,因此觸發了負載均衡機制。後面兩個channel因爲沒有消費者,全部的message均會被緩存在相應的隊列裏,直到消費者出現
這裏想到一個問題是,若是一個channel只有生產者不停的在投遞message,會不會致使服務器資源被耗盡?也許nsqd內部作了相應處理,但仍是要避免這種狀況的出現
瞭解nsqlookupd,nsqd與客戶端中消費者和生產者的關係
消費者有兩種方式與nsqd創建鏈接
生產者必須直連nsqd去投遞message(網上說,能夠鏈接到nsqlookupd,讓nsqlookupd自動選擇一個nsqd去完成投遞,可是我用Producer的tcp是連不上nsqlookupd的,不知道http可不能夠...),
這裏有一個問題就是若是生產者所鏈接的nsqd炸了,那麼message就會投遞失敗,因此在客戶端必須本身實現相應的備用方案
首先搭建golang開發環境,這裏就不詳細講了
go get github.com/tools/godep
執行完後檢查godep是否已經安裝在bin目錄下,通常都會自動安裝,若是沒有,用go install手動安裝下
go get github.com/bmizerany/assert
godep get github.com/bitly/nsq/...
若是安裝成功,bin目錄裏就會出現一大堆nsq_...開頭的可執行文件
PS:若是安裝失敗
這時採用方法二安裝
nsqd是一個獨立的服務,啓動一個nsqd就能夠完成message的收發,啓動一個單機的nsqd,很簡單
nsqd
客戶端可使用http,也可使用tcp,這裏我使用是官方的go-nsq包作客戶端,使用tcp進行message的收發
nsqlookupd
nsqd --lookupd-tcp-address=127.0.0.1:4160
nsqd --lookupd-tcp-address=127.0.0.1:4160 -tcp-address=0.0.0.0:4152 -http-address=0.0.0.0:4153
nsqadmin --lookupd-http-address=127.0.0.1:4161
Consumer的重連時間配置項有兩個功能(這個設計必須吐槽一下,分開配置更好一點)
//Nsq發送測試 package main import ( "bufio" "fmt" "github.com/nsqio/go-nsq" "os" ) var producer *nsq.Producer // 主函數 func main() { strIP1 := "127.0.0.1:4150" strIP2 := "127.0.0.1:4152" InitProducer(strIP1) running := true //讀取控制檯輸入 reader := bufio.NewReader(os.Stdin) for running { data, _, _ := reader.ReadLine() command := string(data) if command == "stop" { running = false } for err := Publish("test", command); err != nil; err = Publish("test", command) { //切換IP重連 strIP1, strIP2 = strIP2, strIP1 InitProducer(strIP1) } } //關閉 producer.Stop() } // 初始化生產者 func InitProducer(str string) { var err error fmt.Println("address: ", str) producer, err = nsq.NewProducer(str, nsq.NewConfig()) if err != nil { panic(err) } } //發佈消息 func Publish(topic string, message string) error { var err error if producer != nil { if message == "" { //不能發佈空串,不然會致使error return nil } err = producer.Publish(topic, []byte(message)) // 發佈消息 return err } return fmt.Errorf("producer is nil", err) }
//Nsq接收測試 package main import ( "fmt" "time" "github.com/nsqio/go-nsq" ) // 消費者 type ConsumerT struct{} // 主函數 func main() { InitConsumer("test", "test-channel", "127.0.0.1:4161") for { time.Sleep(time.Second * 10) } } //處理消息 func (*ConsumerT) HandleMessage(msg *nsq.Message) error { fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body)) return nil } //初始化消費者 func InitConsumer(topic string, channel string, address string) { cfg := nsq.NewConfig() cfg.LookupdPollInterval = time.Second //設置重連時間 c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一個消費者 if err != nil { panic(err) } c.SetLogger(nil, 0) //屏蔽系統日誌 c.AddHandler(&ConsumerT{}) // 添加消費者接口 //創建NSQLookupd鏈接 if err := c.ConnectToNSQLookupd(address); err != nil { panic(err) } //創建多個nsqd鏈接 // if err := c.ConnectToNSQDs([]string{"127.0.0.1:4150", "127.0.0.1:4152"}); err != nil { // panic(err) // } // 創建一個nsqd鏈接 // if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil { // panic(err) // } }