本文翻譯自RabbitMQ官網的Go語言客戶端系列教程,本文首發於個人我的博客:liwenzhou.com,共分爲六篇,本文是第一篇——HelloWorld。html
這些教程涵蓋了使用RabbitMQ建立消息傳遞應用程序的基礎知識。
你須要安裝RabbitMQ服務器才能完成這些教程,請參閱安裝指南或使用Docker鏡像。
這些教程的代碼是開源的,官方網站也是如此。git
本教程假設RabbitMQ已安裝並運行在本機上的標準端口(5672)。若是你使用不一樣的主機、端口或憑據,則須要調整鏈接設置。github
RabbitMQ是一個消息代理:它接受並轉發消息。你能夠把它想象成一個郵局:當你把你想要郵寄的郵件放進一個郵箱時,你能夠肯定郵差先生或女士最終會把郵件送到你的收件人那裏。在這個比喻中,RabbitMQ是一個郵箱、一個郵局和一個郵遞員。web
RabbitMQ和郵局的主要區別在於它不處理紙張,而是接受、存儲和轉發二進制數據塊——消息。docker
RabbitMQ和通常的消息傳遞都使用一些術語。小程序
生產僅意味着發送。發送消息的程序是生產者:數組
隊列是位於RabbitMQ內部的郵箱的名稱。儘管消息經過RabbitMQ和你的應用程序流動,但它們只能存儲在隊列中。隊列只受主機內存和磁盤限制的限制,實際上它是一個大的消息緩衝區。許多生產者能夠向一個隊列發送消息,而許多消費者能夠嘗試從一個隊列接收數據。如下是咱們表示隊列的方式:bash
消費與接收具備類似的含義。消費者是一個主要等待接收消息的程序:服務器
請注意,生產者,消費者和代理(broker)沒必要位於同一主機上。實際上,在大多數應用程序中它們不是。一個應用程序既能夠是生產者,也能夠是消費者。異步
(使用Go RabbitMQ客戶端)
在本教程的這一部分中,咱們將在Go中編寫兩個小程序:發送單個消息的生產者和接收消息並將其打印出來的消費者。咱們將忽略Go-RabbitMQ API中的一些細節,只關注很是簡單的事情,以便開始教程。這是一個消息傳遞版的「Hello World」。
在下圖中,「 P」是咱們的生產者,「 C」是咱們的消費者。中間的框是一個隊列——RabbitMQ表明消費者保存的消息緩衝區。
Go RabbitMQ客戶端庫
RabbitMQ講多種協議。本教程使用amqp0-9-1,這是一個開放的、通用的消息傳遞協議。RabbitMQ有許多不一樣語言的客戶端。在本教程中,咱們將使用Go amqp客戶端。
首先,使用
go get
安裝amqpgo get github.com/streadway/amqp
如今安裝好amqp以後,咱們就能夠編寫一些代碼。
咱們將消息發佈者(發送者)稱爲 send.go
,將消息消費者(接收者)稱爲receive.go
。發佈者將鏈接到RabbitMQ,發送一條消息,而後退出。
在send.go
中,咱們須要首先導入庫:
package main import ( "log" "github.com/streadway/amqp" )
咱們還須要一個輔助函數來檢查每一個amqp調用的返回值:
func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
而後鏈接到RabbitMQ服務器
// 1. 嘗試鏈接RabbitMQ,創建鏈接 // 該鏈接抽象了套接字鏈接,併爲咱們處理協議版本協商和認證等。 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close()
鏈接抽象了socket鏈接,併爲咱們處理協議版本協商和認證等。接下來,咱們建立一個通道,這是大多數用於完成任務的API所在的位置:
// 2. 接下來,咱們建立一個通道,大多數API都是用過該通道操做的。 ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close()
要發送,咱們必須聲明要發送到的隊列。而後咱們能夠將消息發佈到隊列:
// 3. 聲明消息要發送到的隊列 q, err := ch.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") body := "Hello World!" // 4.將消息發佈到聲明的隊列 err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing { ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message")
聲明隊列是冪等的——僅當隊列不存在時才建立。消息內容是一個字節數組,所以你能夠在此處編碼任何內容。
上面是咱們的發佈者。咱們的消費者監聽來自RabbitMQ的消息,所以與發佈單個消息的發佈者不一樣,咱們將使消費者保持運行狀態以監聽消息並打印出來。
該代碼(在receive.go
中)具備與send
相同的導入和幫助功能:
package main import ( "log" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
設置與發佈者相同;咱們打開一個鏈接和一個通道,並聲明要消耗的隊列。請注意,這與send
發佈到的隊列匹配。
// 創建鏈接 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 獲取channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 聲明隊列 q, err := ch.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue")
請注意,咱們也在這裏聲明隊列。由於咱們可能在發佈者以前啓動使用者,因此咱們但願在嘗試使用隊列中的消息以前確保隊列存在。
咱們將告訴服務器將隊列中的消息傳遞給咱們。因爲它將異步地向咱們發送消息,所以咱們將在goroutine中從通道(由amqp::Consume
返回)中讀取消息。
// 獲取接收消息的Delivery通道 msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever
如今咱們能夠運行兩個腳本。在一個終端窗口,運行發佈者:
go run send.go
而後,運行使用者:
go run receive.go
消費者將打印經過RabbitMQ從發佈者那裏獲得的消息。使用者將持續運行,等待消息(使用Ctrl-C中止它),所以請嘗試從另外一個終端運行發佈者。
若是要檢查隊列,請嘗試使用rabbitmqctl list_queues
命令。
接下來該繼續教程的第二部分並創建一個簡單的任務隊列。