本節內容咱們將對發佈訂閱增長一個特性:訂閱子集。好比咱們將一些危險的錯誤消息保存進硬盤中,同時在控制檯仍然可以讀取全部的消息git
上一節內容咱們將隊列跟交換器進行binging:github
err = ch.QueueBind( q.Name, // queue name "", // routing key "logs", // exchange false, nil)
一個binging是將交換器跟隊列進行關聯,能夠簡單理解爲,綁定好的隊列只會接收來自這個交換器的消息日誌
Bindings可以攜帶一個額外的參數:routing_key,爲了不跟Channel.Publish中的參數混淆咱們稱這個routing_key爲binding key。因此咱們能夠建立一個binding:code
err = ch.QueueBind( q.Name, // queue name "black", // routing key "logs", // exchange false, nil)
binding key的含義取決交換類型,像以前咱們使用的fanout類型就徹底忽略了binding key的價值blog
上一節中咱們的發佈訂閱模式是將全部的消息廣播給消費者,接下來咱們進行擴展:容許根據消息的嚴重性過濾消息。舉個例子,對於嚴重性錯誤的消息咱們直接寫硬盤,對於通常的提醒消息或日誌則不須要浪費硬盤空間rabbitmq
咱們以前使用的fanout交換類型則不具有這些靈活操做,它只可以將消息不加過濾的進行廣播。想要達到上面的靈活性咱們使用direct交換模式來替代fanout,它可以將消息傳遞到binding key跟routing key徹底匹配的隊列隊列
如上圖所示,direct類型的交換器綁定了兩個隊列,第一個隊列使用了綁定鍵是orange,第二個隊列使用了兩個綁定鍵,一個是black,另一個是green路由
在這樣的設置中,使用路由鍵orange發佈到交換器的消息將被路由到隊列Q1,使用路由鍵black或者green則會被路由到隊列Q2,其餘的消息則會被丟棄string
咱們也可使用同一個綁定鍵來綁定交換器跟不一樣隊列,在這種狀況下,direct模式跟fanout有點類似,使用路由鍵black發佈到交換器的消息將被路由到隊列Q1跟Q2it
接下來咱們來實現如下配置的消息系統:
完整的代碼以下所示:
emit_log_direct腳本
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs_direct", // exchange severityFrom(os.Args), // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } func severityFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "info" } else { s = os.Args[1] } return s }
receive_logs_direct腳本
package main import ( "fmt" "log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [info] [warning] [error]", os.Args[0]) os.Exit(0) } for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s) err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_direct", // exchange false, nil) failOnError(err, "Failed to bind a queue") } msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
使用下面命令運行:
go run receive_logs_direct.go info warning error go run receive_logs_direct.go warning error go run emit_log_direct.go error "Run. Run. Or it will explode."