Golang入門教程(十七)Linux/Windows下快速搭建和配置NSQ

 前言php

NSQ是一個基於Go語言的分佈式實時消息平臺,它基於MIT開源協議發佈,代碼託管在GitHub,其當前最新版本是0.3.1版。NSQ可用於大規模系統中的實時消息服務,而且天天可以處理數億級別的消息,其設計目標是爲在分佈式環境下運行的去中心化服務提供一個強大的基礎架構。NSQ具備分佈式、去中心化的拓撲結構,該結構具備無單點故障、故障容錯、高可用性以及可以保證消息的可靠傳遞的特徵。NSQ很是容易配置和部署,且具備最大的靈活性,支持衆多消息協議。另外,官方還提供了拆箱即用Go和Python庫。若是讀者興趣構建本身的客戶端的話,還能夠參考官方提供的協議規範html

NSQ是由四個重要組件構成:nginx

  • nsqd:一個負責接收、排隊、轉發消息到客戶端的守護進程
  • nsqlookupd:管理拓撲信息並提供最終一致性的發現服務的守護進程
  • nsqadmin:一套Web用戶界面,可實時查看集羣的統計數據和執行各類各樣的管理任務
  • utilities:常見基礎功能、數據流處理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq

NSQ的主要特色以下:git

  • 具備分佈式且無單點故障的拓撲結構 支持水平擴展,在無中斷狀況下可以無縫地添加集羣節點
  • 低延遲的消息推送,參見官方提供的性能說明文檔
  • 具備組合式的負載均衡和多播形式的消息路由
  • 既擅長處理面向流(高吞吐量)的工做負載,也擅長處理面向Job的(低吞吐量)工做負載
  • 消息數據既能夠存儲於內存中,也能夠存儲在磁盤中
  • 實現了生產者、消費者自動發現和消費者自動鏈接生產者,參見nsqlookupd
  • 支持安全傳輸層協議(TLS),從而確保了消息傳遞的安全性
  • 具備與數據格式無關的消息結構,支持JSON、Protocol Buffers、MsgPacek等消息格式
  • 很是易於部署(幾乎沒有依賴)和配置(全部參數均可以經過命令行進行配置)
  • 使用了簡單的TCP協議且具備多種語言的客戶端功能庫
  • 具備用於信息統計、管理員操做和實現生產者等的HTTP接口
  • 爲實時檢測集成了統計數據收集器StatsD
  • 具備強大的集羣管理界面,參見nsqadmin

爲了達到高效的分佈式消息服務,NSQ實現了合理、智能的權衡,從而使得其可以徹底適用於生產環境中,具體內容以下:github

  • 支持消息內存隊列的大小設置,默認徹底持久化(值爲0),消息便可持久到磁盤也能夠保存在內存中
  • 保證消息至少傳遞一次,以確保消息能夠最終成功發送
  • 收到的消息是無序的, 實現了鬆散訂購
  • 發現服務nsqlookupd具備最終一致性,消息最終可以找到全部Topic生產者

官方文檔:https://nsq.io/overview/quick_start.htmlsql

1、Windows 環境靜態方式安裝docker

一、下載安裝 nsqshell

下載最新版本:https://github.com/nsqio/nsq/releases/download/v1.0.0-compat/nsq-1.0.0-compat.windows-amd64.go1.8.tar.gzubuntu

直接解壓(重命名)放在本身的磁盤位置(D盤)windows

二、配置環境變量path

三、新建CMD命令行

運行命令:nsqlookupd

四、再新建一個CMD命令行

運行命令:nsqd --lookupd-tcp-address=127.0.0.1:4160

五、再新建一個CMD命令行

運行命令:nsqadmin --lookupd-http-address=127.0.0.1:4161

六、再新建一個CMD命令行(windows上用git bash)發佈一個帶有初始化信息的topic,

運行命令:curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'

七、再新建一個CMD命令行,開始 nsq_to_file(消費者開始消費這些消息)

運行命令:nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161

注意以上信息保存在你本地C盤下

九、發送更多的信息

查看磁盤消費信息內容

十、打開admin後臺,能夠查看更多信息

地址欄運行:http://127.0.0.1:4171/

使用go編寫一個客戶端

一、安裝官網客戶端連接,這裏以go 庫爲案例

