tips:若是本文對你有用,請愛心點個贊,提升排名,讓這篇文章幫助更多的人。謝謝你們!比心❤~
若是解決不了,能夠在文末加我微信,進羣交流。html
NSQ 是實時的分佈式消息處理平臺,其設計的目的是用來大規模地處理天天數以十億計級別的消息。linux
NSQ 具備分佈式和去中心化拓撲結構,該結構具備無單點故障、故障容錯、高可用性以及可以保證消息的可靠傳遞的特徵。git
NSQ 很是容易配置和部署,且具備最大的靈活性,支持衆多消息協議。另外,官方還提供了拆箱即用 Go 和 Python 庫。若是讀者有興趣構建本身的客戶端的話,還能夠參考官方提供的協議規範。github
網上有人翻譯了國外的一篇文章:咱們是如何使用NSQ處理7500億消息的golang
官網文檔:https://nsq.io/overview/quick_start.html
中文文檔:http://wiki.jikexueyuan.com/project/nsq-guide/web
我是在ubuntu系統中按照官方操做進行部署測試。sql
安裝nsq啓動服務
在https://nsq.io/deployment/installing.html選擇對應的版本,並解壓。ubuntu
$ tar -zxvf nsq-1.2.0.linux-amd64.go1.12.9.tar.gz $ cd nsq-1.2.0.linux-amd64.go1.12.9/bin $ sudo cp ~/Downloads/nsq-1.2.0.linux-amd64.go1.12.9/bin/ -r /usr/local/nsq/bin $ sudo vim /etc/profile $ source
後臺啓動三個服務vim
$ ./nsqlookupd > /dev/null 2>&1 & [1] 20076 $ ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 & [2] 20420 $ ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 & [3] 20620
> -lookupd-tcp-address
爲上面nsqlookupd的IP和tcp的端口4160
> -lookupd-http-address
是http的端口也就是4161由於admin經過http請求來查詢相關信息bash
基本概念
nsqd:基本的節點
nsqlookupd:彙總節點信息,提供查詢和管理topic等服務
nsqadmin:管理端展現UI界面,能有一個web頁面去查看和操做
簡單使用
curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'
會建立一個test主題,併發送一個hello world消息http://127.0.0.1:4171/
進行訪問能夠看到NSQ的管理界面,很是的簡潔,其中127.0.0.1
爲服務器IP./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
消費test中剛纔的消息,並輸出到服務器/tmp目錄中默認一開始消息不是持久化的
nsq採用的方式時內存+硬盤的模式,當內存到達必定程度時就會將數據持久化到硬盤
--mem-queue-size
設置爲 0
,全部的消息將會存儲到磁盤。優勢:
缺點:
官方提供了不少語言接入的客戶端 https://nsq.io/clients/client_libraries.html
針對消息生產者的客戶端,官方還推薦直接使用post請求發送消息,如:
curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'
表示向test主題發送hello world這個消息
deb安裝
$ curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh
依賴包下載:
$ go get github.com/nsqio/go-nsq
生產者:
package main // 生產者 import ( "fmt" "github.com/nsqio/go-nsq" ) var tcpNsqdAddr = "127.0.0.1:4150" func main() { // 初始化配置 config := nsq.NewConfig() for i := 0; i < 100; i++ { // 建立100個生產者 tPro, err := nsq.NewProducer(tcpNsqdAddr, config) if err != nil { fmt.Printf("tPro new failed:%s", err) } // 主題 topic := "Insert" // 主題內容 tCommand := "New data!" // 發佈消息 err = tPro.Publish(topic, []byte(tCommand)) if err != nil { fmt.Printf("Publish failed:%s", err) } } }
其中
127.0.0.1:4150
爲發送消息的地址,消費者裏面寫的也是相同的地址就能夠了。
消費者:
package main // 消費者 import ( "fmt" "sync" "time" "github.com/nsqio/go-nsq" ) var tcpNsqdAddr = "127.0.0.1:4150" type NsqHandler struct { // 消息數 msqCount int // 標識id nsqHandlerID string } func main() { // 初始化配置 config := nsq.NewConfig() // 創造消費者,參數一是訂閱的主題,參數二是使用的通道 com, err := nsq.NewConsumer("Insert", "channel1", config) if err != nil { fmt.Println(err) } // 添加處理回調 com.AddHandler(&NsqHandler{nsqHandlerID: "One"}) // 鏈接對應的nsqd err = com.ConnectToNSQD(tcpNsqdAddr) if err != nil { fmt.Println(err) } // 只是爲了避免結束進程,這裏沒有意義 var wg = &sync.WaitGroup{} wg.Add(1) wg.Wait() } // HandleMessage 實現HandleMessage方法 // message是接收到的消息 func (s *NsqHandler) HandleMessage(message *nsq.Message) error { // 每接收到一條消息]+1 s.msqCount ++ // 打印輸出信息和ID fmt.Println(s.msqCount,s.nsqHandlerID) // 打印消息的一些基本信息 fmt.Printf("msg.Timestamp=]%v,msg.nsqaddress=%s,msg.body=]%s", time.Unix(0,message.Timestamp).Format("2006-01-02 03:04:05"),message.NSQDAddress,string(message.Body)) return nil }