Go語言操做NoSql

NSQ平臺

NSQ是目前比較流行的一個分佈式的消息隊列,本文主要介紹了NSQ及Go語言如何操做NSQ。html

NSQ

NSQ介紹

NSQ是Go語言編寫的一個開源的實時分佈式內存消息隊列,其性能十分優異。 NSQ的優點有如下優點:node

  1. NSQ提倡分佈式和分散的拓撲,沒有單點故障,支持容錯和高可用性,並提供可靠的消息交付保證
  2. NSQ支持橫向擴展,沒有任何集中式代理。
  3. NSQ易於配置和部署,而且內置了管理界面。

NSQ的應用場景

一般來講,消息隊列都適用如下場景。git

異步處理

參照下圖利用消息隊列把業務流程中的非關鍵流程異步化,從而顯著下降業務請求的響應時間。github

nsq1.png

應用解耦

經過使用消息隊列將不一樣的業務邏輯解耦,下降系統間的耦合,提升系統的健壯性。後續有其餘業務要使用訂單數據可直接訂閱消息隊列,提升系統的靈活性。sql

nsq2.png

流量削峯

相似秒殺(大秒)等場景下,某一時間可能會產生大量的請求,使用消息隊列可以爲後端處理請求提供必定的緩衝區,保證後端服務的穩定性。後端

nsq3.png

安裝

官方下載頁面根據本身的平臺下載並解壓便可。瀏覽器

NSQ組件

nsqd

nsqd是一個守護進程,它接收、排隊並向客戶端發送消息。bash

啓動nsqd,指定-broadcast-address=127.0.0.1來配置廣播地址服務器

./nsqd -broadcast-address=127.0.0.1

若是是在搭配nsqlookupd使用的模式下須要還指定nsqlookupd地址:架構

./nsqd -broadcast-address=127.0.0.1 -lookupd-tcp-address=127.0.0.1:4160

若是是部署了多個nsqlookupd節點的集羣,那還能夠指定多個-lookupd-tcp-address

nsqdq相關配置項以下:

-auth-http-address value
    <addr>:<port> to query auth server (may be given multiple times)
-broadcast-address string
    address that will be registered with lookupd (defaults to the OS hostname) (default "PROSNAKES.local")
-config string
    path to config file
-data-path string
    path to store disk-backed messages
-deflate
    enable deflate feature negotiation (client compression) (default true)
-e2e-processing-latency-percentile value
    message processing time percentiles (as float (0, 1.0]) to track (can be specified multiple times or comma separated '1.0,0.99,0.95', default none)
-e2e-processing-latency-window-time duration
    calculate end to end latency quantiles for this duration of time (ie: 60s would only show quantile calculations from the past 60 seconds) (default 10m0s)
-http-address string
    <addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4151")
-http-client-connect-timeout duration
    timeout for HTTP connect (default 2s)
-http-client-request-timeout duration
    timeout for HTTP request (default 5s)
-https-address string
    <addr>:<port> to listen on for HTTPS clients (default "0.0.0.0:4152")
-log-prefix string
    log message prefix (default "[nsqd] ")
-lookupd-tcp-address value
    lookupd TCP address (may be given multiple times)
-max-body-size int
    maximum size of a single command body (default 5242880)
-max-bytes-per-file int
    number of bytes per diskqueue file before rolling (default 104857600)
-max-deflate-level int
    max deflate compression level a client can negotiate (> values == > nsqd CPU usage) (default 6)
-max-heartbeat-interval duration
    maximum client configurable duration of time between client heartbeats (default 1m0s)
-max-msg-size int
    maximum size of a single message in bytes (default 1048576)
-max-msg-timeout duration
    maximum duration before a message will timeout (default 15m0s)
-max-output-buffer-size int
    maximum client configurable size (in bytes) for a client output buffer (default 65536)
-max-output-buffer-timeout duration
    maximum client configurable duration of time between flushing to a client (default 1s)
-max-rdy-count int
    maximum RDY count for a client (default 2500)
-max-req-timeout duration
    maximum requeuing timeout for a message (default 1h0m0s)
-mem-queue-size int
    number of messages to keep in memory (per topic/channel) (default 10000)
-msg-timeout string
    duration to wait before auto-requeing a message (default "1m0s")
-node-id int
    unique part for message IDs, (int) in range [0,1024) (default is hash of hostname) (default 616)
-snappy
    enable snappy feature negotiation (client compression) (default true)
-statsd-address string
    UDP <addr>:<port> of a statsd daemon for pushing stats
-statsd-interval string
    duration between pushing to statsd (default "1m0s")
-statsd-mem-stats
    toggle sending memory and GC stats to statsd (default true)
-statsd-prefix string
    prefix used for keys sent to statsd (%s for host replacement) (default "nsq.%s")
-sync-every int
    number of messages per diskqueue fsync (default 2500)
