golang使用Nsq

爲何要使用Nsq

最近一直在尋找一個高性能,高可用的消息隊列作內部服務之間的通信。一開始想到用zeromq,但在查找資料的過程當中,意外的發現了Nsq這個由golang開發的消息隊列,畢竟是golang原汁原味的東西,功能齊全,關鍵是性能還不錯。其中支持動態拓展,消除單點故障等特性,  均可以很好的知足個人需求node

下面上一張Nsq與其餘mq的對比圖,看上去的確強大。下面簡單記錄一下Nsq的使用方法
golang2017開發者大會
圖片來自golang2017開發者大會git

Nsq服務端

Nsq服務端簡介

在使用Nsq服務以前,仍是有必要了解一下Nsq的幾個核心組件
整個Nsq服務包含三個主要部分github

nsqlookupd

先看看官方的原話是怎麼說:
nsqlookupd是守護進程負責管理拓撲信息。客戶端經過查詢 nsqlookupd 來發現指定話題(topic)的生產者,而且 nsqd 節點廣播話題(topic)和通道(channel)信息golang

簡單的說nsqlookupd就是中心管理服務,它使用tcp(默認端口4160)管理nsqd服務,使用http(默認端口4161)管理nsqadmin服務。同時爲客戶端提供查詢功能sql

總的來講,nsqlookupd具備如下功能或特性shell

  • 惟一性,在一個Nsq服務中只有一個nsqlookupd服務。固然也能夠在集羣中部署多個nsqlookupd,但它們之間是沒有關聯的
  • 去中心化,即便nsqlookupd崩潰,也會不影響正在運行的nsqd服務
  • 充當nsqd和naqadmin信息交互的中間件
  • 提供一個http查詢服務,給客戶端定時更新nsqd的地址目錄 

nsqadmin

官方原話:是一套 WEB UI,用來聚集集羣的實時統計,並執行不一樣的管理任務緩存

總的來講,nsqadmin具備如下功能或特性服務器

  • 提供一個對topic和channel統一管理的操做界面以及各類實時監控數據的展現,界面設計的很簡潔,操做也很簡單
  • 展現全部message的數量,恩....裝X利器
  • 可以在後臺建立topic和channel,這個應該不經常使用到
  • nsqadmin的全部功能都必須依賴於nsqlookupd,nsqadmin只是向nsqlookupd傳遞用戶操做並展現來自nsqlookupd的數據

nsqadmin默認的訪問地址是http://127.0.0.1:4171/ 負載均衡

nsqd

官方原話:nsqd 是一個守護進程,負責接收,排隊,投遞消息給客戶端tcp

簡單的說,真正幹活的就是這個服務,它主要負責message的收發,隊列的維護。nsqd會默認監聽一個tcp端口(4150)和一個http端口(4151)以及一個可選的https端口

總的來講,nsqd 具備如下功能或特性

  • 對訂閱了同一個topic,同一個channel的消費者使用負載均衡策略(不是輪詢)
  • 只要channel存在,即便沒有該channel的消費者,也會將生產者的message緩存到隊列中(注意消息的過時處理)
  • 保證隊列中的message至少會被消費一次,即便nsqd退出,也會將隊列中的消息暫存磁盤上(結束進程等意外狀況除外)
  • 限定內存佔用,可以配置nsqd中每一個channel隊列在內存中緩存的message數量,一旦超出,message將被緩存到磁盤中
  • topic,channel一旦創建,將會一直存在,要及時在管理臺或者用代碼清除無效的topic和channel,避免資源的浪費

這是官方的圖,第一個channel(meteics)由於有多個消費者,因此觸發了負載均衡機制。後面兩個channel因爲沒有消費者,全部的message均會被緩存在相應的隊列裏,直到消費者出現

這裏想到一個問題是,若是一個channel只有生產者不停的在投遞message,會不會致使服務器資源被耗盡?也許nsqd內部作了相應處理,但仍是要避免這種狀況的出現

Nsq服務端與客戶端的關係

瞭解nsqlookupd,nsqd與客戶端中消費者和生產者的關係

消費者

消費者有兩種方式與nsqd創建鏈接

  • 消費者直連nsqd,這是最簡單的方式,缺點是nsqd服務沒法實現動態伸縮了(固然,本身去實現一個也是能夠的)  
  • 消費者經過http查詢nsqlookupd獲取該nsqlookupd上全部nsqd的鏈接地址,而後再分別和這些nsqd創建鏈接(官方推薦的作法),可是客戶端會不停的向nsqlookupd查詢最新的nsqd地址目錄(不喜歡用http輪詢這種方式...)
    仍是看圖更直接些 ,官方的消費者模型:

clipboard.png

生產者

生產者必須直連nsqd去投遞message(網上說,能夠鏈接到nsqlookupd,讓nsqlookupd自動選擇一個nsqd去完成投遞,可是我用Producer的tcp是連不上nsqlookupd的,不知道http可不能夠...),

這裏有一個問題就是若是生產者所鏈接的nsqd炸了,那麼message就會投遞失敗,因此在客戶端必須本身實現相應的備用方案

安裝Nsq

方法一

  • 首先搭建golang開發環境,這裏就不詳細講了

    • 注意一下,搭建golang環境時最好將bin目錄添加到系統環境(path)裏,省去了每次都要去bin目錄裏執行的麻煩
  • 安裝包管理器godep
