在以前咱們將交換器的類型從fanout設置爲direct後可以根據咱們的選擇得到響應的消息,雖然改良咱們的消息日誌系統,可是還有不少侷限性,好比它不能基於多個標準進行路由ui
在咱們的日誌系統中咱們可能不單單是依據消息的嚴重性進行訂閱,還有可能同時基於消息的危險等級和消息來源,好比咱們監聽來自cron的危險錯誤和來自kern的全部日誌。經過topic咱們能夠來實現以上功能日誌
消息若是發送到主題交換器的話不能使用任何的routing_key,它必須是由點分隔的單詞列表。單詞能夠是任意的,但一般它們是與消息相關的一些特性code
binding key必須是具備相同格式,topic交換器背後的邏輯跟direct交換器的邏輯相似,一個指定了routing key的消息將會被投遞到全部使用binding key並與routing key 相匹配的隊列中。binding key有兩種特殊狀況:blog
*
能夠表明代替一個單詞#
能夠代替0個或多個單詞在本例子中,咱們將發送全部描述動物的消息,這些消息將使用由三個單詞(兩個點)組成的routing_key發送。routing_key第一個單詞描述速度,第二個表示顏色,第三個表示物種rabbitmq
隊列Q1使用binding_key:*.orange.*
,隊列Q2使用binding_key*.*.rabbit
和lazy.#
。總結以下:隊列
quick.orange.rabbit
,lazy.orange.elephant
,quick.orange.fox
,quick.orange.rabbit
,lazy.orange.elephant
,lazy.brown.fox
,lazy.pink.rabbit
quick.brown.fox
跟以上兩個隊列的routing_key都不匹配因此該消息會被丟棄lazy.orange.male.rabbit
則只能匹配隊列Q2topic 交換器很是靈活而且能夠表現爲其餘交換器 好比設置隊列的binding_key爲#,則隊列會接收全部的消息無論routing_key是什麼 當binding_key不使用特殊字段`*`和`#`的時候,此時topic交換器跟direct交換器同樣
func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs_topic", // exchange severityFrom(os.Args), // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } func severityFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "anonymous.info" } else { s = os.Args[1] } return s }
func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [binding_key]...", os.Args[0]) os.Exit(0) } for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_topic", s) err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_topic", // exchange false, nil) failOnError(err, "Failed to bind a queue") } msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
go run receiveLogsTopic.go "#"
go run receiveLogsTopic.go "kern.*"
go run receiveLogsTopic.go "*.critical"
go run receiveLogsTopic.go "kern.*" "*.critical"
go run receiveLogsTopic.go "kern.critical" "A critical kernel error"