(1)go-nsq擴展庫地址:https://github.com/nsqio/go-nsq

(2)使用go get方式安裝:go get -u github.com/nsqio/go-nsq

二、開始編寫代碼

(1)訂閱:go_client_sub.go

package main

import (
 "fmt"
 "sync"
 "github.com/nsqio/go-nsq"
)
type NSQHandler struct {
}

func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
    fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
    return nil
}

func testNSQ() {
    waiter := sync.WaitGroup{}
    waiter.Add(1)

    go func() {
        defer waiter.Done()
        config:=nsq.NewConfig()
        config.MaxInFlight=9

    //創建多個鏈接
    for i := 0; i<10; i++ {
        consumer, err := nsq.NewConsumer("test", "nsq_to_Tinywan", config)
        if nil != err {
            fmt.Println("err", err)
            return
        }

        consumer.AddHandler(&NSQHandler{})
        err = consumer.ConnectToNSQD("127.0.0.1:4150")
        if nil != err {
            fmt.Println("err", err)
            return
        }
    }
        select{}

    }()

    waiter.Wait()
}
func main() {
        testNSQ();
}

(2)發佈:go_client_pub.go

package main

import (
        "github.com/nsqio/go-nsq"
       )

var producer *nsq.Producer

func main() {
    nsqd := "127.0.0.1:4150"
    producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
    producer.Publish("test", []byte("Hello Tinywan 0002"))
    if err != nil {
        panic(err)
    }
}

(3)測試結果

三、查看nsqadmin 後臺監控

代碼訂閱修改頻道(channel)信息:consumer, err := nsq.NewConsumer("test", "nsq_to_Tinywan", config)

 固然咱們發送的消息同時發送給文件了,打開文件內容以下

打開的全部測試窗口以下所示

 2、Linux 下使用docker 搭建環境

一、官方docker文檔:https://nsq.io/deployment/docker.html

二、請提早在你的虛擬主機安裝好 docker 服務

三、使用 docker pull ,從鏡像倉庫中拉取或者更新指定鏡像

$ docker pull nsqio/nsq
Using default tag: latest
latest: Pulling from nsqio/nsq
709515475419: Pull complete 
efd1c5a69d15: Pull complete 
fa61d00bb52d: Pull complete 
Digest: sha256:fad1937a88fec5b66fb9f4837b72ad3b70012692826aed5c6435f93c5a23b690
Status: Downloaded newer image for nsqio/nsq:latest

四、查看鏡像

~$ docker images
REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE
ubuntu              17.10               14107f6d2c97        3 weeks ago         99.1MB
nginx               latest              b175e7467d66        3 weeks ago         109MB
hello-world         latest              f2a91732366c        5 months ago        1.85kB
nsqio/nsq latest 2714222e1b39 13 months ago 55.8MB 

五、docker 運行 nsqlookupd

$ docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
[nsqlookupd] 2018/05/06 05:37:27.120708 nsqlookupd v1.0.0-compat (built w/go1.8)
[nsqlookupd] 2018/05/06 05:37:27.121254 TCP: listening on [::]:4160
[nsqlookupd] 2018/05/06 05:37:27.121291 HTTP: listening on [::]:4161

PS:若是官方的是lookupd 會致使 nsqadmin 鏈接失敗,修改爲了:nsqlookupd 

六、docker 運行 nsqd

$ docker run --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=139.224.239.21 --lookupd-tcp-address=139.224.239.21:4160
[nsqd] 2018/05/06 05:32:10.163609 nsqd v1.0.0-compat (built w/go1.8)
[nsqd] 2018/05/06 05:32:10.163642 ID: 302
[nsqd] 2018/05/06 05:32:10.163677 NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2018/05/06 05:32:10.167041 TCP: listening on [::]:4150
[nsqd] 2018/05/06 05:32:10.167078 HTTP: listening on [::]:4151  

 說明:上面的http和tcp鏈接地址我都是寫的公網IP地址,端口號默認不變

七、docker 運行 nsqadmin

$ docker run --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=139.224.239.21:4161
[nsqadmin] 2018/05/06 05:38:41.011751 nsqadmin v1.0.0-compat (built w/go1.8)
[nsqadmin] 2018/05/06 05:38:41.011901 HTTP: listening on [::]:4171 

 使用 docker ps 命令列出已經運行中的容器

