NSQ是目前比較流行的一個分佈式的消息隊列,本文主要介紹了NSQ及Go語言如何操做NSQ。html
NSQ是Go語言編寫的一個開源的實時分佈式內存消息隊列,其性能十分優異。 NSQ的優點有如下優點:node
一般來講,消息隊列都適用如下場景。git
參照下圖利用消息隊列把業務流程中的非關鍵流程異步化,從而顯著下降業務請求的響應時間。github
經過使用消息隊列將不一樣的業務邏輯解耦,下降系統間的耦合,提升系統的健壯性。後續有其餘業務要使用訂單數據可直接訂閱消息隊列,提升系統的靈活性。sql
相似秒殺(大秒)等場景下,某一時間可能會產生大量的請求,使用消息隊列可以爲後端處理請求提供必定的緩衝區,保證後端服務的穩定性。後端
官方下載頁面根據本身的平臺下載並解壓便可。瀏覽器
nsqd是一個守護進程,它接收、排隊並向客戶端發送消息。bash
啓動nsqd
,指定-broadcast-address=127.0.0.1
來配置廣播地址服務器
./nsqd -broadcast-address=127.0.0.1
若是是在搭配nsqlookupd
使用的模式下須要還指定nsqlookupd
地址:架構
./nsqd -broadcast-address=127.0.0.1 -lookupd-tcp-address=127.0.0.1:4160
若是是部署了多個nsqlookupd
節點的集羣,那還能夠指定多個-lookupd-tcp-address
。
nsqdq
相關配置項以下:
-auth-http-address value <addr>:<port> to query auth server (may be given multiple times) -broadcast-address string address that will be registered with lookupd (defaults to the OS hostname) (default "PROSNAKES.local") -config string path to config file -data-path string path to store disk-backed messages -deflate enable deflate feature negotiation (client compression) (default true) -e2e-processing-latency-percentile value message processing time percentiles (as float (0, 1.0]) to track (can be specified multiple times or comma separated '1.0,0.99,0.95', default none) -e2e-processing-latency-window-time duration calculate end to end latency quantiles for this duration of time (ie: 60s would only show quantile calculations from the past 60 seconds) (default 10m0s) -http-address string <addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4151") -http-client-connect-timeout duration timeout for HTTP connect (default 2s) -http-client-request-timeout duration timeout for HTTP request (default 5s) -https-address string <addr>:<port> to listen on for HTTPS clients (default "0.0.0.0:4152") -log-prefix string log message prefix (default "[nsqd] ") -lookupd-tcp-address value lookupd TCP address (may be given multiple times) -max-body-size int maximum size of a single command body (default 5242880) -max-bytes-per-file int number of bytes per diskqueue file before rolling (default 104857600) -max-deflate-level int max deflate compression level a client can negotiate (> values == > nsqd CPU usage) (default 6) -max-heartbeat-interval duration maximum client configurable duration of time between client heartbeats (default 1m0s) -max-msg-size int maximum size of a single message in bytes (default 1048576) -max-msg-timeout duration maximum duration before a message will timeout (default 15m0s) -max-output-buffer-size int maximum client configurable size (in bytes) for a client output buffer (default 65536) -max-output-buffer-timeout duration maximum client configurable duration of time between flushing to a client (default 1s) -max-rdy-count int maximum RDY count for a client (default 2500) -max-req-timeout duration maximum requeuing timeout for a message (default 1h0m0s) -mem-queue-size int number of messages to keep in memory (per topic/channel) (default 10000) -msg-timeout string duration to wait before auto-requeing a message (default "1m0s") -node-id int unique part for message IDs, (int) in range [0,1024) (default is hash of hostname) (default 616) -snappy enable snappy feature negotiation (client compression) (default true) -statsd-address string UDP <addr>:<port> of a statsd daemon for pushing stats -statsd-interval string duration between pushing to statsd (default "1m0s") -statsd-mem-stats toggle sending memory and GC stats to statsd (default true) -statsd-prefix string prefix used for keys sent to statsd (%s for host replacement) (default "nsq.%s") -sync-every int number of messages per diskqueue fsync (default 2500) -sync-timeout duration duration of time per diskqueue fsync (default 2s) -tcp-address string <addr>:<port> to listen on for TCP clients (default "0.0.0.0:4150") -tls-cert string path to certificate file -tls-client-auth-policy string client certificate auth policy ('require' or 'require-verify') -tls-key string path to key file -tls-min-version value minimum SSL/TLS version acceptable ('ssl3.0', 'tls1.0', 'tls1.1', or 'tls1.2') (default 769) -tls-required require TLS for client connections (true, false, tcp-https) -tls-root-ca-file string path to certificate authority file -verbose enable verbose logging -version print version string -worker-id do NOT use this, use --node-id
nsqlookupd是維護全部nsqd狀態、提供服務發現的守護進程。它能爲消費者查找特定topic
下的nsqd提供了運行時的自動發現服務。 它不維持持久狀態,也不須要與任何其餘nsqlookupd實例協調以知足查詢。所以根據你係統的冗餘要求儘量多地部署nsqlookupd
節點。它們小豪的資源不多,能夠與其餘服務共存。咱們的建議是爲每一個數據中心運行至少3個集羣。
nsqlookupd
相關配置項以下:
-broadcast-address string address of this lookupd node, (default to the OS hostname) (default "PROSNAKES.local") -config string path to config file -http-address string <addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4161") -inactive-producer-timeout duration duration of time a producer will remain in the active list since its last ping (default 5m0s) -log-prefix string log message prefix (default "[nsqlookupd] ") -tcp-address string <addr>:<port> to listen on for TCP clients (default "0.0.0.0:4160") -tombstone-lifetime duration duration of time a producer will remain tombstoned if registration remains (default 45s) -verbose enable verbose logging -version print version string
一個實時監控集羣狀態、執行各類管理任務的Web管理平臺。 啓動nsqadmin
,指定nsqlookupd
地址:
./nsqadmin -lookupd-http-address=127.0.0.1:4161
咱們可使用瀏覽器打開http://127.0.0.1:4171/
訪問以下管理界面。
nsqadmin
相關的配置項以下:
-allow-config-from-cidr string A CIDR from which to allow HTTP requests to the /config endpoint (default "127.0.0.1/8") -config string path to config file -graphite-url string graphite HTTP address -http-address string <addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4171") -http-client-connect-timeout duration timeout for HTTP connect (default 2s) -http-client-request-timeout duration timeout for HTTP request (default 5s) -http-client-tls-cert string path to certificate file for the HTTP client -http-client-tls-insecure-skip-verify configure the HTTP client to skip verification of TLS certificates -http-client-tls-key string path to key file for the HTTP client -http-client-tls-root-ca-file string path to CA file for the HTTP client -log-prefix string log message prefix (default "[nsqadmin] ") -lookupd-http-address value lookupd HTTP address (may be given multiple times) -notification-http-endpoint string HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent -nsqd-http-address value nsqd HTTP address (may be given multiple times) -proxy-graphite proxy HTTP requests to graphite -statsd-counter-format string The counter stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string. (default "stats.counters.%s.count") -statsd-gauge-format string The gauge stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string. (default "stats.gauges.%s") -statsd-interval duration time interval nsqd is configured to push to statsd (must match nsqd) (default 1m0s) -statsd-prefix string prefix used for keys sent to statsd (%s for host replacement, must match nsqd) (default "nsq.%s") -version print version string
每一個nsqd實例旨在一次處理多個數據流。這些數據流稱爲「topics」
,一個topic
具備1個或多個「channels」
。每一個channel
都會收到topic
全部消息的副本,實際上下游的服務是經過對應的channel
來消費topic
消息。
topic
和channel
不是預先配置的。topic
在首次使用時建立,方法是將其發佈到指定topic
,或者訂閱指定topic
上的channel
。channel
是經過訂閱指定的channel
在第一次使用時建立的。
topic
和channel
都相互獨立地緩衝數據,防止緩慢的消費者致使其餘chennel
的積壓(一樣適用於topic
級別)。
channel
能夠而且一般會鏈接多個客戶端。假設全部鏈接的客戶端都處於準備接收消息的狀態,則每條消息將被傳遞到隨機客戶端。例如:
總而言之,消息是從topic -> channel
(每一個channel接收該topic的全部消息的副本)多播的,可是從channel -> consumers
均勻分佈(每一個消費者接收該channel的一部分消息)。
--mem-queue-size
設置爲0,全部的消息將會存儲到磁盤。官方提供了Go語言版的客戶端:go-nsq,更多客戶端支持請查看CLIENT LIBRARIES。
go get -u github.com/nsqio/go-nsq
一個簡單的生產者示例代碼以下:
// nsq_producer/main.go package main import ( "bufio" "fmt" "os" "strings" "github.com/nsqio/go-nsq" ) // NSQ Producer Demo var producer *nsq.Producer // 初始化生產者 func initProducer(str string) (err error) { config := nsq.NewConfig() producer, err = nsq.NewProducer(str, config) if err != nil { fmt.Printf("create producer failed, err:%v\n", err) return err } return nil } func main() { nsqAddress := "127.0.0.1:4150" err := initProducer(nsqAddress) if err != nil { fmt.Printf("init producer failed, err:%v\n", err) return } reader := bufio.NewReader(os.Stdin) // 從標準輸入讀取 for { data, err := reader.ReadString('\n') if err != nil { fmt.Printf("read string from stdin failed, err:%v\n", err) continue } data = strings.TrimSpace(data) if strings.ToUpper(data) == "Q" { // 輸入Q退出 break } // 向 'topic_demo' publish 數據 err = producer.Publish("topic_demo", []byte(data)) if err != nil { fmt.Printf("publish msg to nsq failed, err:%v\n", err) continue } } }
將上面的代碼編譯執行,而後在終端輸入兩條數據123
和456
:
$ ./nsq_producer 123 2018/10/22 18:41:20 INF 1 (127.0.0.1:4150) connecting to nsqd 456
使用瀏覽器打開http://127.0.0.1:4171/
能夠查看到相似下面的頁面: 在下面這個頁面能看到當前的topic
信息:
點擊頁面上的topic_demo
就能進入一個展現更多詳細信息的頁面,在這個頁面上咱們能夠查看和管理topic
,同時可以看到目前在LWZMBP:4151 (127.0.01:4151)
這個nsqd
上有2條message。又由於沒有消費者接入因此暫時沒有建立channel
。
在/nodes
這個頁面咱們可以很方便的查看當前接入lookupd
的nsqd
節點。
這個/counter
頁面顯示了處理的消息數量,由於咱們沒有接入消費者,因此處理的消息數量爲0。
在/lookup
界面支持建立topic
和channel
。
一個簡單的消費者示例代碼以下:
// nsq_consumer/main.go package main import ( "fmt" "os" "os/signal" "syscall" "time" "github.com/nsqio/go-nsq" ) // NSQ Consumer Demo // MyHandler 是一個消費者類型 type MyHandler struct { Title string } // HandleMessage 是須要實現的處理消息的方法 func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) { fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body)) return } // 初始化消費者 func initConsumer(topic string, channel string, address string) (err error) { config := nsq.NewConfig() config.LookupdPollInterval = 15 * time.Second c, err := nsq.NewConsumer(topic, channel, config) if err != nil { fmt.Printf("create consumer failed, err:%v\n", err) return } consumer := &MyHandler{ Title: "沙河1號", } c.AddHandler(consumer) // if err := c.ConnectToNSQD(address); err != nil { // 直接連NSQD if err := c.ConnectToNSQLookupd(address); err != nil { // 經過lookupd查詢 return err } return nil } func main() { err := initConsumer("topic_demo", "first", "127.0.0.1:4161") if err != nil { fmt.Printf("init consumer failed, err:%v\n", err) return } c := make(chan os.Signal) // 定義一個信號的通道 signal.Notify(c, syscall.SIGINT) // 轉發鍵盤中斷信號到c <-c // 阻塞 }
將上面的代碼保存以後編譯執行,就可以獲取以前咱們publish的兩條消息了:
$ ./nsq_consumer 2018/10/22 18:49:06 INF 1 [topic_demo/first] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=topic_demo 2018/10/22 18:49:06 INF 1 [topic_demo/first] (127.0.0.1:4150) connecting to nsqd 沙河1號 recv from 127.0.0.1:4150, msg:123 沙河1號 recv from 127.0.0.1:4150, msg:456
同時在nsqadmin的/counter
頁面查看處處理的數據數量爲2。
關於go-nsq
的更多內容請閱讀go-nsq的官方文檔。