RabbitMQ Go客戶端教程6——RPC

本文翻譯自RabbitMQ官網的Go語言客戶端系列教程,本文首發於個人我的博客:liwenzhou.com,教程共分爲六篇,本文是第六篇——RPC。html

這些教程涵蓋了使用RabbitMQ建立消息傳遞應用程序的基礎知識。
你須要安裝RabbitMQ服務器才能完成這些教程,請參閱安裝指南或使用Docker鏡像
這些教程的代碼是開源的,官方網站也是如此。git

先決條件

本教程假設RabbitMQ已安裝並運行在本機上的標準端口(5672)。若是你使用不一樣的主機、端口或憑據,則須要調整鏈接設置。程序員

遠程過程調用(RPC)

(使用Go RabbitMQ客戶端)github

在第二個教程中,咱們學習瞭如何使用工做隊列在多個worker之間分配耗時的任務。web

可是,若是咱們須要在遠程計算機上運行函數並等待結果怎麼辦?好吧,那是一個不一樣的故事。這種模式一般稱爲遠程過程調用RPCdocker

在本教程中,咱們將使用RabbitMQ構建一個RPC系統:客戶端和可伸縮RPC服務器。因爲咱們沒有值得分配的耗時任務,所以咱們將建立一個虛擬RPC服務,該服務返回斐波那契數。json

有關RPC的說明bash

儘管RPC是計算中很是常見的模式,但它常常受到批評。服務器

當程序員不知道函數調用是本地的仍是緩慢的RPC時,就會出現問題。這樣的混亂會致使系統變幻莫測,並給調試增長了沒必要要的複雜性。濫用RPC可能會致使沒法維護的意大利麪條式代碼而不是簡化軟件,網絡

牢記這一點,請考慮如下建議:

  • 肯定哪一個函數調用是本地的,哪一個是遠程的。
  • 爲你的系統編寫文檔。明確組件之間的依賴關係。
  • 處理錯誤狀況。 當RPC服務器長時間關閉時,客戶端應如何處理?

回調隊列

一般,經過RabbitMQ進行RPC很容易。客戶端發送請求消息,服務器發送響應消息。爲了接收響應,咱們須要發送帶有「回調」隊列地址的請求。咱們可使用默認隊列。讓咱們嘗試一下:

q, err := ch.QueueDeclare(
  "",    // 不指定隊列名,默認使用隨機生成的隊列名
  false, // durable
  false, // delete when unused
  true,  // exclusive
  false, // noWait
  nil,   // arguments
)

err = ch.Publish(
  "",          // exchange
  "rpc_queue", // routing key
  false,       // mandatory
  false,       // immediate
  amqp.Publishing{
    ContentType:   "text/plain",
    CorrelationId: corrId,
    ReplyTo:       q.Name,  // 在這裏指定callback隊列名,也是在這個隊列等回覆
    Body:          []byte(strconv.Itoa(n)),
})

消息屬性

AMQP 0-9-1協議預約義了消息附帶的14個屬性集。除如下屬性外,大多數屬性不多使用:

  • persistent:將消息標記爲持久性(值爲true)或瞬態(false)。你可能還記得第二個教程中的此屬性。
  • content_type:用於描述編碼的mime類型。例如,對於常用的JSON編碼,將此屬性設置爲application/ json是一個好習慣。
  • reply_to:經常使用於命名回調隊列
  • correlation_id:有助於將RPC響應與請求相關聯

關聯ID(Correlation Id)

在上面介紹的方法中,咱們建議爲每一個RPC請求建立一個回調隊列。這是至關低效的,可是幸運的是,有一種更好的方法——讓咱們爲每一個客戶端建立一個回調隊列。

這就引起了一個新問題,在該隊列中收到響應後,尚不清楚響應屬於哪一個請求。這個時候就該使用correlation_id這個屬性了。針對每一個請求咱們將爲其設置一個惟一值。隨後,當咱們在回調隊列中收到消息時,咱們將查看該屬性,並基於這個屬性將響應與請求進行匹配。若是咱們看到未知的correlation_id值,則能夠放心地丟棄該消息——它不屬於咱們的請求。

你可能會問,爲何咱們應該忽略回調隊列中的未知消息,而不是報錯而失敗?這是因爲服務器端可能出現競爭情況。儘管可能性不大,但RPC服務器可能會在向咱們發送答案以後但在發送請求的確認消息以前死亡。若是發生這種狀況,從新啓動的RPC服務器將再次處理該請求。這就是爲何在客戶端上咱們必須妥善處理重複的響應,而且理想狀況下RPC應該是冪等的。

總結

img

咱們的RPC工做流程以下:

  • 客戶端啓動時,它將建立一個匿名排他回調隊列。
  • 對於RPC請求,客戶端發送一條消息,該消息具備兩個屬性:reply_to(設置爲回調隊列)和correlation_id(設置爲每一個請求的惟一值)。
  • 該請求被髮送到rpc_queue隊列。
  • RPC工做程序(又名:服務器)正在等待該隊列上的請求。當出現請求時,它會完成計算工做並把結果做爲消息使用replay_to字段中的隊列發回給客戶端。
  • 客戶端等待回調隊列上的數據。出現消息時,它將檢查correlation_id屬性。若是它與請求中的值匹配,則將響應返回給應用程序。