go get github.com/tools/godep

執行完後檢查godep是否已經安裝在bin目錄下,通常都會自動安裝,若是沒有,用go install手動安裝下

  • 安裝依賴包assert
go get github.com/bmizerany/assert
  • 安裝Nsq
godep get github.com/bitly/nsq/...

若是安裝成功,bin目錄裏就會出現一大堆nsq_...開頭的可執行文件

  • PS:若是安裝失敗

    • 像我同樣出現了一大堆"use of internal package not allowed"錯誤,找了半天,纔在一個角落裏發現了這句話
      注意:NSQ 保持了 go get 兼容,可是不推薦,由於以後不能保證仍然能穩定編譯。

    這時採用方法二安裝

方法二

運行Nsq

運行單機nsqd服務

nsqd是一個獨立的服務,啓動一個nsqd就能夠完成message的收發,啓動一個單機的nsqd,很簡單

nsqd

客戶端可使用http,也可使用tcp,這裏我使用是官方的go-nsq包作客戶端,使用tcp進行message的收發

  • 發送
    clipboard.png
  • 接收
    clipboard.png

運行Nsq服務集羣

  • 首先啓動nsqlookud
nsqlookupd
  • 啓動nsqd,並接入剛剛啓動的nsqlookud。這裏爲了方便接下來的測試,啓動了兩個nsqd
nsqd --lookupd-tcp-address=127.0.0.1:4160
nsqd --lookupd-tcp-address=127.0.0.1:4160 -tcp-address=0.0.0.0:4152 -http-address=0.0.0.0:4153
  • 啓動nqsadmin
nsqadmin --lookupd-http-address=127.0.0.1:4161

基於go-nsq的客戶端實現

幾個值得注意的地方

  • Producer斷線後不會重連,須要本身手動重連,Consumer斷線後會自動重連
  • Consumer的重連時間配置項有兩個功能(這個設計必須吐槽一下,分開配置更好一點)

    • Consumer檢測到與nsqd的鏈接斷開後,每隔x秒向nsqd請求重連
    • Consumer每隔x秒,向nsqlookud進行http輪詢,用來更新本身的nsqd地址目錄
    • Consumer的重連時間默認是60s(...菜都涼了),我改爲了1s
  • Consumer能夠同時接收不一樣nsqd node的同名topic數據,爲了不混淆,就必須在客戶端進行處理
  • 在AddConurrentHandlers和 AddHandler中設置的接口回調是在另外的goroutine中執行的
  • Producer不能發佈(Publish)空message,不然會致使panic

go_nsq-send.go

//Nsq發送測試
package main

import (
 "bufio"
 "fmt"
 "github.com/nsqio/go-nsq"
 "os"
)

var producer *nsq.Producer

// 主函數
func main() {
 strIP1 := "127.0.0.1:4150"
 strIP2 := "127.0.0.1:4152"
 InitProducer(strIP1)

 running := true

 //讀取控制檯輸入
 reader := bufio.NewReader(os.Stdin)
 for running {
  data, _, _ := reader.ReadLine()
  command := string(data)
  if command == "stop" {
   running = false
  }

  for err := Publish("test", command); err != nil; err = Publish("test", command) {
   //切換IP重連
   strIP1, strIP2 = strIP2, strIP1
   InitProducer(strIP1)
  }
 }
 //關閉
 producer.Stop()
}

// 初始化生產者
func InitProducer(str string) {
 var err error
 fmt.Println("address: ", str)
 producer, err = nsq.NewProducer(str, nsq.NewConfig())
 if err != nil {
 panic(err)
 }
}

//發佈消息
func Publish(topic string, message string) error {
 var err error
 if producer != nil {
  if message == "" { //不能發佈空串,不然會致使error
   return nil
  }
  err = producer.Publish(topic, []byte(message)) // 發佈消息
  return err
 }
 return fmt.Errorf("producer is nil", err)
}

go_nsq-receive.go

//Nsq接收測試
package main

import (
 "fmt"
 "time"

 "github.com/nsqio/go-nsq"
)

// 消費者
type ConsumerT struct{}

// 主函數
func main() {
 InitConsumer("test", "test-channel", "127.0.0.1:4161")
 for {
  time.Sleep(time.Second * 10)
 }
}

//處理消息
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
 fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
 return nil
}

//初始化消費者
func InitConsumer(topic string, channel string, address string) {
 cfg := nsq.NewConfig()
 cfg.LookupdPollInterval = time.Second          //設置重連時間
 c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一個消費者
 if err != nil {
 panic(err)
 }
 c.SetLogger(nil, 0)        //屏蔽系統日誌
 c.AddHandler(&ConsumerT{}) // 添加消費者接口

 //創建NSQLookupd鏈接
 if err := c.ConnectToNSQLookupd(address); err != nil {
 panic(err)
 }

 //創建多個nsqd鏈接
 // if err := c.ConnectToNSQDs([]string{"127.0.0.1:4150", "127.0.0.1:4152"}); err != nil {
 //  panic(err)
 // }

 // 創建一個nsqd鏈接
 // if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil {
 //  panic(err)
 // }
}
相關文章
相關標籤/搜索