前言php
NSQ是一個基於Go語言的分佈式實時消息平臺,它基於MIT開源協議發佈,代碼託管在GitHub,其當前最新版本是0.3.1版。NSQ可用於大規模系統中的實時消息服務,而且天天可以處理數億級別的消息,其設計目標是爲在分佈式環境下運行的去中心化服務提供一個強大的基礎架構。NSQ具備分佈式、去中心化的拓撲結構,該結構具備無單點故障、故障容錯、高可用性以及可以保證消息的可靠傳遞的特徵。NSQ很是容易配置和部署,且具備最大的靈活性,支持衆多消息協議。另外,官方還提供了拆箱即用Go和Python庫。若是讀者興趣構建本身的客戶端的話,還能夠參考官方提供的協議規範。html
NSQ是由四個重要組件構成:nginx
NSQ的主要特色以下:git
爲了達到高效的分佈式消息服務,NSQ實現了合理、智能的權衡,從而使得其可以徹底適用於生產環境中,具體內容以下:github
官方文檔:https://nsq.io/overview/quick_start.htmlsql
1、Windows 環境靜態方式安裝docker
一、下載安裝 nsqshell
下載最新版本:https://github.com/nsqio/nsq/releases/download/v1.0.0-compat/nsq-1.0.0-compat.windows-amd64.go1.8.tar.gzubuntu
直接解壓(重命名)放在本身的磁盤位置(D盤)windows
二、配置環境變量path
三、新建CMD命令行
運行命令:nsqlookupd
四、再新建一個CMD命令行
運行命令:nsqd --lookupd-tcp-address=127.0.0.1:4160
五、再新建一個CMD命令行
運行命令:nsqadmin --lookupd-http-address=127.0.0.1:4161
六、再新建一個CMD命令行(windows上用git bash)發佈一個帶有初始化信息的topic,
運行命令:curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
七、再新建一個CMD命令行,開始 nsq_to_file(消費者開始消費這些消息)
運行命令:nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
注意以上信息保存在你本地C盤下
九、發送更多的信息
查看磁盤消費信息內容
十、打開admin後臺,能夠查看更多信息
地址欄運行:http://127.0.0.1:4171/
使用go編寫一個客戶端
一、安裝官網客戶端連接,這裏以go 庫爲案例
(1)go-nsq擴展庫地址:https://github.com/nsqio/go-nsq
(2)使用go get方式安裝:go get -u github.com/nsqio/go-nsq
二、開始編寫代碼
(1)訂閱:go_client_sub.go
package main
import (
"fmt"
"sync"
"github.com/nsqio/go-nsq"
)
type NSQHandler struct {
}
func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}
func testNSQ() {
waiter := sync.WaitGroup{}
waiter.Add(1)
go func() {
defer waiter.Done()
config:=nsq.NewConfig()
config.MaxInFlight=9
//創建多個鏈接
for i := 0; i<10; i++ {
consumer, err := nsq.NewConsumer("test", "nsq_to_Tinywan", config)
if nil != err {
fmt.Println("err", err)
return
}
consumer.AddHandler(&NSQHandler{})
err = consumer.ConnectToNSQD("127.0.0.1:4150")
if nil != err {
fmt.Println("err", err)
return
}
}
select{}
}()
waiter.Wait()
}
func main() {
testNSQ();
}
(2)發佈:go_client_pub.go
package main
import (
"github.com/nsqio/go-nsq"
)
var producer *nsq.Producer
func main() {
nsqd := "127.0.0.1:4150"
producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
producer.Publish("test", []byte("Hello Tinywan 0002"))
if err != nil {
panic(err)
}
}
(3)測試結果
三、查看nsqadmin 後臺監控
代碼訂閱修改頻道(channel)信息:consumer, err := nsq.NewConsumer("test", "nsq_to_Tinywan", config)
固然咱們發送的消息同時發送給文件了,打開文件內容以下
打開的全部測試窗口以下所示
2、Linux 下使用docker 搭建環境
一、官方docker文檔:https://nsq.io/deployment/docker.html
二、請提早在你的虛擬主機安裝好 docker 服務
三、使用 docker pull ,從鏡像倉庫中拉取或者更新指定鏡像
$ docker pull nsqio/nsq
Using default tag: latest
latest: Pulling from nsqio/nsq
709515475419: Pull complete
efd1c5a69d15: Pull complete
fa61d00bb52d: Pull complete
Digest: sha256:fad1937a88fec5b66fb9f4837b72ad3b70012692826aed5c6435f93c5a23b690
Status: Downloaded newer image for nsqio/nsq:latest
四、查看鏡像
~$ docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
ubuntu 17.10 14107f6d2c97 3 weeks ago 99.1MB
nginx latest b175e7467d66 3 weeks ago 109MB
hello-world latest f2a91732366c 5 months ago 1.85kB
nsqio/nsq latest 2714222e1b39 13 months ago 55.8MB
五、docker 運行 nsqlookupd
$ docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
[nsqlookupd] 2018/05/06 05:37:27.120708 nsqlookupd v1.0.0-compat (built w/go1.8)
[nsqlookupd] 2018/05/06 05:37:27.121254 TCP: listening on [::]:4160
[nsqlookupd] 2018/05/06 05:37:27.121291 HTTP: listening on [::]:4161
PS:若是官方的是lookupd 會致使 nsqadmin 鏈接失敗,修改爲了:nsq
lookupd
六、docker 運行 nsqd
$ docker run --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=139.224.239.21 --lookupd-tcp-address=139.224.239.21:4160
[nsqd] 2018/05/06 05:32:10.163609 nsqd v1.0.0-compat (built w/go1.8)
[nsqd] 2018/05/06 05:32:10.163642 ID: 302
[nsqd] 2018/05/06 05:32:10.163677 NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2018/05/06 05:32:10.167041 TCP: listening on [::]:4150
[nsqd] 2018/05/06 05:32:10.167078 HTTP: listening on [::]:4151
說明:上面的http和tcp鏈接地址我都是寫的公網IP地址,端口號默認不變
七、docker 運行 nsqadmin
$ docker run --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=139.224.239.21:4161
[nsqadmin] 2018/05/06 05:38:41.011751 nsqadmin v1.0.0-compat (built w/go1.8)
[nsqadmin] 2018/05/06 05:38:41.011901 HTTP: listening on [::]:4171
使用 docker ps 命令列出已經運行中的容器
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED PORTS NAMES
6534ccee1e6a nsqio/nsq "/nsqlookupd" 2 seconds ago 4150-4151/tcp, 4170-4171/tcp, 0.0.0.0:4160-4161->4160-4161/tcp lookupd
1e642cf63071 nsqio/nsq "/nsqadmin --lookupd…" About a minute ago 4150-4151/tcp, 4160-4161/tcp, 4170/tcp, 0.0.0.0:4171->4171/tcp nsqadmin
9d5e0678077a nsqio/nsq "/nsqd --broadcast-a…" About a minute ago 4160-4161/tcp, 0.0.0.0:4150-4151->4150-4151/tcp, 4170-4171/tcp nsqd
八、使用Nginx 作一個域名代理,代理nsqadmin
server {
server_name nsqadmin.tinywan.com;
location / {
proxy_pass http://139.224.239.21:4171;
#Proxy Settings
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}
PS:這一步爲非必須,只是爲了好看而已,若是域名的話,配置個仍是很好的
九、瀏覽器訪問nsqadmin,如下表示配置ok
十、經過不一樣的方式生產一個新消息
(1)在Linux服務端經過curl 生產一個新消息
curl -d 'Hello Linux localhost Msg' 'http://127.0.0.1:4151/pub?topic=test'
(2)在windows本地環境,在git環境中經過curl 生產一個新消息
$ curl -d "hello windows MSG" "http://nsqadmin.tinywan.com:4151/pub?topic=test"
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 19 100 2 100 17 64 548 --:--:-- --:--:-- --:--:-- 548OK
(3)經過go語言腳本發送一條信息
package main
import (
"github.com/nsqio/go-nsq"
)
var producer *nsq.Producer
func main() {
nsqd := "139.224.239.21:4150"
producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
producer.Publish("test", []byte("Hello Go Client Msg"))
if err != nil {
panic(err)
}
}
十一、開始消費消息
因爲這裏docker 消費消息須要掛在一個虛擬磁盤,而我沒有,因此換是直接經過上面演示的go代碼進行測試(消費消息)。
(1)經過以上的 go_client_sub.go 腳本去跑這個消費信息,注意修改host 爲當前nsq服務器的IP,而不是本地
err = consumer.ConnectToNSQD("139.224.239.21:4150")
(2)修改要訂閱的頻道
consumer, err := nsq.NewConsumer("test", "nsq_to_Linux", config)
(3)開始運行腳本
D:\Tinywan\go-socket>go run go_client_sub.go
2018/05/06 14:12:57 INF 1 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 2 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 3 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 4 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 5 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 6 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 7 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 8 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 9 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 10 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
receive 139.224.239.21:4150 message: Hello Linux localhost Msg
receive 139.224.239.21:4150 message: Hello windows MSG
receive 139.224.239.21:4150 message: Hello Go Client Msg
以上表示消費的不一樣的消息
十二、在看看監控頁面
PS:以上能夠很清楚的看到接收到3條信息了
1三、使用docker 須要注意
(1)docker 沒有在後臺進程跑的時候(沒有加參數 -d),也就是測試的時候,如:docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
(2)若是使用 ctrl + c 退出後,繼續啓動提示錯誤:
docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
docker: Error response from daemon: Conflict. The container name "/lookupd" is already in use by container "35bb6a3e51d367db846d376580d5b8e389d419e48e2f3c330c4e0dd093a218c3". You have to remove (or rename) that container to be able to reuse that name.
See 'docker run --help'.
PS:開啓一個容器而後先中止、刪除後才能夠繼續重啓,請執行如下命令後從新運行命令
(3)中止:docker stop lookupd
(4)移除:docker rm lookupd
<?php
namespace app\index\controller;
use think\Controller;
class NsqController extends Controller
{
public function index()
{
ini_set('memory_limit', '8M');
$nsqdAddr = [
"127.0.0.1:4151",
"127.0.0.1:4150"
];
$nsq = new \Nsq();
$isTrue = $nsq->connectNsqd($nsqdAddr);
for ($i = 0; $i < 6; $i++) {
$nsq->publish("test", "Hi Tinywan");
}
$nsq->closeNsqdConnection();
halt($isTrue);
// Deferred publish
//function : deferredPublish(string topic,string message, int millisecond);
//millisecond default : [0 < millisecond < 3600000]
$deferred = new \Nsq();
$isTrue = $deferred->connectNsqd($nsqdAddr);
for ($i = 0; $i < 20; $i++) {
$deferred->deferredPublish("test", "message daly", 3000);
}
$deferred->closeNsqdConnection();
}
public function nsqSubMessage()
{
$nsq_lookupd = new \NsqLookupd("127.0.0.1:4161"); //the nsqlookupd http addr
$nsq = new \Nsq();
$config = array(
"topic" => "test",
"channel" => "struggle",
"rdy" => 2, //optional , default 1
"connect_num" => 1, //optional , default 1
"retry_delay_time" => 5000, //optional, default 0 , if run callback failed, after 5000 msec, message will be retried
"auto_finish" => true, //default true
);
$nsq->subscribe($nsq_lookupd, $config, function ($msg, $bev) {
echo $msg->payload . "\n";
echo $msg->attempts . "\n";
echo $msg->messageId . "\n";
echo $msg->timestamp . "\n";
});
}
}
使用命令行模式,在console 顯示信息
php think pay nsq
Connect succeed
nihao
1
09f0f1ae3bef9002
1530525939845171789
Hi Tinywan
1
09f0f1c2262f9000
1530525961228600358
Hi Tinywan
1
09f0f1c2266f9001
1530525961229073539
Hi Tinywan
1
09f0f1ec952f9000
1530526006791156404
Hi Tinywan
1
09f0f1ec956f9001
1530526006792159596
Hi Tinywan
1
09f0f1ec95af9000
1530526006792847311
4、集羣搭建
一、主服務器IP地址(公網):59.110.213.20
(1)拉取NSQ鏡像
docker pull nsqio/nsq
(2)查看nsq鏡像
docker images
(3)啓動nsqlookupd服務
docker run -d --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
(4)開啓nsqadmin管理系統
docker run -d --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=59.110.213.20:4161
nsqadmin能夠部署在任何一個安裝有nsq服務的機器上,只須要指定惟一的 --lookupd-http-address 服務IP地址
(5)開啓一個nsqd節點服務
docker run -d --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=59.110.213.20 --lookupd-tcp-address=59.110.213.20:4160
二、從服務器IP地址(公網):47.99.94.49
開啓一個nsqd節點服務
docker run -d --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=47.99.94.49 --lookupd-tcp-address=59.110.213.20:4160
三、測試主從
打開admin 管理平臺
可看出已經部署的兩個nsqd 節點以及廣播地址
打開任意一個shell終端(本測試爲Windows 系統的bash )
主
tinyw@DESKTOP-HH02AC6 MINGW64 ~/Desktop
$ curl -d 'Hello Nsq 20-2' 'http://59.110.213.20:4151/pub?topic=Tinywan_20'
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 16 100 2 100 14 32 225 --:--:-- --:--:-- --:--:-- 258OK
從
tinyw@DESKTOP-HH02AC6 MINGW64 ~/Desktop
$ curl -d 'Hello Nsq 49-2' 'http://47.99.94.49:4151/pub?topic=Tinywan_49'
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 16 100 2 100 14 125 875 --:--:-- --:--:-- --:--:-- 1000OK
admin 測試結果
注意點:消費消息是根據節點Id去消費,單獨分開的,在20上訂閱的,在49上是無法消費的