rabbitmq消息隊列——"路由"

在以前的教程中,咱們建立了一個簡單的日誌系統。咱們可以向許多交換器轉發日誌消息。html

在本教程中,咱們將添加一個功能——咱們讓它僅僅接收咱們感興趣的日誌類別。舉例:咱們 實現僅將嚴重級別的錯誤日誌寫入磁盤(爲了節省磁盤空間),其他日誌級別的日誌直接打印到控制檯。git

綁定github

以前的章節中咱們已經建立過綁定,你可能還會記得:算法

err = ch.QueueBind(
  q.Name, // queue name
  "",     // routing key
  "logs", // exchange
  false,
  nil)

綁定是用來維繫交換器和隊列關係的,這能夠被簡單地理解爲:隊列僅僅對從交換器中傳的消息感興趣。3d

綁定有個額外參數叫作routing_key,爲了不與Channel.Publish方法中的參數相混淆,咱們稱之爲binding key(綁定鍵)。使用綁定鍵建立綁定以下:日誌

err = ch.QueueBind(
  q.Name,    // queue name
  "black",   // routing key
  "logs",    // exchange
  false,
  nil)

綁定鍵的含義取決於交換器的類型。咱們以前使用的fanout類型的交換器,就會直接忽略這個參數。htm

Direct型交換器blog

咱們以前的教程中的日誌系統是廣播全部的消息到全部消費者。咱們但願以此拓展來實現根據消息嚴重性來過濾消息。好比咱們但願 寫日誌到硬盤的代碼僅僅接收嚴重級別的,不要浪費磁盤存儲在warning或者info級別的日誌。教程

以前使用的是fanout類型交換器,沒有更好的拓展性或者說靈活性——它只能盲目的廣播。rabbitmq

如今 使用direct型交換器替代。Direct型的路由算法 比較簡單——消息會被派發到某個隊列,該隊列的綁定鍵剛好和消息的路由鍵一致。

爲了闡述,考慮以下設置:

image

該設置中,能夠看到direct型的交換器X被綁定到了兩個隊列:Q一、Q2。Q1使用綁定鍵orange綁定,Q2包含兩個綁定鍵:black和green。

基於如上設置的話,使用路由鍵orange發佈的消息會被路由到Q1隊列,而使用black或者green路由鍵的消息均會被路由到Q2,全部其他消息將被丟棄。

備註:這裏的交換器X和隊列的綁定是多對多的關係,也就是說一個交換器能夠到綁定多個隊列,一個隊列也能夠被多個交換器綁定,消息只會被路由一次,不能由於兩個綁定鍵都匹配上了路由鍵消息就會被路由兩次,這種是不存在的。

多個綁定

image

用相同的綁定鍵去綁定多個隊列是徹底合法的,咱們能夠再添加一個black綁定鍵來綁定X和Q1,這樣Q1和Q2都使用black綁定到了交換器X,這其實和fanout類型的交換器直接綁定到隊列Q一、Q2功能相同:使用black路由鍵的消息會被直接路由到Q1和Q2。

發送日誌

咱們將使用該模型來構建日誌系統。使用direct型的交換器替換fanout型的,咱們將日誌的嚴重級別做爲路由鍵,這樣的話接收端程序能夠選擇日誌接收級別進行接收,首先聚焦下日誌發送端:

首先建立一個交換器:

err = ch.ExchangeDeclare(
  "logs_direct", // name
  "direct",      // type
  true,          // durable
  false,         // auto-deleted
  false,         // internal
  false,         // no-wait
  nil,           // arguments
)

而後是發送消息:

err = ch.ExchangeDeclare(
  "logs_direct", // name
  "direct",      // type
  true,          // durable
  false,         // auto-deleted
  false,         // internal
  false,         // no-wait
  nil,           // arguments
)
failOnError(err, "Failed to declare an exchange")