-sync-timeout duration
    duration of time per diskqueue fsync (default 2s)
-tcp-address string
    <addr>:<port> to listen on for TCP clients (default "0.0.0.0:4150")
-tls-cert string
    path to certificate file
-tls-client-auth-policy string
    client certificate auth policy ('require' or 'require-verify')
-tls-key string
    path to key file
-tls-min-version value
    minimum SSL/TLS version acceptable ('ssl3.0', 'tls1.0', 'tls1.1', or 'tls1.2') (default 769)
-tls-required
    require TLS for client connections (true, false, tcp-https)
-tls-root-ca-file string
    path to certificate authority file
-verbose
    enable verbose logging
-version
    print version string
-worker-id
    do NOT use this, use --node-id

nsqlookupd

nsqlookupd是維護全部nsqd狀態、提供服務發現的守護進程。它能爲消費者查找特定topic下的nsqd提供了運行時的自動發現服務。 它不維持持久狀態,也不須要與任何其餘nsqlookupd實例協調以知足查詢。所以根據你係統的冗餘要求儘量多地部署nsqlookupd節點。它們小豪的資源不多,能夠與其餘服務共存。咱們的建議是爲每一個數據中心運行至少3個集羣。

nsqlookupd相關配置項以下:

-broadcast-address string
    address of this lookupd node, (default to the OS hostname) (default "PROSNAKES.local")
-config string
    path to config file
-http-address string
    <addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4161")
-inactive-producer-timeout duration
    duration of time a producer will remain in the active list since its last ping (default 5m0s)
-log-prefix string
    log message prefix (default "[nsqlookupd] ")
-tcp-address string
    <addr>:<port> to listen on for TCP clients (default "0.0.0.0:4160")
-tombstone-lifetime duration
    duration of time a producer will remain tombstoned if registration remains (default 45s)
-verbose
    enable verbose logging
-version
    print version string

nsqadmin

一個實時監控集羣狀態、執行各類管理任務的Web管理平臺。 啓動nsqadmin,指定nsqlookupd地址:

./nsqadmin -lookupd-http-address=127.0.0.1:4161

咱們可使用瀏覽器打開http://127.0.0.1:4171/訪問以下管理界面。

undefined

nsqadmin相關的配置項以下:

-allow-config-from-cidr string
    A CIDR from which to allow HTTP requests to the /config endpoint (default "127.0.0.1/8")
-config string
    path to config file
-graphite-url string
    graphite HTTP address
-http-address string
    <addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4171")
-http-client-connect-timeout duration
    timeout for HTTP connect (default 2s)
-http-client-request-timeout duration
    timeout for HTTP request (default 5s)
-http-client-tls-cert string
    path to certificate file for the HTTP client
-http-client-tls-insecure-skip-verify
    configure the HTTP client to skip verification of TLS certificates
-http-client-tls-key string
    path to key file for the HTTP client
-http-client-tls-root-ca-file string
    path to CA file for the HTTP client
-log-prefix string
    log message prefix (default "[nsqadmin] ")
-lookupd-http-address value
    lookupd HTTP address (may be given multiple times)
-notification-http-endpoint string
    HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent
-nsqd-http-address value
    nsqd HTTP address (may be given multiple times)
-proxy-graphite
    proxy HTTP requests to graphite
-statsd-counter-format string
    The counter stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string. (default "stats.counters.%s.count")
-statsd-gauge-format string
    The gauge stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string. (default "stats.gauges.%s")
-statsd-interval duration
    time interval nsqd is configured to push to statsd (must match nsqd) (default 1m0s)
-statsd-prefix string
    prefix used for keys sent to statsd (%s for host replacement, must match nsqd) (default "nsq.%s")
-version
    print version string

NSQ架構

NSQ工做模式

nsq4.png

Topic和Channel

每一個nsqd實例旨在一次處理多個數據流。這些數據流稱爲「topics」,一個topic具備1個或多個「channels」。每一個channel都會收到topic全部消息的副本,實際上下游的服務是經過對應的channel來消費topic消息。

topicchannel不是預先配置的。topic在首次使用時建立,方法是將其發佈到指定topic,或者訂閱指定topic上的channelchannel是經過訂閱指定的channel在第一次使用時建立的。

topicchannel都相互獨立地緩衝數據,防止緩慢的消費者致使其餘chennel的積壓(一樣適用於topic級別)。

channel能夠而且一般會鏈接多個客戶端。假設全部鏈接的客戶端都處於準備接收消息的狀態,則每條消息將被傳遞到隨機客戶端。例如:

undefined

總而言之,消息是從topic -> channel(每一個channel接收該topic的全部消息的副本)多播的,可是從channel -> consumers均勻分佈(每一個消費者接收該channel的一部分消息)。

NSQ接收和發送消息流程

undefined

