RabbitMQ服務器安裝
一、安裝erlangmysql
wget https://www.rabbitmq.com/releases/erlang/erlang-18.2-1.el6.x86_64.rpm
二、安裝RabbitMQgit
wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el6.noarch.rpm
三、 經常使用命令github
systemctl start rabbitmq-server 啓動 rabbitmq stop 中止 rabbitmq-plugins list 插件命令 rabbitmq-plugins enable rabbitmq_management 安裝管理插件 rabbitmq-plugins disable rabbitmq_management 卸載管理插件
四、 瀏覽器打開sql
端口號默認:15672<br/>
密碼和用戶名默認:guest
http://127.0.0.1:15672/#/
五、常見錯誤
錯誤提示:zsh: command not found: rabbitmq-plugins<br/>
解決辦法:
第一種:export PATH=/usr/local/Cellar/rabbitmq/3.8.2/sbin/:$PATH<br/>
第二種:1: vim .bash_profile(前提是存在該文件,若是不存在,能夠先建立mkdir .bash_profile,以後再執行vi編輯)<br/>
2:export PATH=/usr/local/Cellar/rabbitmq/3.8.2/sbin/sbin/:$PATH數據庫
最後:source ~/.bash_profilevim
## RabbitMQ核心概念
### Virtual Hosts管理
像mysql擁有數據庫的概念而且能夠指定用戶對庫和表等操做的權限。那RabbitMQ呢?RabbitMQ也有相似的權限管理。在RabbitMQ中能夠虛擬消息服務器VirtualHost,每一個VirtualHost至關於一個相對獨立的RabbitMQ服務器,每一個VirtualHost之間是相互隔離的。exchange、queue、message不能互通。 至關於mysql的db。Virtual Name通常以/開頭<br/>
一、建立Virtual Hosts:
Admin->Virtual Hosts->Add a new virtual host<br/>
二、建立用戶:Admin->Users->Add a user<br/>
二、對用戶進行受權,點擊須要受權的vhosts->Permissions->Set permission瀏覽器
## RabbitMQ五種模式
url格式:amqp:// 帳號 密碼@地址:端口號/vhost <br/>
一、Simple模式 最簡單最經常使用的模式,一個消息只能被一個消費者消費<br/>bash
二、Work模式,一個消息只能被一個消費者消費服務器
三、Publish/Subscribe訂閱模式,消息被路由投遞給多個隊列,一個消息被多個消費者獲取,生產端不容許指定消費函數
四、Routing路由模式,一個消息被多個消費者獲取,而且消息的目標隊列能夠被生產者指定
五、Topic話題模式,一個消息被多個消息獲取,消息的目標queue可用BindKey以通配符,(#:一個或多個詞,*:一個詞)的方式指定。
示例代碼
package RabbitMQ import ( "fmt" "github.com/streadway/amqp" "log" ) //amqp:// 帳號 密碼@地址:端口號/vhost const MQURL = "amqp://imoocuser:imoocuser@127.0.0.1:5672/imooc" type RabbitMQ struct { //鏈接 conn *amqp.Connection //管道 channel *amqp.Channel //隊列名稱 QueueName string //交換機 Exchange string //key Simple模式 幾乎用不到 Key string //鏈接信息 Mqurl string } //建立RabbitMQ結構體實例 func NewRabbitMQ(queuename string, exchange string,key string) *RabbitMQ { rabbitmq := &RabbitMQ{QueueName:queuename,Exchange:exchange,Key:key,Mqurl:MQURL} var err error //建立rabbitmq鏈接 rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "建立鏈接錯誤!") rabbitmq.channel,err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err,"獲取channel失敗") return rabbitmq } //斷開channel和connection func (r *RabbitMQ) Destory() { r.channel.Close() r.conn.Close() } //錯誤處理函數 func (r *RabbitMQ) failOnErr (err error,message string) { if err !=nil { log.Fatalf("%s:%s",message,err) panic(fmt.Sprintf("%s:%s",message, err)) } } //簡單模式step:1。建立簡單模式下RabbitMQ實例 func NewRabbitMQSimple(queueName string) * RabbitMQ { return NewRabbitMQ(queueName, "", "") } //訂閱模式建立rabbitmq實例 func NewRabbitMQPubSub(exchangeName string) * RabbitMQ { //建立rabbitmq實例 rabbitmq := NewRabbitMQ("", exchangeName, "") var err error //獲取connection rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "failed to connecct rabbitmq!") //獲取channel rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "failed to open a channel!") return rabbitmq } //訂閱模式生成 func (r *RabbitMQ) PublishPub(message string) { //嘗試建立交換機,不存在建立 err := r.channel.ExchangeDeclare( //交換機名稱 r.Exchange, //交換機類型 廣播類型 "fanout", //是否持久化 true, //是否字段刪除 false, //true表示這個exchange不能夠被client用來推送消息,僅用來進行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務器的響應 false, nil, ) r.failOnErr(err, "failed to declare an excha" + "nge") //2 發送消息 err = r.channel.Publish( r.Exchange, "", false, false, amqp.Publishing{ //類型 ContentType:"text/plain", //消息 Body:[]byte(message), }, ) } //訂閱模式消費端代碼 func (r * RabbitMQ) RecieveSub() { //嘗試建立交換機,不存在建立 err := r.channel.ExchangeDeclare( //交換機名稱 r.Exchange, //交換機類型 廣播類型 "fanout", //是否持久化 true, //是否字段刪除 false, //true表示這個exchange不能夠被client用來推送消息,僅用來進行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務器的響應 false, nil, ) r.failOnErr(err, "failed to declare an excha" + "nge") //2試探性建立隊列,建立隊列 q, err := r.channel.QueueDeclare( "",//隨機生產隊列名稱 false, false, true, false, nil, ) r.failOnErr(err, "Failed to declare a queue") //綁定隊列到exchange中 err = r.channel.QueueBind( q.Name, //在pub/sub模式下,這裏的key要爲空 "", r.Exchange, false, nil, ) //消費消息 message, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) forever := make(chan bool) go func() { for d := range message { log.Printf("Received a message:%s,", d.Body) } }() fmt.Println("退出請按 Ctrl+C") <- forever } //話題模式 建立RabbitMQ實例 func NewRabbitMQTopic(exchagne string, routingKey string) *RabbitMQ { //建立rabbitmq實例 rabbitmq := NewRabbitMQ("", exchagne, routingKey) var err error rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err,"failed to connect rabbingmq!") rabbitmq.channel,err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "failed to open a channel") return rabbitmq } //話題模式發送信息 func (r * RabbitMQ) PublishTopic(message string) { //嘗試建立交換機,不存在建立 err := r.channel.ExchangeDeclare( //交換機名稱 r.Exchange, //交換機類型 話題模式 "topic", //是否持久化 true, //是否字段刪除 false, //true表示這個exchange不能夠被client用來推送消息,僅用來進行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務器的響應 false, nil, ) r.failOnErr(err, "topic failed to declare an excha" + "nge") //2發送信息 err = r.channel.Publish( r.Exchange, //要設置 r.Key, false, false, amqp.Publishing{ //類型 ContentType:"text/plain", //消息 Body:[]byte(message), }, ) } //話題模式接收信息 //要注意key //其中* 用於匹配一個單詞,#用於匹配多個單詞(能夠是零個) //匹配 表示匹配imooc.* 表示匹配imooc.hello,可是imooc.hello.one須要用imooc.#才能匹配到 func (r *RabbitMQ) RecieveTopic() { //嘗試建立交換機,不存在建立 err := r.channel.ExchangeDeclare( //交換機名稱 r.Exchange, //交換機類型 話題模式 "topic", //是否持久化 true, //是否字段刪除 false, //true表示這個exchange不能夠被client用來推送消息,僅用來進行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務器的響應 false, nil, ) r.failOnErr(err, "failed to declare an excha" + "nge") //2試探性建立隊列,建立隊列 q, err := r.channel.QueueDeclare( "",//隨機生產隊列名稱 false, false, true, false, nil, ) r.failOnErr(err, "Failed to declare a queue") //綁定隊列到exchange中 err = r.channel.QueueBind( q.Name, //在pub/sub模式下,這裏的key要爲空 r.Key, r.Exchange, false, nil, ) //消費消息 message, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) forever := make(chan bool) go func() { for d := range message { log.Printf("Received a message:%s,", d.Body) } }() fmt.Println("退出請按 Ctrl+C") <- forever } //路由模式 建立RabbitMQ實例 func NewRabbitMQRouting(exchagne string, routingKey string) *RabbitMQ { //建立rabbitmq實例 rabbitmq := NewRabbitMQ("", exchagne, routingKey) var err error rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err,"failed to connect rabbingmq!") rabbitmq.channel,err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "failed to open a channel") return rabbitmq } //路由模式發送信息 func (r * RabbitMQ) PublishRouting(message string) { //嘗試建立交換機,不存在建立 err := r.channel.ExchangeDeclare( //交換機名稱 r.Exchange, //交換機類型 廣播類型 "direct", //是否持久化 true, //是否字段刪除 false, //true表示這個exchange不能夠被client用來推送消息,僅用來進行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務器的響應 false, nil, ) r.failOnErr(err, "failed to declare an excha" + "nge") //發送信息 err = r.channel.Publish( r.Exchange, //要設置 r.Key, false, false, amqp.Publishing{ //類型 ContentType:"text/plain", //消息 Body:[]byte(message), }, ) } //路由模式接收信息 func (r *RabbitMQ) RecieveRouting() { //嘗試建立交換機,不存在建立 err := r.channel.ExchangeDeclare( //交換機名稱 r.Exchange, //交換機類型 廣播類型 "direct", //是否持久化 true, //是否字段刪除 false, //true表示這個exchange不能夠被client用來推送消息,僅用來進行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務器的響應 false, nil, ) r.failOnErr(err, "failed to declare an excha" + "nge") //2試探性建立隊列,建立隊列 q, err := r.channel.QueueDeclare( "",//隨機生產隊列名稱 false, false, true, false, nil, ) r.failOnErr(err, "Failed to declare a queue") //綁定隊列到exchange中 err = r.channel.QueueBind( q.Name, //在pub/sub模式下,這裏的key要爲空 r.Key, r.Exchange, false, nil, ) //消費消息 message, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) forever := make(chan bool) go func() { for d := range message { log.Printf("Received a message:%s,", d.Body) } }() fmt.Println("退出請按 Ctrl+C") <- forever } //簡單模式Step:二、簡單模式下生產代碼 func (r *RabbitMQ) PublishSimple (message string) { //一、申請隊列,若是隊列存在就跳過,不存在建立 //優勢:保證隊列存在,消息能發送到隊列中 _, err := r.channel.QueueDeclare( //隊列名稱 r.QueueName, //是否持久化 false, //是否爲自動刪除 當最後一個消費者斷開鏈接以後,是否把消息從隊列中刪除 false, //是否具備排他性 true表示本身可見 其餘用戶不能訪問 false, //是否阻塞 true表示要等待服務器的響應 false, //額外數學系 nil, ) if err != nil { fmt.Println(err) } //2.發送消息到隊列中 r.channel.Publish( //默認的Exchange交換機是default,類型是direct直接類型 r.Exchange, //要賦值的隊列名稱 r.QueueName, //若是爲true,根據exchange類型和routkey規則,若是沒法找到符合條件的隊列那麼會把發送的消息返回給發送者 false, //若是爲true,當exchange發送消息到隊列後發現隊列上沒有綁定消費者,則會把消息還給發送者 false, //消息 amqp.Publishing{ //類型 ContentType:"text/plain", //消息 Body:[]byte(message), }) } func (r *RabbitMQ) ConsumeSimple() { //一、申請隊列,若是隊列存在就跳過,不存在建立 //優勢:保證隊列存在,消息能發送到隊列中 _, err := r.channel.QueueDeclare( //隊列名稱 r.QueueName, //是否持久化 false, //是否爲自動刪除 當最後一個消費者斷開鏈接以後,是否把消息從隊列中刪除 false, //是否具備排他性 false, //是否阻塞 false, //額外數學系 nil, ) if err != nil { fmt.Println(err) } //接收消息 msgs, err := r.channel.Consume( r.QueueName, //用來區分多個消費者 "", //是否自動應答 true, //是否具備排他性 false, //若是設置爲true,表示不能同一個connection中發送的消息傳遞給這個connection中的消費者 false, //隊列是否阻塞 false, nil, ) if err!=nil { fmt.Println(err) } forever := make(chan bool) //啓用協程處理 go func() { for d := range msgs { //實現咱們要處理的邏輯函數 log.Printf("Received a message:%s",d.Body) //fmt.Println(d.Body) } }() log.Printf("【*】warting for messages, To exit press CCTRAL+C") <- forever }
測試
//Simple模式 發送者 rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple") rabbitmq.PublishSimple("hello imooc!") //接收者 rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple") rabbitmq.ConsumeSimple() //訂閱模式發送者 rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct") for i :=0; i<=100 ; i++ { rabbitmq.PublishPub("訂閱模式生產第" + strconv.Itoa(i) + "條數據") fmt.Println(i) time.Sleep(1 * time.Second) } //接收者 rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct") rabbitmq.RecieveSub() //路由模式發送者 imoocOne := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_one") imoocTwo := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_two") for i :=0; i<=10; i++ { imoocOne.PublishRouting("hello imooc one!" + strconv.Itoa(i)) imoocTwo.PublishRouting("hello imooc two!" + strconv.Itoa(i)) time.Sleep(1 * time.Second) fmt.Println(i) } //接收者 rabbitmq := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_one") rabbitmq.RecieveRouting() //Topic模式發送者 imoocOne := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three") imoocTwo := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.four") for i :=0; i<=10; i++ { imoocOne.PublishTopic("hello imooc topic three!" + strconv.Itoa(i)) imoocTwo.PublishTopic("hello imooc topic four!" + strconv.Itoa(i)) time.Sleep(1 * time.Second) fmt.Println(i) } //Topic接收者 rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "#") rabbitmq.RecieveTopic()