在以前的教程中,咱們建立了一個簡單的日誌系統。咱們可以向許多交換器轉發日誌消息。html
在本教程中,咱們將添加一個功能——咱們讓它僅僅接收咱們感興趣的日誌類別。舉例:咱們 實現僅將嚴重級別的錯誤日誌寫入磁盤(爲了節省磁盤空間),其他日誌級別的日誌直接打印到控制檯。git
綁定github
以前的章節中咱們已經建立過綁定,你可能還會記得:算法
err = ch.QueueBind( q.Name, // queue name "", // routing key "logs", // exchange false, nil)
綁定是用來維繫交換器和隊列關係的,這能夠被簡單地理解爲:隊列僅僅對從交換器中傳的消息感興趣。3d
綁定有個額外參數叫作routing_key,爲了不與Channel.Publish方法中的參數相混淆,咱們稱之爲binding key(綁定鍵)。使用綁定鍵建立綁定以下:日誌
err = ch.QueueBind( q.Name, // queue name "black", // routing key "logs", // exchange false, nil)
綁定鍵的含義取決於交換器的類型。咱們以前使用的fanout類型的交換器,就會直接忽略這個參數。htm
Direct型交換器blog
咱們以前的教程中的日誌系統是廣播全部的消息到全部消費者。咱們但願以此拓展來實現根據消息嚴重性來過濾消息。好比咱們但願 寫日誌到硬盤的代碼僅僅接收嚴重級別的,不要浪費磁盤存儲在warning或者info級別的日誌。教程
以前使用的是fanout類型交換器,沒有更好的拓展性或者說靈活性——它只能盲目的廣播。rabbitmq
如今 使用direct型交換器替代。Direct型的路由算法 比較簡單——消息會被派發到某個隊列,該隊列的綁定鍵剛好和消息的路由鍵一致。
爲了闡述,考慮以下設置:
該設置中,能夠看到direct型的交換器X被綁定到了兩個隊列:Q一、Q2。Q1使用綁定鍵orange綁定,Q2包含兩個綁定鍵:black和green。
基於如上設置的話,使用路由鍵orange發佈的消息會被路由到Q1隊列,而使用black或者green路由鍵的消息均會被路由到Q2,全部其他消息將被丟棄。
備註:這裏的交換器X和隊列的綁定是多對多的關係,也就是說一個交換器能夠到綁定多個隊列,一個隊列也能夠被多個交換器綁定,消息只會被路由一次,不能由於兩個綁定鍵都匹配上了路由鍵消息就會被路由兩次,這種是不存在的。
多個綁定
用相同的綁定鍵去綁定多個隊列是徹底合法的,咱們能夠再添加一個black綁定鍵來綁定X和Q1,這樣Q1和Q2都使用black綁定到了交換器X,這其實和fanout類型的交換器直接綁定到隊列Q一、Q2功能相同:使用black路由鍵的消息會被直接路由到Q1和Q2。
發送日誌
咱們將使用該模型來構建日誌系統。使用direct型的交換器替換fanout型的,咱們將日誌的嚴重級別做爲路由鍵,這樣的話接收端程序能夠選擇日誌接收級別進行接收,首先聚焦下日誌發送端:
首先建立一個交換器:
err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments )
而後是發送消息:
err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs_direct", // exchange severityFrom(os.Args), // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
爲了簡單起見,咱們假設日誌嚴重級別以下:'info', 'warning', 'error'。
訂閱
接收還和以前章節接收同樣,只有一個例外:咱們將爲每個感興趣的嚴重級別建立一個綁定:
q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [info] [warning] [error]", os.Args[0]) os.Exit(0) } for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s) err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_direct", // exchange false, nil) failOnError(err, "Failed to bind a queue") }
糅合在一塊兒
發送端:
// rabbitmq_4_emit_log_direct.go project main.go package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { //連接隊列服務 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() //聲明一個channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() //聲明一個direct類型交換器 err = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) ch.Publish("logs_direct", severityFrom(os.Args), false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } //接收消息發送內容 func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } //接收日誌級別,做爲路由鍵使用 func severityFrom(args []string) string { var s string if len(args) < 2 || args[1] == "" { s = "info" } else { s = args[1] } return s }
接收端:
// rabbitmq_4_receive_logs_direct.go project main.go package main import ( "fmt" "log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { //連接隊列服務 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() //聲明一個channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() //聲明一個direct類型交換器 err = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil) failOnError(err, "Failed to declare an exchange") //聲明一個隊列 q, err := ch.QueueDeclare("", false, false, true, false, nil) failOnError(err, "Failed to declare a queue") //判斷cmd窗口接收參數是否足夠 if len(os.Args) < 2 { log.Printf("Usage:%s [info] [warning] [error]", os.Args[0]) os.Exit(0) } //cmd窗口輸入的多個日誌級別,分別循環處理—進行綁定 for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s) ch.QueueBind(q.Name, s, "logs_direct", false, nil) failOnError(err, "Failed to bind a queue") } msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
若是您只想保存「警告」和「錯誤」(而不是「信息」)日誌消息到文件,只須要打開一個控制檯而後輸入:
go run receive_logs_direct.go warning error > logs_from_rabbit.log
若是你想看到全部的日誌消息在你的屏幕上,打開一個新的終端,輸入:
go run receive_logs_direct.go info warning error
發出一個錯誤日誌消息類型以下:
go run emit_log_direct.go error "Run. Run. Or it will explode."
能夠觀察到:
消息能夠進行分類接收了, 只有error級別的消息纔會被存入log日誌文件,而info、warning級別的都不存入。
實際效果以下: