使用NSQ(附Golang代碼)

上篇文章已經瞭解了消息中間件相關的知識,這篇文章學習一下Golang語言編寫的知名消息中間件NSQhtml

nsq最初是由bitly公司開源出來的一款簡單易用的消息中間件,它可用於大規模系統中的實時消息服務,而且天天可以處理數億級別的消息。它有如下特性:git

  1. 分佈式。它提供了分佈式的、去中心化且沒有單點故障的拓撲結構,穩定的消息傳輸發佈保障,可以具備高容錯和高可用特性。
  2. 易於擴展。它支持水平擴展,沒有中心化的消息代理(Broker),內置的發現服務讓集羣中增長節點很是容易。
  3. 運維方便。它很是容易配置和部署,靈活性高。
  4. 高度集成。如今已經有官方的Golang、Python和JavaScript客戶端,社區也有了其餘各個語言的客戶端庫方便接入,自定義客戶端也很是容易。

如今開始體驗它~github

安裝

首先安裝它,我在Mac上用Homebrew安裝:golang

❯ brew install nsq
複製代碼

組件

nsq一共有四種組件sql

nsqlookupd

nsqlookupd是負責管理拓撲信息並提供最終一致性的發現服務的守護進程(daemon)。在終端1啓動它:瀏覽器

❯ nsqlookupd
[nsqlookupd] 2019/07/18 11:42:16.876296 INFO: nsqlookupd v1.1.0 (built w/go1.11)
[nsqlookupd] 2019/07/18 11:42:16.876864 INFO: HTTP: listening on [::]:4161
[nsqlookupd] 2019/07/18 11:42:16.876868 INFO: TCP: listening on [::]:4160
複製代碼

默認HTTP接口監聽4161,TCP接口監聽4160。bash

nsqd

nsqd是一個負責接收、排隊、投遞消息給客戶端的守護進程。客戶端經過查詢 nsqlookupd 來發現指定話題(topic)的nsqd生產者,nsqd節點會廣播話題(topic)和通道(channel)信息。數據流模型以下:負載均衡

單個nsqd能夠有多個topic,每一個topic能夠有多個channel。channel接收這個topic全部消息的副本,從而實現多播分發,而channel上的每一個消息被分發給它的訂閱者,從而實現負載均衡。運維

在終端2啓動nsqd:curl

❯ nsqd --lookupd-tcp-address=127.0.0.1:4160
...
[nsqd] 2019/07/18 11:47:46.427184 INFO: HTTP: listening on [::]:4151
[nsqd] 2019/07/18 11:47:46.427195 INFO: TCP: listening on [::]:4150
[nsqd] 2019/07/18 11:47:46.427203 INFO: LOOKUP(127.0.0.1:4160): adding peer
[nsqd] 2019/07/18 11:47:46.427355 INFO: LOOKUP connecting to 127.0.0.1:4160
...
複製代碼

nsqd經過tcp端口鏈接到了nsqlookupd,它本身在4151接受HTTP請求,在4150接受TCP請求。

nsqadmin

nsqadmin 是一套WEB管理UI,用來聚集集羣的實時統計,並執行不一樣的管理任務。在終端3啓動它:

❯ nsqadmin --lookupd-http-address=127.0.0.1:4161
[nsqadmin] 2019/07/18 11:54:23.125392 INFO: nsqadmin v1.1.0 (built w/go1.11)
[nsqadmin] 2019/07/18 11:54:23.128755 INFO: HTTP: listening on [::]:4171
複製代碼

瀏覽器打開http://localhost:4171就能訪問了,須要注意,管理UI能夠按需啓動。

功能工具

安裝nsq後會增長nsq_stat/nsq_tail/nsq_to_file等功能工具,這些實用程序以數據流的形式提供了通用功能和內部檢查,稍後能體驗到。

命令行體驗

❯ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'  # 在終端4執行
OK  # 發佈消息到nsqd,用Rest API完成,看參數表示話題是test。因爲尚未test這個話題,會先建立話題再接收消息。
❯ nsq_tail --lookupd-http-address=127.0.0.1:4161 --topic=test  # 在終端5執行,一會再來看
# 回到終端4
❯ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
OK
~
❯ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'
OK
~
# 回到終端5
❯ nsq_tail --lookupd-http-address=127.0.0.1:4161 --topic=test
2019/07/18 12:06:24 Adding consumer for topic: test
...
hello world 2
hello world 3
複製代碼

回到終端5,能夠看到接受(消費)了啓動nsq_tail後發佈的2個消息,這就是功能工具的做用,另外如nsq_to_file是把消息發到文件。

nsq的Go客戶端使用

首先安裝go-nsq:

❯ go get github.com/nsqio/go-nsq
複製代碼

先看生產者:

package main

import (
	"github.com/nsqio/go-nsq"
	"log"
	"math/rand"
	"time"
)

func main() {
	config := nsq.NewConfig()
	w, err := nsq.NewProducer("127.0.0.1:4150", config)

	if err != nil {
		log.Panic(err)
	}

	chars := []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ")

	for {
		buf := make([]byte, 4)
		for i := 0; i < 4; i++ {
			buf[i] = chars[rand.Intn(len(chars))]
		}
		log.Printf("Pub: %s", buf)
		err = w.Publish("test", buf)
		if err != nil {
			log.Panic(err)
		}
		time.Sleep(time.Second * 1)
	}

	w.Stop()
}
複製代碼

NewProducer的第一個參數就是nsqd的地址,在這裏作了個無限for循環,每次隨機4個byte發佈到test話題裏面。

接着看消費者代碼:

package main

import (
	"log"
	"sync"

	"github.com/nsqio/go-nsq"
)

func main() {

	wg := &sync.WaitGroup{}
	wg.Add(1000)

	config := nsq.NewConfig()
	q, _ := nsq.NewConsumer("test", "ch", config)
	q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
		log.Printf("Got a message: %s", message.Body)
		wg.Done()
		return nil
	}))
	err := q.ConnectToNSQD("127.0.0.1:4150")
	if err != nil {
		log.Panic(err)
	}
	wg.Wait()

}
複製代碼

一開始經過sync.WaitGroup安排了1000個待執行的等待組,NewConsumer的第一個參數是話題test,第二是通道名字,而後用AddHandler添加一個消費處理函數,在處理函數中會打印這個消息。

如今就能夠體驗了,首先啓動消費者,再啓動發佈者。

❯ go run consumer.go
2019/07/18 15:29:29 INF    1 [test/ch] (127.0.0.1:4150) connecting to nsqd
2019/07/18 15:29:37 Got a message: ZGBA
2019/07/18 15:29:38 Got a message: ICMR
2019/07/18 15:29:39 Got a message: AJWW
2019/07/18 15:29:40 Got a message: HTHC
2019/07/18 15:29:41 Got a message: TCUA
...
❯ go run producer.go
2019/07/18 15:29:36 INF    1 (127.0.0.1:4150) connecting to nsqd
2019/07/18 15:29:37 Pub: ZGBA
2019/07/18 15:29:38 Pub: ICMR
2019/07/18 15:29:39 Pub: AJWW
2019/07/18 15:29:40 Pub: HTHC
2019/07/18 15:29:41 Pub: TCUA
...
複製代碼

代碼地址

原文地址: strconv.com/posts/use-n…

完整代碼能夠在這個地址找到。

延伸閱讀

  1. nsq.io/overview/qu…
相關文章
相關標籤/搜索