手冊:http://www.rabbitmq.com/getstarted.htmlcss
安裝:http://www.rabbitmq.com/download.htmlhtml
參考:http://blog.csdn.net/whycold/article/details/41119807java
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。nginx
AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。git
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Go、Python、Ruby。用於在分佈式系統中存儲轉發消息。github
ubuntu直接下載deb文件安裝,默認已經啓動,sudo敲入:正則表達式
sudo rabbitmq-server start sudo lsof -i:5672
啓用插件,進入UI:ubuntu
sudo rabbitmq-plugins enable rabbitmq_management
登陸http://127.0.0.1:15672(默認guest只能localhost訪問,要遠程訪問,須要使用可遠程訪問的管理員帳號)
segmentfault
用戶名:密碼=guest:guest緩存
# 敲入查看幫助 sudo rabbitmqctl # 建立用戶 sudo rabbitmqctl add_user 登陸用戶名 密碼 # 能夠建立管理員用戶,負責整個MQ的運維 sudo rabbitmqctl set_user_tags 登陸用戶名 administrator # 能夠建立RabbitMQ監控用戶,負責整個MQ的監控 sudo rabbitmqctl set_user_tags 登陸用戶名 monitoring # 能夠建立某個項目的專用用戶,只能訪問項目本身的virtual hosts sudo rabbitmqctl set_user_tags 登陸用戶名 management # 查看用戶 sudo rabbitmqctl list_users # 受權 # 該命令使用戶具備/這個virtual host中全部資源的配置、寫、讀權限以便管理其中的資源 # set_permissions [-p <vhostpath>] <user> <conf> <write> <read> # 其中,<conf> <write> <read>的位置分別用正則表達式來匹配特定的資源,如'^(amq\.gen.*|amq\.default)$'能夠匹配server生成的和默認的exchange,'^$'不匹配任何資源 sudo rabbitmqctl set_permissions -p / 登陸用戶名 '.*' '.*' '.*'
ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。
Connection是RabbitMQ的socket連接,它封裝了socket協議相關部分邏輯。
ConnectionFactory爲Connection的製造工廠。
Channel是咱們與RabbitMQ打交道的最重要的一個接口,咱們大部分的業務操做是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發佈消息等。
Queue(隊列)是RabbitMQ的內部對象,用於存儲消息,用下圖表示。
RabbitMQ中的消息都只能存儲在Queue中,生產者(下圖中的P)生產消息並最終投遞到Queue中,消費者(下圖中的C)能夠從Queue中獲取消息並消費。
多個消費者能夠訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每一個消費者都收到全部的消息並處理。
在實際應用中,可能會發生消費者收到Queue中的消息,但沒有處理完成就宕機(或出現其餘意外)的狀況,這種狀況下就可能會致使消息丟失。爲了不這種狀況發生,咱們能夠要求消費者在消費完消息後發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)後纔將該消息從Queue中移除;若是RabbitMQ沒有收到回執並檢測到消費者的RabbitMQ鏈接斷開,則RabbitMQ會將該消息發送給其餘消費者(若是存在多個消費者)進行處理。這裏不存在timeout概念,一個消費者處理消息時間再長也不會致使該消息被髮送給其餘消費者,除非它的RabbitMQ鏈接斷開。
這裏會產生另一個問題,若是咱們的開發人員在處理完業務邏輯後,忘記發送回執給RabbitMQ,這將會致使嚴重的bug——Queue中堆積的消息會愈來愈多;消費者重啓後會重複消費這些消息並重復執行業務邏輯…
另外pub message是沒有ack的。(??)
若是咱們但願即便在RabbitMQ服務重啓的狀況下,也不會丟失消息,咱們能夠將Queue與Message都設置爲可持久化的(durable),這樣能夠保證絕大部分狀況下咱們的RabbitMQ消息不會丟失。但依然解決不了小几率丟失事件的發生(好比RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),若是咱們須要對這種小几率事件也要管理起來,那麼咱們要用到事務。因爲這裏僅爲RabbitMQ的簡單介紹,因此這裏將不講解RabbitMQ相關的事務。
前面咱們講到若是有多個消費者同時訂閱同一個Queue中的消息,Queue中的消息會被平攤給多個消費者。這時若是每一個消息的處理時間不一樣,就有可能會致使某些消費者一直在忙,而另一些消費者很快就處理完手頭工做並一直空閒的狀況。咱們能夠經過設置prefetchCount來限制Queue每次發送給每一個消費者的消息數,好比咱們設置prefetchCount=1,則Queue每次給每一個消費者發送一條消息;消費者處理完這條消息後Queue會再給該消費者發送一條消息。就是變慢而已。訂閱模式如何平攤?這種模式是一個消費者一次性拿不少條消息?
在上一節咱們看到生產者將消息投遞到Queue中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的狀況是,生產者將消息發送到Exchange(交換器,下圖中的X),由Exchange將消息路由到一個或多個Queue中(或者丟棄)。
Exchange是按照什麼邏輯將消息路由到Queue的?這個將在Binding一節介紹。
RabbitMQ中的Exchange有四種類型,不一樣的類型有着不一樣的路由策略,這將在Exchange Types一節介紹。
生產者在將消息發送給Exchange的時候,通常會指定一個routing key,來指定這個消息的路由規則,而這個routing key須要與Exchange Type及binding key聯合使用才能最終生效。
在Exchange Type與binding key固定的狀況下(在正常使用時通常這些內容都是固定配置好的),咱們的生產者就能夠在發送消息給Exchange時,經過指定routing key來決定消息流向哪裏。
RabbitMQ爲routing key設定的長度限制爲255 bytes。
RabbitMQ中經過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。
在綁定(Binding)Exchange與Queue的同時,通常會指定一個binding key;消費者將消息發送給Exchange時,通常會指定一個routing key;當binding key與routing key相匹配時,消息將會被路由到對應的Queue中。這個將在Exchange Types章節會列舉實際的例子加以說明。
在綁定多個Queue到同一個Exchange的時候,這些Binding容許使用相同的binding key。 binding key 並非在全部狀況下都生效,它依賴於Exchange Type,好比fanout類型(廣播)的Exchange就會無視binding key,而是將消息路由到全部綁定到該Exchange的Queue。
RabbitMQ經常使用的Exchange Type有fanout、direct、topic、headers這四種(AMQP規範裏還提到兩種Exchange Type,分別爲system與自定義,這裏不予以描述),下面分別進行介紹。
fanout類型的Exchange路由規則很是簡單,它會把全部發送到該Exchange的消息路由到全部與它綁定的Queue中。
direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key徹底匹配的Queue中。
以上圖的配置爲例,咱們以routingKey=」error」發送消息到Exchange,則消息會路由到Queue1(amqp.gen-S9b…,這是由RabbitMQ自動生成的Queue名稱)和Queue2(amqp.gen-Agl…);若是咱們以routingKey=」info」或routingKey=」warning」來發送消息,則消息只會路由到Queue2。若是咱們以其餘routingKey發送消息,則消息不會路由到這兩個Queue中。
前面講到direct類型的Exchange路由規則是徹底匹配binding key與routing key,但這種嚴格的匹配方式在不少狀況下不能知足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage類似,也是將消息路由到binding key與routing key相匹配的Queue中,但這裏的匹配規則有些不一樣,它約定:
1. routing key爲一個句點號「. 」分隔的字符串(咱們將被句點號「. 」分隔開的每一段獨立的字符串稱爲一個單詞),如「stock.usd.nyse」、「nyse.vmw」、「quick.orange.rabbit」 2. binding key與routing key同樣也是句點號「. 」分隔的字符串 3. binding key中能夠存在兩種特殊字符「*」與「#」,用於作模糊匹配,其中「*」用於匹配一個單詞,「#」用於匹配多個單詞(能夠是零個)
以上圖中的配置爲例,routingKey=」quick.orange.rabbit」的消息會同時路由到Q1與Q2,routingKey=」lazy.orange.fox」的消息會路由到Q1與Q2,routingKey=」lazy.brown.fox」的消息會路由到Q2,routingKey=」lazy.pink.rabbit」的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=」quick.brown.fox」、routingKey=」orange」、routingKey=」quick.orange.male.rabbit」的消息將會被丟棄,由於它們沒有匹配任何bindingKey
headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否徹底匹配Queue與Exchange綁定時指定的鍵值對;若是徹底匹配則消息會路由到該Queue,不然不會路由到該Queue。
該類型的Exchange沒有用到過(不過也應該頗有用武之地),因此不作介紹。
MQ自己是基於異步的消息處理,前面的示例中全部的生產者(P)將消息發送到RabbitMQ後不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條消息都不知道)。
但實際的應用場景中,咱們極可能須要一些同步處理,須要同步等待服務端將個人消息處理完成後再進行下一步處理。這至關於RPC(Remote Procedure Call,遠程過程調用)。在RabbitMQ中也支持RPC。
RabbitMQ中實現RPC的機制是:
http://www.rabbitmq.com/tutorials/tutorial-one-go.html
請看官方示例:
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/send.go
上面的例子仔細看,有必要看源碼!
四種模式
發佈方,一個!
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { // 撥號,下面例子都同樣 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 這個是最重要的 ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 申明一個隊列 // https://godoc.org/github.com/streadway/amqp#Channel.QueueDeclare q, err := ch.QueueDeclare( "task_queue", // name 有名字! true, // durable 持久性的,若是事前已經聲明瞭該隊列,不能重複聲明 false, // delete when unused false, // exclusive 若是是真,鏈接一斷開,隊列刪除 false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") body := bodyFrom(os.Args) // 發佈 err = ch.Publish( "", // exchange 默認模式,exchange爲空 q.Name, // routing key 默認模式路由到同名隊列,便是task_queue false, // mandatory false, amqp.Publishing{ // 持久性的發佈,由於隊列被聲明爲持久的,發佈消息必須加上這個(可能不用),但消息仍是可能會丟,如消息到緩存但MQ掛了來不及持久化。 DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s }
工做方,多個,拿發佈方的消息
package main import ( "bytes" "fmt" "github.com/streadway/amqp" "log" "time" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 指定隊列! q, err := ch.QueueDeclare( "task_queue", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // Fair dispatch 預取,每一個工做方每次拿一個消息,確認後纔拿下一次,緩解壓力 err = ch.Qos( 1, // prefetch count // 待解釋 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS") // 消費根據隊列名 msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack 設置爲真自動確認消息 false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") // 確認消息被收到!!若是爲真的,那麼同在一個channel,在該消息以前未確認的消息都會確認,適合批量處理 // 真時場景:每十條消息確認一次,相似 d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever }
發佈方
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 默認模式有默認交換機,廣播本身定義一個交換機,交換機可與隊列進行綁定 err = ch.ExchangeDeclare( "logs", // name "fanout", // type 廣播模式 true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) // 發佈 err = ch.Publish( "logs", // exchange 消息發送到交換機,這個時候沒隊列綁定交換機,消息會丟棄 "", // routing key 廣播模式不須要這個,它會把全部消息路由到綁定的全部隊列 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s }
訂閱方
package main import ( "fmt" "log" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 一樣要申明交換機 err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") // 新建隊列,這個隊列沒名字,隨機生成一個名字 q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive 表示鏈接一斷開,這個隊列自動刪除 false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 隊列和交換機綁定,便是隊列訂閱了發到這個交換機的消息 err = ch.QueueBind( q.Name, // queue name 隊列的名字 "", // routing key 廣播模式不須要這個 "logs", // exchange 交換機名字 false, nil) failOnError(err, "Failed to bind a queue") // 開始消費消息,可開多個訂閱方,由於隊列是臨時生成的,全部每一個訂閱方都能收到一樣的消息 msgs, err := ch.Consume( q.Name, // queue 隊列名字 "", // consumer true, // auto-ack 自動確認 false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
發佈-訂閱每一個綁定的隊列都收到同樣的消息,如今不想!使用路由功能,隊列綁定進行分發。
發佈方
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 交換機申明,且類型爲點對點默認 err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) // 發佈 err = ch.Publish( "logs_direct", // exchange 發到這個交換機 severityFrom(os.Args), // routing key 且路由key是由命令行指定,以下方,指定了error false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } func severityFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "info" } else { s = os.Args[1] } return s }
發消息到交換機,路由key爲error
go run *.go error "Run. Run. Or it will explode."
消費方
package main import ( "fmt" "log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 慣例 err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") // 申明臨時隊列 q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [info] [warning] [error]", os.Args[0]) os.Exit(0) } # 綁定隊列和交換機,綁定多個路由key,見下方 for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s) // 下面同個隊列能夠收到不一樣路由key的消息 ,廣播模式除外! err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_direct", // exchange false, nil) failOnError(err, "Failed to bind a queue") } // 消費隊列 msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <forever }
消費這些key:info warning error
go run *.go info warning error
上面的路由都是標準的,就是固定字符串名字,話題模式可使用類正則的路由,這樣模糊匹配更棒!!
路由相似於這樣 *.love.*
* (star) can substitute for exactly one word. # (hash) can substitute for zero or more words.
發佈方
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 交換機,話題模式 err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) // 相似上面 err = ch.Publish( "logs_topic", // exchange severityFrom(os.Args), // routing key 路由能夠不標準了 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } func severityFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "anonymous.info" } else { s = os.Args[1] } return s }
命令運行
To receive all the logs:
go run *.go "#"
To receive all logs from the facility 「kern」:
go run *.go "kern.*"
Or if you want to hear only about 「critical」 logs:
go run *.go "*.critical"
You can create multiple bindings:
go run *.go "kern.*" "*.critical"
消費方
package main import ( "fmt" "log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 同理 err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") // 臨時隊列 q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [binding_key]...", os.Args[0]) os.Exit(0) } for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_topic", s) // 綁定也是相似的 err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_topic", // exchange false, nil) failOnError(err, "Failed to bind a queue") } msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <forever }
命令運行
go run *.go "kern.critical" "A critical kernel error"
應答方
package main import ( "fmt" "log" "strconv" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func fib(n int) int { if n == 0 { return 0 } else if n == 1 { return 1 } else { return fib(n-1) + fib(n-2) } } func main() { // 撥號 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 聲明匿名隊列 q, err := ch.QueueDeclare( "rpc_queue", // name false, // durable false, // delete when usused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 公平分發 沒有這個則round-robbin:https://segmentfault.com/a/1190000004492447 err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS") // 消費,等待請求 msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { //請求來了 for d := range msgs { n, err := strconv.Atoi(string(d.Body)) failOnError(err, "Failed to convert body to integer") log.Printf(" [.] fib(%d)", n) // 計算 response := fib(n) // 回答 err = ch.Publish( "", // exchange d.ReplyTo, // routing key 回答隊列 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: d.CorrelationId, 序列號 Body: []byte(strconv.Itoa(response)), }) failOnError(err, "Failed to publish a message") // 確認回答完畢 d.Ack(false) } }() log.Printf(" [*] Awaiting RPC requests") <forever }
請教方
package main import ( "fmt" "log" "math/rand" "os" "strconv" "strings" "time" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func randomString(l int) string { bytes := make([]byte, l) for i := 0; i < l; i++ { bytes[i] = byte(randInt(65, 90)) } return string(bytes) } func randInt(min int, max int) int { return min + rand.Intn(max-min) } func fibonacciRPC(n int) (res int, err error) { // 撥號 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 隊列聲明 q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive 爲真即鏈接斷開就刪除 false, // noWait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 消費 msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive 這個爲真,服務器會認爲這是該隊列惟一的消費者 false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") corrId := randomString(32) // 請教! err = ch.Publish( "", // exchange "rpc_queue", // routing key 問題發到這裏 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: corrId, ReplyTo: q.Name, 但願回答被髮到這裏 Body: []byte(strconv.Itoa(n)), }) failOnError(err, "Failed to publish a message") // 取答案 for d := range msgs { if corrId == d.CorrelationId { res, err = strconv.Atoi(string(d.Body)) failOnError(err, "Failed to convert body to integer") break } } return } func main() { rand.Seed(time.Now().UTC().UnixNano()) n := bodyFrom(os.Args) log.Printf(" [x] Requesting fib(%d)", n) res, err := fibonacciRPC(n) failOnError(err, "Failed to handle RPC request") log.Printf(" [.] Got %d", res) } func bodyFrom(args []string) int { var s string if (len(args) < 2) || os.Args[1] == "" { s = "30" } else { s = strings.Join(args[1:], " ") } n, err := strconv.Atoi(s) failOnError(err, "Failed to convert arg to integer") return n }
http://www.rabbitmq.com/tutorials/tutorial-six-go.html
package main import ( "fmt" "github.com/streadway/amqp" "log" "time" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } // Queue屬性測試 // // durable屬性和auto-delete屬性能夠同時生效; // durable屬性和exclusive屬性會有性質上的衝突,二者同時設置時,僅exclusive屬性生效; // auto_delete屬性和exclusive屬性能夠同時生效; // // auto_delete若是有鏈接存在消費者訂閱該http://www.lenggirl.com/tool/RabbitMQ.htmlqueue,正常,若是消費者所有消失,自動刪除隊列 // 能夠在沒有建立consumer的狀況下,建立出具備auto-delete屬性的queue。 // // exclusive,若是聲明該隊列的鏈接斷開,自動刪除隊列 // queue的存在條件是在聲明該隊列的鏈接上存在某個consumer訂閱了該queue。 // func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 默認模式有默認交換機,廣播本身定義一個交換機,交換機可與隊列進行綁定 /* ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags. ExchangeDeclare方法在服務器聲明一個exchange。若是不存在,新建一個,存在的話則確認type和durability和auto-delete的標誌是否一致。 Errors returned from this method will close the channel. 若是方法返回錯誤,channel會被關閉。 Exchange names starting with "amq." are reserved for pre-declared and standardized exchanges. The client MAY declare an exchange starting with "amq." if the passive option is set, or the exchange already exists. Names can consists of a non-empty sequence of letters, digits, hyphen, underscore, period, or colon. 名字以"amq."開頭的Exchange是爲以前已經聲明和標準化的exchange們保留的, 在exchange已經存在的狀況下,或者passive選項設置爲真,客戶端纔有可能聲明一個這樣的exchange。 exchange的名字是一個非空序列,僅能包含字母,數字,連字符-,下劃線_,句號.,冒號: 另外的方法ExchangeDeclarePassive主要用來檢測exchange是否已經存在。 Each exchange belongs to one of a set of exchange kinds/types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it. Once an exchange is declared, its type cannot be changed. The common types are "direct", "fanout", "topic" and "headers". Durable and Non-Auto-Deleted exchanges will survive server restarts and remain declared when there are no remaining bindings. This is the best lifetime for long-lived exchange configurations like stable routes and default exchanges. Non-Durable and Auto-Deleted exchanges will be deleted when there are no remaining bindings and not restored on server restart. This lifetime is useful for temporary topologies that should not pollute the virtual host on failure or after the consumers have completed. Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is running including when there are no remaining bindings. This is useful for temporary topologies that may have long delays between bindings. Durable and Auto-Deleted exchanges will survive server restarts and will be removed before and after server restarts when there are no remaining bindings. These exchanges are useful for robust temporary topologies or when you require binding durable queues to auto-deleted exchanges. Note: RabbitMQ declares the default exchange types like 'amq.fanout' as durable, so queues that bind to these pre-declared exchanges must also be durable. Exchanges declared as `internal` do not accept accept publishings. Internal exchanges are useful for when you wish to implement inter-exchange topologies that should not be exposed to users of the broker. When noWait is true, declare without waiting for a confirmation from the server. The channel may be closed as a result of an error. Add a NotifyClose listener to respond to any exceptions. Optional amqp.Table of arguments that are specific to the server's implementation of the exchange can be sent for exchange types that require extra parameters. func (me *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error { */ err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil) failOnError(err,