前言:若是你對rabbitmq基本概念都不懂,能夠移步此篇博文查閱消息隊列RabbitMQhtml
1、單發單收git
2、工做隊列Work Queuegithub
4、路由Routing服務器
在下圖中,「 P」是咱們的生產者,「 C」是咱們的消費者。中間的框是一個隊列-RabbitMQ表明使用者保留的消息緩衝區。post
單發單收模式下:一發一收ui
發送端只須要建立隊列,而後向隊列發送消息。url
接收端也須要建立隊列,由於若是接收端先啓動,沒有此隊列就會報錯,雖然發送端和接收端都建立此隊列,但rabbitmq仍是很智能的,它只會建立一次。
須要注意的地方:
1.發送端和接收端都須要建立同名隊列
2.接收端指定從這個同名隊列中接收消息
發送端
package main import ( "RabbitMQ" "time" ) func main(){ //第一個參數指定rabbitmq服務器的連接,第二個參數指定建立隊列的名字 send_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello") for{ time.Sleep(1) send_mq.Send("Hello World!") } }
接收端
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ //第一個參數指定rabbitmq服務器的連接,第二個參數指定建立隊列的名字 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello") for{ //接收消息時,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() } }
工做隊列和單發單收模式比起來,接收端能夠有多個,接收端多了之後就會出現數據分配問題,發過來的數據到底該被哪一個接收端接收,因此有兩種模式:
公平分發:每一個接收端接收消息的機率是相等的,發送端會循環依次給每一個接收端發送消息,圖一是公平分發。
公平派遣:保證接收端在處理完某個任務,併發送確認信息後,RabbitMQ纔會向它推送新的消息,在此之間如果有新的消息話,將會被推送到其它接收端,若全部的接收端都在處理任務,那麼就會等待,圖二爲公平派遣。
圖一:
圖二:
發送端
package main import ( "RabbitMQ" "strconv" "strings" "time" ) func main(){ //第一個參數指定rabbitmq服務器的連接,第二個參數指定建立隊列的名字 send_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello") i := 0 for{ time.Sleep(1) greetings := []string{"Helloworld!",strconv.Itoa(i)} send_mq.Send(strings.Join( greetings, " ")) i = i+1 } }
接收端1
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ //第一個參數指定rabbitmq服務器的連接,第二個參數指定建立隊列的名字 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello") for{ //接收消息時,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie 1 Received a message: %s", d.Body) } }() } }
接收端2
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ //第一個參數指定rabbitmq服務器的連接,第二個參數指定建立隊列的名字 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello") for{ //接收消息時,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie 1 Received a message: %s", d.Body) } }() } }
咱們能夠將預取計數設置爲1。這告訴RabbitMQ一次不要給工人一個以上的消息。換句話說,在處理並確認上一條消息以前,不要將新消息發送給工做人員。而是將其分派給不忙的下一個工做程序。
//配置隊列參數 func (q *RabbitMQ)Qos(){ e := q.channel.Qos(1,0,false) failOnError(e,"沒法設置QoS") }
接收端
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ //第一個參數指定rabbitmq服務器的連接,第二個參數指定建立隊列的名字 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello") //配置公平派遣 receive_mq.Qos() for{ //接收消息時,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie 2 Received a message: %s", d.Body) } }() } }
官方在這裏介紹了出現如下兩種問題的解決辦法:
1.當接收者掛掉的時候,咱們將丟失發送給接收端尚未處理的消息。
2.當rabbitmq服務器掛了,咱們怎麼保證咱們的消息不丟失。
具體參考:https://www.rabbitmq.com/tutorials/tutorial-two-go.html
發佈訂閱模式下多了一個概念:exchange,如何理解這個exchange,exchange的做用就是相似路由器,發送端發送消息須要帶有routing key 就是路由鍵,服務器會根據路由鍵將消息從交換器路由到隊列上去,因此發送端和接收端之間有了中介。
exchange有多個種類:direct,fanout,topic,header(非路由鍵匹配,功能和direct相似,不多用)。
首先介紹exchange下的fanout exchange,它會將發到這個exchange的消息廣播到關注此exchange的全部接收端上。
廣播模式下(1:N):
發送端鏈接到rabbitmq後,建立exchange,須要指定交換機的名字和類型,fanout爲廣播,而後向此exchange發送消息,其它就不用管了。
接收端的執行流程在程序備註中。
注意:廣播模式下的exchange是發送端是不須要帶路由鍵的哦。
package main import ( "RabbitMQ" "strconv" "strings" "time" ) func main(){ ch := rabbitMQ.Connect("amqp://user:password@ip:port/") rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange1","fanout") i := 0 for{ time.Sleep(1) greetings := []string{"Helloworld!",strconv.Itoa(i)} ch.Publish("exchange1",strings.Join( greetings, " "),"") i = i+1 } }
接收端1
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ // 1.接收者,首先建立本身隊列 // 2.建立交換機 // 3.將本身綁定到交換機上 // 4.接收交換機上發過來的消息 //第一個參數指定rabbitmq服務器的連接,第二個參數指定建立隊列的名字 //1 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello1") //2 //第一個參數:rabbitmq服務器的連接,第二個參數:交換機名字,第三個參數:交換機類型 rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange1","fanout") //3 // 隊列綁定到exchange receive_mq.Bind("exchange1","") //4 for{ //接收消息時,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie1 Received a message: %s", d.Body) } }() } }
接收端2
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ // 1.接收者,首先建立本身隊列 // 2.建立交換機 // 3.將本身綁定到交換機上 // 4.接收交換機上發過來的消息 //第一個參數指定rabbitmq服務器的連接,第二個參數指定建立隊列的名字 //1 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2") //2 //第一個參數:rabbitmq服務器的連接,第二個參數:交換機名字,第三個參數:交換機類型 rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange1","fanout") //3 // 隊列綁定到exchange receive_mq.Bind("exchange1","") //4 for{ //接收消息時,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie2 Received a message: %s", d.Body) } }() } }
路由模式其實就是全值匹配模式(direct),發送端發送消息須要帶有路由鍵,就是下面發送端程序的routing key1,是一個字符串,發送端發給exchange,路由模式下的exchange會匹配這個路由鍵,以下面這個圖,發送者發送時帶有orange此路由鍵時,這條消息只會被轉發給Q1隊列,若是路由鍵沒有匹配上的怎麼辦?,全值匹配,沒有匹配到,那麼全部接收者都接收不到消息,消息只會發送給匹配的隊列,接收端的路由鍵是綁定exchange的時候用的。
注意:接收隊列能夠綁定多個路由鍵到exchange上,好比下面,當發送路由鍵爲black,green,會被Q2接收。
發送端
package main import ( "RabbitMQ" "strconv" "strings" "time" ) func main(){ ch := rabbitMQ.Connect("amqp://user:password@ip:port/") rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","direct") i := 0 for{ time.Sleep(1) greetings := []string{"Helloworld!",strconv.Itoa(i)} if i%2 ==1 { //若是是奇數 ch.Publish("exchange",strings.Join( greetings, " "),"routing key1") } else{ ch.Publish("exchange",strings.Join( greetings, " "),"routing key2") } i = i+1 } }
接收端1
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ // 1.接收者,首先本身隊列 // 2.建立交換機 // 3.將本身綁定到交換機上 // 4.接收交換機上發過來的消息 //第一個參數指定rabbitmq服務器的連接,第二個參數指定建立隊列的名字 //1 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2") //2 //第一個參數:rabbitmq服務器的連接,第二個參數:交換機名字,第三個參數:交換機類型 rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","direct") //3 receive_mq.Bind("exchange","routing key1") //4 for{ //接收消息時,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie1 Received a message: %s", d.Body) } }() } }
接收端2
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ // 1.接收者,首先本身隊列 // 2.建立交換機 // 3.將本身綁定到交換機上 // 4.接收交換機上發過來的消息 //第一個參數指定rabbitmq服務器的連接,第二個參數指定建立隊列的名字 //1 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2") //2 //第一個參數:rabbitmq服務器的連接,第二個參數:交換機名字,第三個參數:交換機類型 rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","direct") //3 receive_mq.Bind("exchange","routing key2") //4 for{ //接收消息時,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie2 Received a message: %s", d.Body) } }() } }
前面的direct是全值匹配,那麼topic就能夠部分匹配,又能夠全值匹配,比direct更加靈活。
消息發送到topic類型的exchange上時不能隨意指定routing_key(必定是指由一系列由點號鏈接單詞的字符串,單詞能夠是任意的,但通常都會與消息或多或少的有些關聯)。Routing key的長度不能超過255個字節。
Binding key也必定要是一樣的方式。Topic類型的exchange就像一個直接的交換:一個由生產者指定了肯定routing key的消息將會被推送給全部Binding key能與之匹配的消費者。然而這種綁定有兩種特殊的狀況:
下邊來舉個例子:
在這個例子中,咱們將會發送一些描述動物的消息。Routing key的第一個單詞是描述速度的,第二個單詞是描述顏色的,第三個是描述物種的:「<speed>.<colour>.<species>」。
這裏咱們建立三個Binding:Binding key爲」*.orange.*」的Q1,和binding key爲」*.*.rabbit」和」lazy.#」的Q2。
這些binding能夠總結爲:
一條以」 quick.orange.rabbit」爲routing key的消息將會推送到Q1和Q2兩個queue上,routing key爲「lazy.orange.elephant」的消息一樣會被推送到Q1和Q2上。但若是routing key爲」quick.orange.fox」的話,消息只會被推送到Q1上;routing key爲」lazy.brown.fox」的消息會被推送到Q2上,routing key爲"lazy.pink.rabbit」的消息也會被推送到Q2上,但同一條消息只會被推送到Q2上一次。
若是在發送消息時所指定的exchange和routing key在消費者端沒有對應的exchange和binding key與之綁定的話,那麼這條消息將會被丟棄掉。例如:"orange"和"quick.orange.male.rabbit"。可是routing爲」lazy.orange.male.rabbit」的消息,將會被推到Q2上。
Topic類型的exchange:
Topic類型的exchange是很強大的,也能夠實現其它類型的exchange。
發送端
package main import ( "RabbitMQ" "time" ) func main(){ ch := rabbitMQ.Connect("amqp://user:password@ip/") rabbitMQ.NewExchange("amqp://user:password@ip/","exchange","topic") for{ time.Sleep(1) ch.Publish("exchange","hello world","lazy.brown.fox") } }
接收端
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ // 1.接收者,首先本身隊列 // 2.建立交換機 // 3.將本身綁定到交換機上 // 4.接收交換機上發過來的消息 //第一個參數指定rabbitmq服務器的連接,第二個參數指定建立隊列的名字 //1 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello1") //2 //第一個參數:rabbitmq服務器的連接,第二個參數:交換機名字,第三個參數:交換機類型 rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","topic") //3 receive_mq.Bind("exchange","*.orange.*") //4 for{ //接收消息時,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie1 Received a message: %s", d.Body) } }() } }
接收端2
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ // 1.接收者,首先本身隊列 // 2.建立交換機 // 3.將本身綁定到交換機上 // 4.接收交換機上發過來的消息 //第一個參數指定rabbitmq服務器的連接,第二個參數指定建立隊列的名字 //1 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2") //2 //第一個參數:rabbitmq服務器的連接,第二個參數:交換機名字,第三個參數:交換機類型 rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","topic") //3 receive_mq.Bind("exchange","*.*.rabbit") receive_mq.Bind("exchange","lazy.#") //4 for{ //接收消息時,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie2 Received a message: %s", d.Body) } }() } }
目錄參考:
準備工做:
1.咱們再建立go項目時,首先指定gopath目錄,而後在目錄下建立bin、src、pkg目錄。
2.下載github.com/streadway/amqp包,會自動添加到項目的pkg目錄下。
go get github.com/streadway/amqp
3.在rabbitmq服務器上建立用戶,指定管理員,並賦予訪問權限。
4.rabbitmq封裝
package rabbitMQ import ( "encoding/json" "github.com/streadway/amqp" "log" ) //聲明隊列類型 type RabbitMQ struct { channel *amqp.Channel Name string exchange string } //鏈接服務器 func Connect(s string) * RabbitMQ{ //鏈接rabbitmq conn,e := amqp.Dial(s) failOnError(e,"鏈接Rabbitmq服務器失敗!") ch ,e :=conn.Channel() failOnError(e,"沒法打開頻道!") mq := new(RabbitMQ) mq.channel =ch return mq } //初始化單個消息隊列 //第一個參數:rabbitmq服務器的連接,第二個參數:隊列名字 func New(s string,name string) * RabbitMQ{ //鏈接rabbitmq conn,e := amqp.Dial(s) failOnError(e,"鏈接Rabbitmq服務器失敗!") ch ,e :=conn.Channel() failOnError(e,"沒法打開頻道!") q,e := ch.QueueDeclare( name,//隊列名 false,//是否開啓持久化 true,//不使用時刪除 false, //排他 false, //不等待 nil, //參數 ) failOnError(e,"初始化隊列失敗!") mq := new(RabbitMQ) mq.channel =ch mq.Name =q.Name return mq } //批量初始化消息隊列 //第一個參數:rabbitmq服務器的連接,第二個參數:隊列名字列表 //配置隊列參數 func (q *RabbitMQ)Qos(){ e := q.channel.Qos(1,0,false) failOnError(e,"沒法設置QoS") } //配置交換機參數 //初始化交換機 //第一個參數:rabbitmq服務器的連接,第二個參數:交換機名字,第三個參數:交換機類型 func NewExchange(s string,name string,typename string){ //鏈接rabbitmq conn,e := amqp.Dial(s) failOnError(e,"鏈接Rabbitmq服務器失敗!") ch ,e :=conn.Channel() failOnError(e,"沒法打開頻道!") e = ch.ExchangeDeclare( name, // name typename, // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(e,"初始化交換機失敗!") } //刪除交換機 func (q *RabbitMQ)ExchangeDelete(exchange string){ e := q.channel.ExchangeDelete(exchange,false,true) failOnError(e,"綁定隊列失敗!") } //綁定消息隊列到哪一個exchange func (q *RabbitMQ)Bind(exchange string,key string){ e := q.channel.QueueBind( q.Name, key, exchange, false, nil, ) failOnError(e,"綁定隊列失敗!") q.exchange = exchange } //向消息隊列發送消息 //Send方法能夠往某個消息隊列發送消息 func (q *RabbitMQ) Send(body interface{}){ str,e := json.Marshal(body) failOnError(e,"消息序列化失敗!") e = q.channel.Publish( "",//交換 q.Name,//路由鍵:當前隊列的名字 false, //必填 false, //當即 amqp.Publishing{ ReplyTo:q.Name, Body:[]byte(str), }) msg := "向隊列:"+q.Name+"發送消息失敗!" failOnError(e,msg) } //向exchange發送消息 //Publish方法能夠往某個exchange發送消息 func (q *RabbitMQ) Publish(exchange string,body interface{},key string) { str,e := json.Marshal(body) failOnError(e,"消息序列化失敗!") e = q.channel.Publish( exchange, key, false, false, amqp.Publishing{ReplyTo:q.Name, Body:[]byte(str)}, ) failOnError(e,"向路由發送消息失敗!") } //接收某個消息隊列的消息 func (q * RabbitMQ) Consume() <-chan amqp.Delivery{ c,e :=q.channel.Consume( q.Name,//指定從哪一個隊列中接收消息 "", true, false, false, false, nil, ) failOnError(e,"接收消息失敗!") return c } //關閉隊列鏈接 func (q *RabbitMQ) Close() { q.channel.Close() } //錯誤處理函數 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }