基於 rabbitmq 實現的延時隊列

雖然 rabbitmq 沒有延時隊列的功能,可是稍微變更一下也是能夠實現的git

實現延時隊列的基本要素

  1. 存在一個倒計時機制:Time To Live(TTL)
  2. 當到達時間點的時候會觸發一個發送消息的事件:Dead Letter Exchanges(DLX)

基於第一點,我利用的是消息存在過時時間這一特性, 消息一旦過時就會變成dead letter,能夠讓單獨的消息過時,也能夠設置整個隊列消息的過時時間
rabbitmq會有限取兩個值的最小值github

基於第二點,是用到了rabbitmq的過時消息處理機制:
. x-dead-letter-exchange 將過時的消息發送到指定的 exchange
. x-dead-letter-routing-key 將過時的消息發送到自定的 route當中golang

在這裏例子當中,我使用的是 過時消息+轉發指定exchangebash

在 golang 中的實現

首先是消費者comsumer.go工具

package main

import (
    "log"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}


func main() {
    // 創建連接
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // 聲明一個主要使用的 exchange
    err = ch.ExchangeDeclare(
        "logs",   // name
        "fanout", // type
        true,     // durable
        false,    // auto-deleted
        false,    // internal
        false,    // no-wait
        nil,      // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    // 聲明一個常規的隊列, 其實這個也不必聲明,由於 exchange 會默認綁定一個隊列
    q, err := ch.QueueDeclare(
        "test_logs",    // name
        false, // durable
        false, // delete when unused
        true,  // exclusive
        false, // no-wait
        nil,   // arguments
    )
    failOnError(err, "Failed to declare a queue")

    /**
     * 注意,這裏是重點!!!!!
     * 聲明一個延時隊列, ß咱們的延時消息就是要發送到這裏
     */
    _, errDelay := ch.QueueDeclare(
        "test_delay",    // name
        false, // durable
        false, // delete when unused
        true,  // exclusive
        false, // no-wait
        amqp.Table{
            // 當消息過時時把消息發送到 logs 這個 exchange
            "x-dead-letter-exchange":"logs",
        },   // arguments
    )
    failOnError(errDelay, "Failed to declare a delay_queue")

    err = ch.QueueBind(
        q.Name, // queue name, 這裏指的是 test_logs
        "",     // routing key
        "logs", // exchange
        false,
        nil)
    failOnError(err, "Failed to bind a queue")

    // 這裏監聽的是 test_logs
    msgs, err := ch.Consume(
        q.Name, // queue name, 這裏指的是 test_logs
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf(" [x] %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    <-forever
}

而後是生產者productor.gocode

package main

import (
    "log"
    "os"
    "strings"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    body := bodyFrom(os.Args)
    // 將消息發送到延時隊列上
    err = ch.Publish(
        "",                 // exchange 這裏爲空則不選擇 exchange
        "test_delay",         // routing key
        false,              // mandatory
        false,              // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
            Expiration: "5000",    // 設置五秒的過時時間
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

運行一下:blog

go run comsumer.go
go run productor.go

具體看代碼和註釋就行, 這裏的關鍵點就是將要延時的消息發送到過時隊列當中, 而後監聽的是過時隊列轉發到的 exchange 下的隊列
正常狀況就是始終監聽一個隊列,而後把過時消息發送到延時隊列中,當消息到達時間後就把消息發到正在監聽的隊列rabbitmq

一個本身寫的mq工具
博客原文隊列

相關文章
相關標籤/搜索