完整示例

斐波那契函數:

func fib(n int) int {
        if n == 0 {
                return 0
        } else if n == 1 {
                return 1
        } else {
                return fib(n-1) + fib(n-2)
        }
}

聲明咱們的斐波那契函數。它僅假設有效的正整數輸入。 (不要期望這種方法適用於大量用戶,它多是最慢的遞歸實現)。

咱們的RPC服務器rpc_server.go的代碼以下所示:

package main

import (
        "log"
        "strconv"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func fib(n int) int {
        if n == 0 {
                return 0
        } else if n == 1 {
                return 1
        } else {
                return fib(n-1) + fib(n-2)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "rpc_queue", // name
                false,       // durable
                false,       // delete when unused
                false,       // exclusive
                false,       // no-wait
                nil,         // arguments
        )
        failOnError(err, "Failed to declare a queue")

        err = ch.Qos(
                1,     // prefetch count
                0,     // prefetch size
                false, // global
        )
        failOnError(err, "Failed to set QoS")

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                false,  // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
                for d := range msgs {
                        n, err := strconv.Atoi(string(d.Body))
                        failOnError(err, "Failed to convert body to integer")

                        log.Printf(" [.] fib(%d)", n)
                        response := fib(n)

                        err = ch.Publish(
                                "",        // exchange
                                d.ReplyTo, // routing key
                                false,     // mandatory
                                false,     // immediate
                                amqp.Publishing{
                                        ContentType:   "text/plain",
                                        CorrelationId: d.CorrelationId,
                                        Body:          []byte(strconv.Itoa(response)),
                                })
                        failOnError(err, "Failed to publish a message")

                        d.Ack(false)
                }
        }()

        log.Printf(" [*] Awaiting RPC requests")
        <-forever
}

服務器代碼很是簡單:

  • 與往常同樣,咱們首先創建鏈接,通道並聲明隊列。
  • 咱們可能要運行多個服務器進程。爲了將負載平均分配給多個服務器,咱們須要在通道上設置prefetch設置。
  • 咱們使用Channel.Consume獲取去隊列,咱們從隊列中接收消息。而後,咱們進入goroutine進行工做,並將響應發送回去。

咱們的RPC客戶端rpc_client.go的代碼:

package main

import (
        "log"
        "math/rand"
        "os"
        "strconv"
        "strings"
        "time"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func randomString(l int) string {
        bytes := make([]byte, l)
        for i := 0; i < l; i++ {
                bytes[i] = byte(randInt(65, 90))
        }
        return string(bytes)
}

func randInt(min int, max int) int {
        return min + rand.Intn(max-min)
}

func fibonacciRPC(n int) (res int, err error) {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "",    // name
                false, // durable
                false, // delete when unused
                true,  // exclusive
                false, // noWait
                nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                true,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        corrId := randomString(32)

        err = ch.Publish(
                "",          // exchange
                "rpc_queue", // routing key
                false,       // mandatory
                false,       // immediate
                amqp.Publishing{
                        ContentType:   "text/plain",
                        CorrelationId: corrId,
                        ReplyTo:       q.Name,
                        Body:          []byte(strconv.Itoa(n)),
                })
        failOnError(err, "Failed to publish a message")

        for d := range msgs {
                if corrId == d.CorrelationId {
                        res, err = strconv.Atoi(string(d.Body))
                        failOnError(err, "Failed to convert body to integer")
                        break
                }
        }

        return
}

func main() {
        rand.Seed(time.Now().UTC().UnixNano())

        n := bodyFrom(os.Args)

        log.Printf(" [x] Requesting fib(%d)", n)
        res, err := fibonacciRPC(n)
        failOnError(err, "Failed to handle RPC request")

        log.Printf(" [.] Got %d", res)
}

func bodyFrom(args []string) int {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "30"
        } else {
                s = strings.Join(args[1:], " ")
        }
        n, err := strconv.Atoi(s)
        failOnError(err, "Failed to convert arg to integer")
        return n
}

如今是時候看看rpc_client.gorpc_server.go的完整示例源代碼了。

咱們的RPC服務現已準備就緒。咱們能夠啓動服務器:

go run rpc_server.go
# => [x] Awaiting RPC requests

要請求斐波那契數,請運行客戶端:

go run rpc_client.go 30
# => [x] Requesting fib(30)

這裏介紹的設計不是RPC服務的惟一可能的實現,可是它具備一些重要的優勢:

  • 若是RPC服務器太慢,則能夠經過運行另外一臺RPC服務器來進行擴展。嘗試在新控制檯中運行另外一個rpc_server.go
  • 在客戶端,RPC只須要發送和接收一條消息。結果,RPC客戶端只須要一個網絡往返就能夠處理單個RPC請求。

咱們的代碼仍然很是簡單,而且不會嘗試解決更復雜(但很重要)的問題,例如:

  • 若是沒有服務器在運行,客戶端應如何反應?
  • 客戶端是否應該爲RPC設置某種超時時間?
  • 若是服務器發生故障並引起異常,是否應該將其轉發給客戶端?
  • 在處理以前防止無效的傳入消息(例如檢查邊界,類型)。

若是要進行實驗,可能會發現管理後臺界面對於查看隊列頗有用。

相關文章
相關標籤/搜索