golang-nsq系列(一)--初識

nsq 最初是由 bitly 公司開源出來的一款簡單易用的分佈式消息中間件,它可用於大規模系統中的實時消息服務,而且天天可以處理數億級別的消息。html

它具備如下特性:git

分佈式。它提供了分佈式的、去中心化且沒有單點故障的拓撲結構,穩定的消息傳輸發佈保障,可以具備高容錯和高可用特性。github

易於擴展。它支持水平擴展,沒有中心化的消息代理(Broker),內置的發現服務讓集羣中增長節點很是容易。sql

運維方便。它很是容易配置和部署,靈活性高。docker

高度集成。如今已經有官方的GolangPythonJavaScript客戶端,社區也有了其餘各個語言的客戶端庫方便接入,自定義客戶端也很是容易。bash

1. 首先到官方文檔看用法:

nsq.io/overview/qu…負載均衡

下載對應的二進制可執行文件,在本地按照上述步驟就能夠跑起來了,看下nsqadmin 後臺展現以下: 運維


2. docker 環境搭建 nsq

參考官方提供資料建立:docker-compose.ymltcp

version: '2' # 高版本支持3
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160:4160" # tcp
      - "4161:4161" # http

  nsqd:
    image: nsqio/nsq
    # 廣播地址不填的話默認就是oshostname(或虛擬機名稱),那樣 lookupd 會鏈接不上,因此直接寫IP
    command: /nsqd --broadcast-address=10.236.92.208 --lookupd-tcp-address=nsqlookupd:4160
    depends_on:
      - nsqlookupd
    ports:
      - "4150:4150" # tcp
      - "4151:4151" # http

  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    depends_on:
      - nsqlookupd
    ports:
      - "4171:4171" # http
複製代碼

執行 docker-compose up -d 生成對應的三個容器:分佈式

nsqgo_nsqd_1
nsqgo_nsqlookupd_1
nsqgo_nsqadmin_1


3. Golang 使用 nsq

建立生產者:producer.go

package main

import (
  "fmt"
  "log"
  "time"

  "github.com/nsqio/go-nsq"
)

func main() {
  config := nsq.NewConfig()
  p, err := nsq.NewProducer("127.0.0.1:4150", config)

  if err != nil {
    log.Panic(err)
  }

  for i := 0; i < 1000; i++ {
    msg := fmt.Sprintf("num-%d", i)
    log.Println("Pub:" + msg)
    err = p.Publish("testTopic", []byte(msg))
    if err != nil {
      log.Panic(err)
    }
    time.Sleep(time.Second * 1)
  }

  p.Stop()
}
複製代碼

循環寫 1000 個 num-1--1000,經過 p.Publish 發送到消息隊列中,等待消費。

建立消費者:consumer.go

package main

import (
  "log"
  "sync"

  "github.com/nsqio/go-nsq"
)

func main() {
  wg := &sync.WaitGroup{}
  wg.Add(1000)

  config := nsq.NewConfig()
  c, _ := nsq.NewConsumer("testTopic", "ch", config)
  c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
    log.Printf("Got a message: %s", message.Body)
    wg.Done()
    return nil
  }))

  // 1.直連nsqd
  // err := c.ConnectToNSQD("127.0.0.1:4150")

  // 2.經過 nsqlookupd 服務發現
  err := c.ConnectToNSQLookupd("127.0.0.1:4161")
  if err != nil {
    log.Panic(err)
  }
  wg.Wait()
}
複製代碼

可經過兩種方式與 nsqd 鏈接:

1). 直連 nsqd,適用於單機(standalone)版;

2). 經過 nsqlookupd 服務發現,適用於集羣(cluster)版;

輸出結果:

go run producer.go

2019/08/18 20:29:51 Pub:num-0
2019/08/18 20:29:51 INF    1 (127.0.0.1:4150) connecting to nsqd
2019/08/18 20:29:52 Pub:num-1
2019/08/18 20:29:53 Pub:num-2
2019/08/18 20:29:54 Pub:num-3
2019/08/18 20:29:55 Pub:num-4
2019/08/18 20:29:56 Pub:num-5
2019/08/18 20:29:57 Pub:num-6
2019/08/18 20:29:58 Pub:num-7
2019/08/18 20:29:59 Pub:num-8
2019/08/18 20:30:00 Pub:num-9
2019/08/18 20:30:01 Pub:num-10
複製代碼

go run consumer.go

2019/08/18 20:30:08 INF    1 [testTopic/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=testTopic
2019/08/18 20:30:08 INF    1 [testTopic/ch] (10.236.92.208:4150) connecting to nsqd
2019/08/18 20:30:08 Got a message: num-0
2019/08/18 20:30:08 Got a message: num-1
2019/08/18 20:30:08 Got a message: num-2
2019/08/18 20:30:08 Got a message: num-3
2019/08/18 20:30:08 Got a message: num-4
2019/08/18 20:30:08 Got a message: num-5
2019/08/18 20:30:08 Got a message: num-6
2019/08/18 20:30:08 Got a message: num-7
2019/08/18 20:30:08 Got a message: num-8
2019/08/18 20:30:08 Got a message: num-9
2019/08/18 20:30:08 Got a message: num-10
複製代碼

github 源碼下載:

github.com/astraw99/ns…

【小結】

a. 單個nsqd能夠有多個topic,每一個topic能夠有多個channel。channel接收這個topic全部消息的副本,從而實現多播分發,而channel上的每一個消息被均勻的分發給它的訂閱者,從而實現負載均衡;

b. nsq 專門爲分佈式、集羣化而生,在處理 SPOF(single point of failure, 單點故障)、高可用、最終一致性方面頗有優點。

相關文章
相關標籤/搜索