本文翻譯自RabbitMQ官網的Go語言客戶端系列教程,本文首發於個人我的博客:liwenzhou.com,教程共分爲六篇,本文是第六篇——RPC。html
這些教程涵蓋了使用RabbitMQ建立消息傳遞應用程序的基礎知識。
你須要安裝RabbitMQ服務器才能完成這些教程,請參閱安裝指南或使用Docker鏡像。
這些教程的代碼是開源的,官方網站也是如此。git
本教程假設RabbitMQ已安裝並運行在本機上的標準端口(5672)。若是你使用不一樣的主機、端口或憑據,則須要調整鏈接設置。程序員
(使用Go RabbitMQ客戶端)github
在第二個教程中,咱們學習瞭如何使用工做隊列在多個worker之間分配耗時的任務。web
可是,若是咱們須要在遠程計算機上運行函數並等待結果怎麼辦?好吧,那是一個不一樣的故事。這種模式一般稱爲遠程過程調用或RPC。docker
在本教程中,咱們將使用RabbitMQ構建一個RPC系統:客戶端和可伸縮RPC服務器。因爲咱們沒有值得分配的耗時任務,所以咱們將建立一個虛擬RPC服務,該服務返回斐波那契數。json
有關RPC的說明bash
儘管RPC是計算中很是常見的模式,但它常常受到批評。服務器
當程序員不知道函數調用是本地的仍是緩慢的RPC時,就會出現問題。這樣的混亂會致使系統變幻莫測,並給調試增長了沒必要要的複雜性。濫用RPC可能會致使沒法維護的意大利麪條式代碼而不是簡化軟件,網絡
牢記這一點,請考慮如下建議:
- 肯定哪一個函數調用是本地的,哪一個是遠程的。
- 爲你的系統編寫文檔。明確組件之間的依賴關係。
- 處理錯誤狀況。 當RPC服務器長時間關閉時,客戶端應如何處理?
一般,經過RabbitMQ進行RPC很容易。客戶端發送請求消息,服務器發送響應消息。爲了接收響應,咱們須要發送帶有「回調」隊列地址的請求。咱們可使用默認隊列。讓咱們嘗試一下:
q, err := ch.QueueDeclare( "", // 不指定隊列名,默認使用隨機生成的隊列名 false, // durable false, // delete when unused true, // exclusive false, // noWait nil, // arguments ) err = ch.Publish( "", // exchange "rpc_queue", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: corrId, ReplyTo: q.Name, // 在這裏指定callback隊列名,也是在這個隊列等回覆 Body: []byte(strconv.Itoa(n)), })
消息屬性
AMQP 0-9-1協議預約義了消息附帶的14個屬性集。除如下屬性外,大多數屬性不多使用:
persistent
:將消息標記爲持久性(值爲true
)或瞬態(false
)。你可能還記得第二個教程中的此屬性。content_type
:用於描述編碼的mime類型。例如,對於常用的JSON編碼,將此屬性設置爲application/ json
是一個好習慣。reply_to
:經常使用於命名回調隊列correlation_id
:有助於將RPC響應與請求相關聯
在上面介紹的方法中,咱們建議爲每一個RPC請求建立一個回調隊列。這是至關低效的,可是幸運的是,有一種更好的方法——讓咱們爲每一個客戶端建立一個回調隊列。
這就引起了一個新問題,在該隊列中收到響應後,尚不清楚響應屬於哪一個請求。這個時候就該使用correlation_id
這個屬性了。針對每一個請求咱們將爲其設置一個惟一值。隨後,當咱們在回調隊列中收到消息時,咱們將查看該屬性,並基於這個屬性將響應與請求進行匹配。若是咱們看到未知的correlation_id
值,則能夠放心地丟棄該消息——它不屬於咱們的請求。
你可能會問,爲何咱們應該忽略回調隊列中的未知消息,而不是報錯而失敗?這是因爲服務器端可能出現競爭情況。儘管可能性不大,但RPC服務器可能會在向咱們發送答案以後但在發送請求的確認消息以前死亡。若是發生這種狀況,從新啓動的RPC服務器將再次處理該請求。這就是爲何在客戶端上咱們必須妥善處理重複的響應,而且理想狀況下RPC應該是冪等的。
咱們的RPC工做流程以下:
reply_to
(設置爲回調隊列)和correlation_id
(設置爲每一個請求的惟一值)。rpc_queue
隊列。replay_to
字段中的隊列發回給客戶端。correlation_id
屬性。若是它與請求中的值匹配,則將響應返回給應用程序。斐波那契函數:
func fib(n int) int { if n == 0 { return 0 } else if n == 1 { return 1 } else { return fib(n-1) + fib(n-2) } }
聲明咱們的斐波那契函數。它僅假設有效的正整數輸入。 (不要期望這種方法適用於大量用戶,它多是最慢的遞歸實現)。
咱們的RPC服務器rpc_server.go的代碼以下所示:
package main import ( "log" "strconv" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func fib(n int) int { if n == 0 { return 0 } else if n == 1 { return 1 } else { return fib(n-1) + fib(n-2) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "rpc_queue", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS") msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { n, err := strconv.Atoi(string(d.Body)) failOnError(err, "Failed to convert body to integer") log.Printf(" [.] fib(%d)", n) response := fib(n) err = ch.Publish( "", // exchange d.ReplyTo, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: d.CorrelationId, Body: []byte(strconv.Itoa(response)), }) failOnError(err, "Failed to publish a message") d.Ack(false) } }() log.Printf(" [*] Awaiting RPC requests") <-forever }
服務器代碼很是簡單:
prefetch
設置。Channel.Consume
獲取去隊列,咱們從隊列中接收消息。而後,咱們進入goroutine進行工做,並將響應發送回去。咱們的RPC客戶端rpc_client.go的代碼:
package main import ( "log" "math/rand" "os" "strconv" "strings" "time" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func randomString(l int) string { bytes := make([]byte, l) for i := 0; i < l; i++ { bytes[i] = byte(randInt(65, 90)) } return string(bytes) } func randInt(min int, max int) int { return min + rand.Intn(max-min) } func fibonacciRPC(n int) (res int, err error) { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when unused true, // exclusive false, // noWait nil, // arguments ) failOnError(err, "Failed to declare a queue") msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") corrId := randomString(32) err = ch.Publish( "", // exchange "rpc_queue", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: corrId, ReplyTo: q.Name, Body: []byte(strconv.Itoa(n)), }) failOnError(err, "Failed to publish a message") for d := range msgs { if corrId == d.CorrelationId { res, err = strconv.Atoi(string(d.Body)) failOnError(err, "Failed to convert body to integer") break } } return } func main() { rand.Seed(time.Now().UTC().UnixNano()) n := bodyFrom(os.Args) log.Printf(" [x] Requesting fib(%d)", n) res, err := fibonacciRPC(n) failOnError(err, "Failed to handle RPC request") log.Printf(" [.] Got %d", res) } func bodyFrom(args []string) int { var s string if (len(args) < 2) || os.Args[1] == "" { s = "30" } else { s = strings.Join(args[1:], " ") } n, err := strconv.Atoi(s) failOnError(err, "Failed to convert arg to integer") return n }
如今是時候看看rpc_client.go和rpc_server.go的完整示例源代碼了。
咱們的RPC服務現已準備就緒。咱們能夠啓動服務器:
go run rpc_server.go # => [x] Awaiting RPC requests
要請求斐波那契數,請運行客戶端:
go run rpc_client.go 30 # => [x] Requesting fib(30)
這裏介紹的設計不是RPC服務的惟一可能的實現,可是它具備一些重要的優勢:
rpc_server.go
。咱們的代碼仍然很是簡單,而且不會嘗試解決更復雜(但很重要)的問題,例如:
若是要進行實驗,可能會發現管理後臺界面對於查看隊列頗有用。