body := bodyFrom(os.Args)
err = ch.Publish(
  "logs_direct",         // exchange
  severityFrom(os.Args), // routing key
  false, // mandatory
  false, // immediate
  amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(body),
  })

爲了簡單起見,咱們假設日誌嚴重級別以下:'info', 'warning', 'error'。

訂閱

接收還和以前章節接收同樣,只有一個例外:咱們將爲每個感興趣的嚴重級別建立一個綁定:

q, err := ch.QueueDeclare(
  "",    // name
  false, // durable
  false, // delete when usused
  true,  // exclusive
  false, // no-wait
  nil,   // arguments
)
failOnError(err, "Failed to declare a queue")

if len(os.Args) < 2 {
  log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
  os.Exit(0)
}
for _, s := range os.Args[1:] {
  log.Printf("Binding queue %s to exchange %s with routing key %s",
     q.Name, "logs_direct", s)
  err = ch.QueueBind(
    q.Name,        // queue name
    s,             // routing key
    "logs_direct", // exchange
    false,
    nil)
  failOnError(err, "Failed to bind a queue")
}

糅合在一塊兒

image

發送端:

// rabbitmq_4_emit_log_direct.go project main.go
package main

import (
	"fmt"
	"log"
	"os"
	"strings"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
		panic(fmt.Sprintf("%s: %s", msg, err))
	}
}

func main() {
	//連接隊列服務
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	//聲明一個channel
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	//聲明一個direct類型交換器
	err = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil)
	failOnError(err, "Failed to declare an exchange")

	body := bodyFrom(os.Args)
	ch.Publish("logs_direct", severityFrom(os.Args), false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
	})
	failOnError(err, "Failed to publish a message")

	log.Printf(" [x] Sent %s", body)

}

//接收消息發送內容
func bodyFrom(args []string) string {
	var s string
	if (len(args) < 3) || os.Args[2] == "" {
		s = "hello"
	} else {
		s = strings.Join(args[2:], " ")
	}
	return s
}

//接收日誌級別,做爲路由鍵使用
func severityFrom(args []string) string {
	var s string
	if len(args) < 2 || args[1] == "" {
		s = "info"
	} else {
		s = args[1]
	}
	return s
}

接收端:

// rabbitmq_4_receive_logs_direct.go project main.go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
		panic(fmt.Sprintf("%s: %s", msg, err))
	}
}

func main() {
	//連接隊列服務
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	//聲明一個channel
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	//聲明一個direct類型交換器
	err = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil)
	failOnError(err, "Failed to declare an exchange")

	//聲明一個隊列
	q, err := ch.QueueDeclare("", false, false, true, false, nil)
	failOnError(err, "Failed to declare a queue")

	//判斷cmd窗口接收參數是否足夠
	if len(os.Args) < 2 {
		log.Printf("Usage:%s [info] [warning] [error]", os.Args[0])
		os.Exit(0)
	}

	//cmd窗口輸入的多個日誌級別,分別循環處理—進行綁定
	for _, s := range os.Args[1:] {
		log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s)
		ch.QueueBind(q.Name, s, "logs_direct", false, nil)
		failOnError(err, "Failed to bind a queue")
	}

	msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)
	go func() {
		for d := range msgs {
			log.Printf(" [x] %s", d.Body)
		}
	}()
	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}

若是您只想保存「警告」和「錯誤」(而不是「信息」)日誌消息到文件,只須要打開一個控制檯而後輸入:

go run receive_logs_direct.go warning error > logs_from_rabbit.log

若是你想看到全部的日誌消息在你的屏幕上,打開一個新的終端,輸入:

go run receive_logs_direct.go info warning error

發出一個錯誤日誌消息類型以下:

go run emit_log_direct.go error "Run. Run. Or it will explode."

能夠觀察到:

消息能夠進行分類接收了, 只有error級別的消息纔會被存入log日誌文件,而info、warning級別的都不存入。

實際效果以下:

image

image

image

相關文章
相關標籤/搜索