Go RabbitMQ(五)主題

RabbitMQ topic

在以前咱們將交換器的類型從fanout設置爲direct後可以根據咱們的選擇得到響應的消息,雖然改良咱們的消息日誌系統,可是還有不少侷限性,好比它不能基於多個標準進行路由ui

在咱們的日誌系統中咱們可能不單單是依據消息的嚴重性進行訂閱,還有可能同時基於消息的危險等級和消息來源,好比咱們監聽來自cron的危險錯誤和來自kern的全部日誌。經過topic咱們能夠來實現以上功能日誌

主題交換器(topic exchange)

消息若是發送到主題交換器的話不能使用任何的routing_key,它必須是由點分隔的單詞列表。單詞能夠是任意的,但一般它們是與消息相關的一些特性code

binding key必須是具備相同格式,topic交換器背後的邏輯跟direct交換器的邏輯相似,一個指定了routing key的消息將會被投遞到全部使用binding key並與routing key 相匹配的隊列中。binding key有兩種特殊狀況:blog

  • * 能夠表明代替一個單詞
  • # 能夠代替0個或多個單詞

在本例子中,咱們將發送全部描述動物的消息,這些消息將使用由三個單詞(兩個點)組成的routing_key發送。routing_key第一個單詞描述速度,第二個表示顏色,第三個表示物種rabbitmq

隊列Q1使用binding_key:*.orange.*,隊列Q2使用binding_key*.*.rabbitlazy.#。總結以下:隊列

  • Q1隊列將會接收全部orange的動物,好比quick.orange.rabbit,lazy.orange.elephant,quick.orange.fox,
  • Q2隊列會接收全部跟rabbit相關和lazy類型的動物,好比quick.orange.rabbit,lazy.orange.elephant,lazy.brown.fox,lazy.pink.rabbit
  • quick.brown.fox跟以上兩個隊列的routing_key都不匹配因此該消息會被丟棄
  • lazy.orange.male.rabbit則只能匹配隊列Q2
topic 交換器很是靈活而且能夠表現爲其餘交換器
好比設置隊列的binding_key爲#,則隊列會接收全部的消息無論routing_key是什麼
當binding_key不使用特殊字段`*`和`#`的時候,此時topic交換器跟direct交換器同樣

完整代碼以下:

  • emitLogsTopic.go
func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs_topic", // name
                "topic",      // type
                true,         // durable
                false,        // auto-deleted
                false,        // internal
                false,        // no-wait
                nil,          // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        body := bodyFrom(os.Args)
        err = ch.Publish(
                "logs_topic",          // exchange
                severityFrom(os.Args), // routing key
                false, // mandatory
                false, // immediate
                amqp.Publishing{
                        ContentType: "text/plain",
                        Body:        []byte(body),
                })
        failOnError(err, "Failed to publish a message")

        log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 3) || os.Args[2] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[2:], " ")
        }
        return s
}

func severityFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "anonymous.info"
        } else {
                s = os.Args[1]
        }
        return s
}
  • receiveLogsTopic.go
func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs_topic", // name
                "topic",      // type
                true,         // durable
                false,        // auto-deleted
                false,        // internal
                false,        // no-wait
                nil,          // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        q, err := ch.QueueDeclare(
                "",    // name
                false, // durable
                false, // delete when usused
                true,  // exclusive
                false, // no-wait
                nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        if len(os.Args) < 2 {
                log.Printf("Usage: %s [binding_key]...", os.Args[0])
                os.Exit(0)
        }
        for _, s := range os.Args[1:] {
                log.Printf("Binding queue %s to exchange %s with routing key %s",
                        q.Name, "logs_topic", s)
                err = ch.QueueBind(
                        q.Name,       // queue name
                        s,            // routing key
                        "logs_topic", // exchange
                        false,
                        nil)
                failOnError(err, "Failed to bind a queue")
        }

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                true,   // auto ack
                false,  // exclusive
                false,  // no local
                false,  // no wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
                for d := range msgs {
                        log.Printf(" [x] %s", d.Body)
                }
        }()

        log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
        <-forever
}
  • 接收全部的日誌:go run receiveLogsTopic.go "#"
  • 接收來自kern的全部日誌:go run receiveLogsTopic.go "kern.*"
  • 接收關於critical的日誌:go run receiveLogsTopic.go "*.critical"
  • 建立多個binding:go run receiveLogsTopic.go "kern.*" "*.critical"
  • 啓動發送消息的腳本:go run receiveLogsTopic.go "kern.critical" "A critical kernel error"
相關文章
相關標籤/搜索