$ docker ps
CONTAINER ID        IMAGE               COMMAND                  CREATED               PORTS                                                 NAMES
6534ccee1e6a        nsqio/nsq           "/nsqlookupd"            2 seconds ago         4150-4151/tcp, 4170-4171/tcp, 0.0.0.0:4160-4161->4160-4161/tcp   lookupd
1e642cf63071        nsqio/nsq           "/nsqadmin --lookupd…"   About a minute ago    4150-4151/tcp, 4160-4161/tcp, 4170/tcp, 0.0.0.0:4171->4171/tcp   nsqadmin
9d5e0678077a        nsqio/nsq           "/nsqd --broadcast-a…"   About a minute ago    4160-4161/tcp, 0.0.0.0:4150-4151->4150-4151/tcp, 4170-4171/tcp   nsqd

八、使用Nginx 作一個域名代理,代理nsqadmin

server {
    server_name  nsqadmin.tinywan.com;

    location / {
        proxy_pass http://139.224.239.21:4171;

        #Proxy Settings
        proxy_set_header   Host             $host;
        proxy_set_header   X-Real-IP        $remote_addr;
        proxy_set_header   X-Forwarded-For  $proxy_add_x_forwarded_for;
    }
}

 PS:這一步爲非必須,只是爲了好看而已,若是域名的話,配置個仍是很好的

九、瀏覽器訪問nsqadmin,如下表示配置ok

十、經過不一樣的方式生產一個新消息

(1)在Linux服務端經過curl 生產一個新消息

curl -d 'Hello Linux localhost Msg' 'http://127.0.0.1:4151/pub?topic=test'

(2)在windows本地環境,在git環境中經過curl 生產一個新消息

$ curl -d "hello windows MSG" "http://nsqadmin.tinywan.com:4151/pub?topic=test"
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    19  100     2  100    17     64    548 --:--:-- --:--:-- --:--:--   548OK

(3)經過go語言腳本發送一條信息

package main

import (
        "github.com/nsqio/go-nsq"
       )

var producer *nsq.Producer

func main() {
    nsqd := "139.224.239.21:4150"
    producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
    producer.Publish("test", []byte("Hello Go Client Msg"))
    if err != nil {
        panic(err)
    }
}

十一、開始消費消息

  因爲這裏docker 消費消息須要掛在一個虛擬磁盤,而我沒有,因此換是直接經過上面演示的go代碼進行測試(消費消息)。

(1)經過以上的 go_client_sub.go 腳本去跑這個消費信息,注意修改host 爲當前nsq服務器的IP,而不是本地

 err = consumer.ConnectToNSQD("139.224.239.21:4150")

(2)修改要訂閱的頻道

consumer, err := nsq.NewConsumer("test", "nsq_to_Linux", config)

(3)開始運行腳本

D:\Tinywan\go-socket>go run go_client_sub.go
2018/05/06 14:12:57 INF    1 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF    2 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF    3 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF    4 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF    5 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF    6 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF    7 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF    8 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF    9 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF   10 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
receive 139.224.239.21:4150 message: Hello Linux localhost Msg
receive 139.224.239.21:4150 message: Hello windows MSG
receive 139.224.239.21:4150 message: Hello Go Client Msg 

 以上表示消費的不一樣的消息

十二、在看看監控頁面

  PS:以上能夠很清楚的看到接收到3條信息了 

1三、使用docker 須要注意

(1)docker 沒有在後臺進程跑的時候(沒有加參數 -d),也就是測試的時候,如:docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd

(2)若是使用 ctrl + c 退出後,繼續啓動提示錯誤:

docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
docker: Error response from daemon: Conflict. The container name "/lookupd" is already in use by container "35bb6a3e51d367db846d376580d5b8e389d419e48e2f3c330c4e0dd093a218c3". You have to remove (or rename) that container to be able to reuse that name.
See 'docker run --help'.

PS:開啓一個容器而後先中止、刪除後才能夠繼續重啓,請執行如下命令後從新運行命令

(3)中止:docker stop lookupd
(4)移除:docker rm lookupd

3、基於ThinkPHP5.0 的一個demo

<?php
namespace app\index\controller;

