RabbitMQ Go客戶端教程1——HelloWorld

本文翻譯自RabbitMQ官網的Go語言客戶端系列教程,本文首發於個人我的博客:liwenzhou.com,共分爲六篇,本文是第一篇——HelloWorld。html

這些教程涵蓋了使用RabbitMQ建立消息傳遞應用程序的基礎知識。
你須要安裝RabbitMQ服務器才能完成這些教程,請參閱安裝指南或使用Docker鏡像
這些教程的代碼是開源的,官方網站也是如此。git

先決條件

本教程假設RabbitMQ已安裝並運行在本機上的標準端口(5672)。若是你使用不一樣的主機、端口或憑據,則須要調整鏈接設置。github

RabbitMQ Go語言客戶端教程(一)

介紹

RabbitMQ是一個消息代理:它接受並轉發消息。你能夠把它想象成一個郵局:當你把你想要郵寄的郵件放進一個郵箱時,你能夠肯定郵差先生或女士最終會把郵件送到你的收件人那裏。在這個比喻中,RabbitMQ是一個郵箱、一個郵局和一個郵遞員。web

RabbitMQ和郵局的主要區別在於它不處理紙張,而是接受、存儲和轉發二進制數據塊——消息。docker

RabbitMQ和通常的消息傳遞都使用一些術語。小程序

  • 生產僅意味着發送。發送消息的程序是生產者:數組

    img

  • 隊列是位於RabbitMQ內部的郵箱的名稱。儘管消息經過RabbitMQ和你的應用程序流動,但它們只能存儲在隊列中。隊列只受主機內存和磁盤限制的限制,實際上它是一個大的消息緩衝區。許多生產者能夠向一個隊列發送消息,而許多消費者能夠嘗試從一個隊列接收數據。如下是咱們表示隊列的方式:bash

    img

  • 消費與接收具備類似的含義。消費者是一個主要等待接收消息的程序:服務器

    img

請注意,生產者,消費者和代理(broker)沒必要位於同一主機上。實際上,在大多數應用程序中它們不是。一個應用程序既能夠是生產者,也能夠是消費者。異步

"Hello World"

(使用Go RabbitMQ客戶端)

在本教程的這一部分中,咱們將在Go中編寫兩個小程序:發送單個消息的生產者和接收消息並將其打印出來的消費者。咱們將忽略Go-RabbitMQ API中的一些細節,只關注很是簡單的事情,以便開始教程。這是一個消息傳遞版的「Hello World」。

在下圖中,「 P」是咱們的生產者,「 C」是咱們的消費者。中間的框是一個隊列——RabbitMQ表明消費者保存的消息緩衝區。

(P) -> [|||] -> (C)

Go RabbitMQ客戶端庫

RabbitMQ講多種協議。本教程使用amqp0-9-1,這是一個開放的、通用的消息傳遞協議。RabbitMQ有許多不一樣語言的客戶端。在本教程中,咱們將使用Go amqp客戶端。

首先,使用go get安裝amqp

go get github.com/streadway/amqp

如今安裝好amqp以後,咱們就能夠編寫一些代碼。

發送

(P) -> [|||]

咱們將消息發佈者(發送者)稱爲 send.go,將消息消費者(接收者)稱爲receive.go。發佈者將鏈接到RabbitMQ,發送一條消息,而後退出。

send.go中,咱們須要首先導入庫:

package main

import (
  "log"

  "github.com/streadway/amqp"
)

咱們還須要一個輔助函數來檢查每一個amqp調用的返回值:

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

而後鏈接到RabbitMQ服務器

// 1. 嘗試鏈接RabbitMQ,創建鏈接
// 該鏈接抽象了套接字鏈接,併爲咱們處理協議版本協商和認證等。
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

鏈接抽象了socket鏈接,併爲咱們處理協議版本協商和認證等。接下來,咱們建立一個通道,這是大多數用於完成任務的API所在的位置:

// 2. 接下來,咱們建立一個通道,大多數API都是用過該通道操做的。
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

要發送,咱們必須聲明要發送到的隊列。而後咱們能夠將消息發佈到隊列:

// 3. 聲明消息要發送到的隊列
q, err := ch.QueueDeclare(
  "hello", // name
  false,   // durable
  false,   // delete when unused
  false,   // exclusive
  false,   // no-wait
  nil,     // arguments
)
failOnError(err, "Failed to declare a queue")

body := "Hello World!"
// 4.將消息發佈到聲明的隊列
err = ch.Publish(
  "",     // exchange
  q.Name, // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing {
    ContentType: "text/plain",
    Body:        []byte(body),
  })
failOnError(err, "Failed to publish a message")

聲明隊列是冪等的——僅當隊列不存在時才建立。消息內容是一個字節數組,所以你能夠在此處編碼任何內容。

點擊查看完整的send.go文件

接收

上面是咱們的發佈者。咱們的消費者監聽來自RabbitMQ的消息,所以與發佈單個消息的發佈者不一樣,咱們將使消費者保持運行狀態以監聽消息並打印出來。

[|||] -> (C)

該代碼(在receive.go中)具備與send相同的導入和幫助功能:

package main

import (
  "log"

  "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

設置與發佈者相同;咱們打開一個鏈接和一個通道,並聲明要消耗的隊列。請注意,這與send發佈到的隊列匹配。

// 創建鏈接
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

// 獲取channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

// 聲明隊列
q, err := ch.QueueDeclare(
  "hello", // name
  false,   // durable
  false,   // delete when unused
  false,   // exclusive
  false,   // no-wait
  nil,     // arguments
)
failOnError(err, "Failed to declare a queue")

請注意,咱們也在這裏聲明隊列。由於咱們可能在發佈者以前啓動使用者,因此咱們但願在嘗試使用隊列中的消息以前確保隊列存在。

咱們將告訴服務器將隊列中的消息傳遞給咱們。因爲它將異步地向咱們發送消息,所以咱們將在goroutine中從通道(由amqp::Consume返回)中讀取消息。

// 獲取接收消息的Delivery通道
msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  true,   // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
  }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

點擊完整的receive.go腳本

完整示例

如今咱們能夠運行兩個腳本。在一個終端窗口,運行發佈者:

go run send.go

而後,運行使用者:

go run receive.go

消費者將打印經過RabbitMQ從發佈者那裏獲得的消息。使用者將持續運行,等待消息(使用Ctrl-C中止它),所以請嘗試從另外一個終端運行發佈者。

若是要檢查隊列,請嘗試使用rabbitmqctl list_queues命令。

接下來該繼續教程的第二部分並創建一個簡單的任務隊列。

相關文章
相關標籤/搜索