NSQ特性

  • 消息默認不持久化,能夠配置成持久化模式。nsq採用的方式時內存+硬盤的模式,當內存到達必定程度時就會將數據持久化到硬盤。
    • 若是將--mem-queue-size設置爲0,全部的消息將會存儲到磁盤。
    • 服務器重啓時也會將當時在內存中的消息持久化。
  • 每條消息至少傳遞一次。
  • 消息不保證有序。

Go操做NSQ

官方提供了Go語言版的客戶端:go-nsq,更多客戶端支持請查看CLIENT LIBRARIES

安裝

go get -u github.com/nsqio/go-nsq

生產者

一個簡單的生產者示例代碼以下:

// nsq_producer/main.go
package main

import (
    "bufio"
    "fmt"
    "os"
    "strings"

    "github.com/nsqio/go-nsq"
)

// NSQ Producer Demo

var producer *nsq.Producer

// 初始化生產者
func initProducer(str string) (err error) {
    config := nsq.NewConfig()
    producer, err = nsq.NewProducer(str, config)
    if err != nil {
        fmt.Printf("create producer failed, err:%v\n", err)
        return err
    }
    return nil
}

func main() {
    nsqAddress := "127.0.0.1:4150"
    err := initProducer(nsqAddress)
    if err != nil {
        fmt.Printf("init producer failed, err:%v\n", err)
        return
    }

    reader := bufio.NewReader(os.Stdin) // 從標準輸入讀取
    for {
        data, err := reader.ReadString('\n')
        if err != nil {
            fmt.Printf("read string from stdin failed, err:%v\n", err)
            continue
        }
        data = strings.TrimSpace(data)
        if strings.ToUpper(data) == "Q" { // 輸入Q退出
            break
        }
        // 向 'topic_demo' publish 數據
        err = producer.Publish("topic_demo", []byte(data))
        if err != nil {
            fmt.Printf("publish msg to nsq failed, err:%v\n", err)
            continue
        }
    }
}

將上面的代碼編譯執行,而後在終端輸入兩條數據123456

$ ./nsq_producer 
123
2018/10/22 18:41:20 INF    1 (127.0.0.1:4150) connecting to nsqd
456

使用瀏覽器打開http://127.0.0.1:4171/能夠查看到相似下面的頁面: 在下面這個頁面能看到當前的topic信息:

undefined

點擊頁面上的topic_demo就能進入一個展現更多詳細信息的頁面,在這個頁面上咱們能夠查看和管理topic,同時可以看到目前在LWZMBP:4151 (127.0.01:4151)這個nsqd上有2條message。又由於沒有消費者接入因此暫時沒有建立channel

undefined

/nodes這個頁面咱們可以很方便的查看當前接入lookupdnsqd節點。

undefined

這個/counter頁面顯示了處理的消息數量,由於咱們沒有接入消費者,因此處理的消息數量爲0。

undefined

/lookup界面支持建立topicchannel

undefined

消費者

一個簡單的消費者示例代碼以下:

// nsq_consumer/main.go
package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/nsqio/go-nsq"
)

// NSQ Consumer Demo

// MyHandler 是一個消費者類型
type MyHandler struct {
    Title string
}

// HandleMessage 是須要實現的處理消息的方法
func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
    fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))
    return
}

// 初始化消費者
func initConsumer(topic string, channel string, address string) (err error) {
    config := nsq.NewConfig()
    config.LookupdPollInterval = 15 * time.Second
    c, err := nsq.NewConsumer(topic, channel, config)
    if err != nil {
        fmt.Printf("create consumer failed, err:%v\n", err)
        return
    }
    consumer := &MyHandler{
        Title: "沙河1號",
    }
    c.AddHandler(consumer)

    // if err := c.ConnectToNSQD(address); err != nil { // 直接連NSQD
    if err := c.ConnectToNSQLookupd(address); err != nil { // 經過lookupd查詢
        return err
    }
    return nil

}

func main() {
    err := initConsumer("topic_demo", "first", "127.0.0.1:4161")
    if err != nil {
        fmt.Printf("init consumer failed, err:%v\n", err)
        return
    }
    c := make(chan os.Signal)        // 定義一個信號的通道
    signal.Notify(c, syscall.SIGINT) // 轉發鍵盤中斷信號到c
    <-c                              // 阻塞
}

將上面的代碼保存以後編譯執行,就可以獲取以前咱們publish的兩條消息了:

$ ./nsq_consumer 
2018/10/22 18:49:06 INF    1 [topic_demo/first] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=topic_demo
2018/10/22 18:49:06 INF    1 [topic_demo/first] (127.0.0.1:4150) connecting to nsqd
沙河1號 recv from 127.0.0.1:4150, msg:123
沙河1號 recv from 127.0.0.1:4150, msg:456

同時在nsqadmin的/counter頁面查看處處理的數據數量爲2。

undefined

關於go-nsq的更多內容請閱讀go-nsq的官方文檔

相關文章
相關標籤/搜索