use think\Controller;
class NsqController extends Controller
{
    public function index()
    {
        ini_set('memory_limit', '8M');
        $nsqdAddr = [
            "127.0.0.1:4151",
            "127.0.0.1:4150"
        ];

        $nsq = new \Nsq();
        $isTrue = $nsq->connectNsqd($nsqdAddr);

        for ($i = 0; $i < 6; $i++) {
            $nsq->publish("test", "Hi Tinywan");
        }
        $nsq->closeNsqdConnection();

        halt($isTrue);
        // Deferred publish
        //function : deferredPublish(string topic,string message, int millisecond);
        //millisecond default : [0 < millisecond < 3600000]

        $deferred = new \Nsq();
        $isTrue = $deferred->connectNsqd($nsqdAddr);
        for ($i = 0; $i < 20; $i++) {
            $deferred->deferredPublish("test", "message daly", 3000);
        }
        $deferred->closeNsqdConnection();
    }

    public function nsqSubMessage()
    {
        $nsq_lookupd = new \NsqLookupd("127.0.0.1:4161"); //the nsqlookupd http addr
        $nsq = new \Nsq();
        $config = array(
            "topic" => "test",
            "channel" => "struggle",
            "rdy" => 2,                //optional , default 1
            "connect_num" => 1,        //optional , default 1
            "retry_delay_time" => 5000,  //optional, default 0 , if run callback failed, after 5000 msec, message will be retried
            "auto_finish" => true, //default true
        );
        $nsq->subscribe($nsq_lookupd, $config, function ($msg, $bev) {
            echo $msg->payload . "\n";
            echo $msg->attempts . "\n";
            echo $msg->messageId . "\n";
            echo $msg->timestamp . "\n";
        });
    }
}

使用命令行模式,在console 顯示信息

php think pay nsq
Connect succeed
nihao
1
09f0f1ae3bef9002
1530525939845171789
Hi Tinywan
1
09f0f1c2262f9000
1530525961228600358
Hi Tinywan
1
09f0f1c2266f9001
1530525961229073539
Hi Tinywan
1
09f0f1ec952f9000
1530526006791156404
Hi Tinywan
1
09f0f1ec956f9001
1530526006792159596
Hi Tinywan
1
09f0f1ec95af9000
1530526006792847311

 4、集羣搭建

一、主服務器IP地址(公網):59.110.213.20

(1)拉取NSQ鏡像

docker pull nsqio/nsq

(2)查看nsq鏡像

docker images 

(3)啓動nsqlookupd服務

docker run -d --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd

(4)開啓nsqadmin管理系統

docker run -d  --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=59.110.213.20:4161

nsqadmin能夠部署在任何一個安裝有nsq服務的機器上,只須要指定惟一的 --lookupd-http-address 服務IP地址  

(5)開啓一個nsqd節點服務

docker run -d --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=59.110.213.20 --lookupd-tcp-address=59.110.213.20:4160
  • -d 表示守護進程
  • --broadcast-address:當前服務器IP地址,
  • --lookupd-tcp-address:指向的lookupd服務器IP地址  

二、從服務器IP地址(公網):47.99.94.49

開啓一個nsqd節點服務

docker run -d --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=47.99.94.49 --lookupd-tcp-address=59.110.213.20:4160
  • -d 表示守護進程
  • --broadcast-address:當前服務器IP地址,
  • --lookupd-tcp-address:指向的lookupd服務器IP地址  

 三、測試主從

打開admin 管理平臺

可看出已經部署的兩個nsqd 節點以及廣播地址

打開任意一個shell終端(本測試爲Windows 系統的bash )

tinyw@DESKTOP-HH02AC6 MINGW64 ~/Desktop
$ curl -d 'Hello Nsq 20-2' 'http://59.110.213.20:4151/pub?topic=Tinywan_20'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    16  100     2  100    14     32    225 --:--:-- --:--:-- --:--:--   258OK

 從

tinyw@DESKTOP-HH02AC6 MINGW64 ~/Desktop
$ curl -d 'Hello Nsq 49-2' 'http://47.99.94.49:4151/pub?topic=Tinywan_49'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    16  100     2  100    14    125    875 --:--:-- --:--:-- --:--:--  1000OK

 admin 測試結果

 注意點:消費消息是根據節點Id去消費,單獨分開的,在20上訂閱的,在49上是無法消費的

相關文章
相關標籤/搜索