RabbitMQ消息傳遞模型的核心思想是,生產者永遠不會將任何消息直接發送到隊列,實際上,一般生產者甚至不知道消息是否被傳遞到某個隊列。git
相反,生產者只能向交換器發送消息。交換器一邊接收來自生產者發佈的消息一邊將消息放入到隊列當中。能夠經過exchangeType來設置交換器對消息的處理,好比拼接到指定的隊列,或是拼接到多個隊列中,或是丟棄。github
exchange Type有如下幾種:direct,topic,headers,fanout。咱們先使用最後一種建立相應的交換器並取名logs:日誌
err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments )
fanout模式就是廣播全部接收到的消息到它已知的全部隊列當中code
使用如下命令能夠羅列RabbitMQ中全部的交換器: sudo rabbitmqctl list_exchanges
在以前的例子中咱們沒有使用交換器可是依舊能夠發送消息到隊列當中,說明咱們已經使用了默認的交換器,咱們能夠看下之前的代碼:router
err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
在這裏咱們使用了默認的交換器:消息將被依據routering_key指定的名字路由到隊列中.rabbitmq
一旦咱們定義好了交換器,則能夠在生產者發送消息的時候使用:隊列
err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs", // exchange "", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
咱們想要獲取全部日誌消息不僅是子集,同時咱們只對當前的信息流感興趣,爲了解決這個問題咱們須要兩個東西:路由
首先,咱們須要一個新的空的隊列無論咱們是否有連接Rabbit,咱們可使用一個隨機名字建立一個隊列,或是讓系統指定給咱們string
其次,一旦咱們斷開與消費者的連接,隊列必須自動刪除。it
在amqp客戶端中,當咱們使用一個空的名字建立一個隊列的時候:
q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments )
當咱們獲得其返回的隊列的時候,隊列實例將會包含一個由RabbitMQ產生的名字,差很少這個樣子:amq.gen-JzTY20BRgKO-HjmUJj0wLg
當咱們連接關閉的時候,隊列將被刪除由於它被聲明爲exclusive
在前面咱們已經建立了一個fanout類型的交換器和一個隊列,接下來咱們咱們須要讓交換器將消息發送到咱們隊列中,將交換器(exchange)和隊列(queue)關聯起來稱爲綁定
err = ch.QueueBind( q.Name, // 隊列名 name "", // routing key "logs", // 交換器名 false, nil )
通過以上關聯以後,logs交換器就會將消息拼接到咱們的隊列當中。
羅列出全部的綁定: rabbitmqctl list_bindings
完整代碼以下:
emit.go
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs", // exchange "", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s }
receive.go
package main import ( "github.com/streadway/amqp" "log" ) func main() { conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/") DealWithError(err,"Failed to connect to RabbitMQ") defer conn.Close() ch,err := conn.Channel() DealWithError(err,"Failed to open a channel") defer ch.Close() //聲明交換器 ch.ExchangeDeclare( "logs", "fanout", true, false, false, false, nil, ) DealWithError(err,"Failed to declare an exchange") //聲明瞭隊列 q,err := ch.QueueDeclare( "", //隊列名字爲rabbitMQ自動生成 false, false, true, false, nil, ) DealWithError(err,"Failed to declare an exchange") //交換器跟隊列進行綁定,交換器將接收到的消息放進隊列中 err = ch.QueueBind( q.Name, "", "logs", false, nil, ) DealWithError(err,"Failed to bind a queue") msgs,err := ch.Consume( q.Name, "", true, false, false, false, nil, ) DealWithError(err,"Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs{ log.Printf(" [x] %s",d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever } func DealWithError(err error,msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }