雖然 rabbitmq 沒有延時隊列的功能,可是稍微變更一下也是能夠實現的git
基於第一點,我利用的是消息存在過時時間這一特性, 消息一旦過時就會變成dead letter
,能夠讓單獨的消息過時,也能夠設置整個隊列消息的過時時間
而rabbitmq
會有限取兩個值的最小值github
基於第二點,是用到了rabbitmq
的過時消息處理機制:
. x-dead-letter-exchange
將過時的消息發送到指定的 exchange
中
. x-dead-letter-routing-key
將過時的消息發送到自定的 route
當中golang
在這裏例子當中,我使用的是 過時消息+轉發指定exchangebash
首先是消費者comsumer.go
工具
package main import ( "log" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { // 創建連接 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 聲明一個主要使用的 exchange err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") // 聲明一個常規的隊列, 其實這個也不必聲明,由於 exchange 會默認綁定一個隊列 q, err := ch.QueueDeclare( "test_logs", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") /** * 注意,這裏是重點!!!!! * 聲明一個延時隊列, ß咱們的延時消息就是要發送到這裏 */ _, errDelay := ch.QueueDeclare( "test_delay", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait amqp.Table{ // 當消息過時時把消息發送到 logs 這個 exchange "x-dead-letter-exchange":"logs", }, // arguments ) failOnError(errDelay, "Failed to declare a delay_queue") err = ch.QueueBind( q.Name, // queue name, 這裏指的是 test_logs "", // routing key "logs", // exchange false, nil) failOnError(err, "Failed to bind a queue") // 這裏監聽的是 test_logs msgs, err := ch.Consume( q.Name, // queue name, 這裏指的是 test_logs "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
而後是生產者productor.go
code
package main import ( "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() body := bodyFrom(os.Args) // 將消息發送到延時隊列上 err = ch.Publish( "", // exchange 這裏爲空則不選擇 exchange "test_delay", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), Expiration: "5000", // 設置五秒的過時時間 }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s }
運行一下:blog
go run comsumer.go go run productor.go
具體看代碼和註釋就行, 這裏的關鍵點就是將要延時的消息發送到過時隊列當中, 而後監聽的是過時隊列轉發到的 exchange 下的隊列
正常狀況就是始終監聽一個隊列,而後把過時消息發送到延時隊列中,當消息到達時間後就把消息發到正在監聽的隊列rabbitmq