Go RabbitMQ(三)發佈訂閱模式

RabbitMQ

  • 在上一節中咱們建立了工做隊列,而且假設每個任務都可以準確的到達對應的worker。在本節中咱們將介紹如何將一個消息傳遞到多個消費者,這也就是所說的發佈訂閱模式
  • 爲了驗證該模式咱們使用兩個創建一個簡單的打印系統,一個負責發出消息,另外一個負責接收並打印。在該系統多個receiver中,其中一個直接將日誌寫入到硬盤,另外一個負責從屏幕上查看日誌
  • 在以前的簡介中,咱們能夠做如下簡單總結:
    • 生產者負責發送消息
    • 隊列是一個存儲消息的緩衝區
    • 消費者負責接收消息

RabbitMQ消息傳遞模型的核心思想是,生產者永遠不會將任何消息直接發送到隊列,實際上,一般生產者甚至不知道消息是否被傳遞到某個隊列。git

相反,生產者只能向交換器發送消息。交換器一邊接收來自生產者發佈的消息一邊將消息放入到隊列當中。能夠經過exchangeType來設置交換器對消息的處理,好比拼接到指定的隊列,或是拼接到多個隊列中,或是丟棄。github

exchange Type有如下幾種:direct,topic,headers,fanout。咱們先使用最後一種建立相應的交換器並取名logs:日誌

err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)

fanout模式就是廣播全部接收到的消息到它已知的全部隊列當中code

使用如下命令能夠羅列RabbitMQ中全部的交換器:
sudo rabbitmqctl list_exchanges

在以前的例子中咱們沒有使用交換器可是依舊能夠發送消息到隊列當中,說明咱們已經使用了默認的交換器,咱們能夠看下之前的代碼:router

err = ch.Publish(
  "",     // exchange
  q.Name, // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(body),
})

在這裏咱們使用了默認的交換器:消息將被依據routering_key指定的名字路由到隊列中.rabbitmq

一旦咱們定義好了交換器,則能夠在生產者發送消息的時候使用:隊列

err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)
failOnError(err, "Failed to declare an exchange")

body := bodyFrom(os.Args)
err = ch.Publish(
  "logs", // exchange
  "",     // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
          ContentType: "text/plain",
          Body:        []byte(body),
  })

臨時隊列

咱們想要獲取全部日誌消息不僅是子集,同時咱們只對當前的信息流感興趣,爲了解決這個問題咱們須要兩個東西:路由

首先,咱們須要一個新的空的隊列無論咱們是否有連接Rabbit,咱們可使用一個隨機名字建立一個隊列,或是讓系統指定給咱們string

其次,一旦咱們斷開與消費者的連接,隊列必須自動刪除。it

在amqp客戶端中,當咱們使用一個空的名字建立一個隊列的時候:

q, err := ch.QueueDeclare(
  "",    // name
  false, // durable
  false, // delete when usused
  true,  // exclusive
  false, // no-wait
  nil,   // arguments
)

當咱們獲得其返回的隊列的時候,隊列實例將會包含一個由RabbitMQ產生的名字,差很少這個樣子:amq.gen-JzTY20BRgKO-HjmUJj0wLg
當咱們連接關閉的時候,隊列將被刪除由於它被聲明爲exclusive

綁定

在前面咱們已經建立了一個fanout類型的交換器和一個隊列,接下來咱們咱們須要讓交換器將消息發送到咱們隊列中,將交換器(exchange)和隊列(queue)關聯起來稱爲綁定

err = ch.QueueBind(
  q.Name, // 隊列名 name
  "",     // routing key
  "logs", // 交換器名
  false,
  nil
)

通過以上關聯以後,logs交換器就會將消息拼接到咱們的隊列當中。

羅列出全部的綁定:
rabbitmqctl list_bindings

完整代碼以下:

emit.go

package main

import (
        "fmt"
        "log"
        "os"
        "strings"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs",   // name
                "fanout", // type
                true,     // durable
                false,    // auto-deleted
                false,    // internal
                false,    // no-wait
                nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        body := bodyFrom(os.Args)
        err = ch.Publish(
                "logs", // exchange
                "",     // routing key
                false,  // mandatory
                false,  // immediate
                amqp.Publishing{
                        ContentType: "text/plain",
                        Body:        []byte(body),
                })
        failOnError(err, "Failed to publish a message")

        log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

receive.go

package main

import (
    "github.com/streadway/amqp"
    "log"
)

func main() {
    conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    DealWithError(err,"Failed to connect to RabbitMQ")
    defer conn.Close()

    ch,err := conn.Channel()
    DealWithError(err,"Failed to open a channel")
    defer ch.Close()
    //聲明交換器
    ch.ExchangeDeclare(
        "logs",
        "fanout",
        true,
        false,
        false,
        false,
        nil,
        )
    DealWithError(err,"Failed to declare an exchange")
    //聲明瞭隊列
    q,err := ch.QueueDeclare(
        "", //隊列名字爲rabbitMQ自動生成
        false,
        false,
        true,
        false,
        nil,
        )
    DealWithError(err,"Failed to declare an exchange")
    //交換器跟隊列進行綁定,交換器將接收到的消息放進隊列中
    err = ch.QueueBind(
        q.Name,
        "",
        "logs",
        false,
        nil,
        )
    DealWithError(err,"Failed to bind a queue")
    msgs,err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
        )
    DealWithError(err,"Failed to register a consumer")
    forever := make(chan bool)
    go func() {
        for d := range msgs{
            log.Printf(" [x] %s",d.Body)
        }
    }()
    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    <-forever
}

func DealWithError(err error,msg string)  {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}
相關文章
相關標籤/搜索