原文來自 RabbitMQ 英文官網的教程(6.Remote procedure call - RPC),其示例代碼採用了 .NET C# 語言。html
In the second tutorial we learned how to use Work Queues to distribute time-consuming tasks among multiple workers.react
在第二篇教程中,咱們學習瞭如何使用工做隊列在多個工做單元之間分配耗時的任務。程序員
But what if we need to run a function on a remote computer and wait for the result? Well, that's a different story. This pattern is commonly known as Remote Procedure Call or RPC.json
可是假如咱們須要運行一個在遠程電腦上的函數並等待其結果將會怎樣呢?好吧,這將是一個徹底不一樣的故事,這個模式被廣泛認爲叫遠程過程調用或者簡稱 RPC。服務器
In this tutorial we're going to use RabbitMQ to build an RPC system: a client and a scalable RPC server. As we don't have any time-consuming tasks that are worth distributing, we're going to create a dummy RPC service that returns Fibonacci numbers.網絡
在本教程中咱們即將使用 RabbitMQ 來構建一個 RPC 系統:一個客戶端和一個可伸縮的 RPC 服務器。因爲咱們並無任何耗時的任務能拿來分配,那就建立一個返回斐波納契數列的虛擬 RPC 服務吧。app
客戶端接口
To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received:異步
爲了說明如何使用 RPC 服務咱們來建立一個簡單的客戶端類。我會公開一個名叫 call 的方法,該方法用以發送一個 RPC 請求並保持阻塞狀態,直至接收到應答爲止。async
var rpcClient = new RPCClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close();
關於 RPC
Although RPC is a pretty common pattern in computing, it's often criticised. The problems arise when a programmer is not aware whether a function call is local or if it's a slow RPC. Confusions like that result in an unpredictable system and adds unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable spaghetti code.ide
儘管 RPC 是一個很常見的計算模式,也時常遭受批評。當程序員不知道針對 call 函數的調用是本地的仍是很慢的 RPC 時就會出現問題,像這樣的困惑每每會致使不可預測的系統(問題)以及徒增沒必要要的調試複雜性。與簡化軟件有所不一樣的是,誤用 RPC 會致使難以維護的意大利麪條式代碼。
Bearing that in mind, consider the following advice:
Make sure it's obvious which function call is local and which is remote.
Document your system. Make the dependencies between components clear.
Handle error cases. How should the client react when the RPC server is down for a long time?
記住以上問題,並考慮如下建議:
- 確保能夠明顯區分哪個函數是調用本地的,哪個是遠程的。
- 爲系統編寫文檔,確保組件之間的依賴很明確。
- 處理錯誤情形,當 RPC 服務端停機很長時間時,客戶端會怎樣應對?
When in doubt avoid RPC. If you can, you should use an asynchronous pipeline - instead of RPC-like blocking, results are asynchronously pushed to a next computation stage.
當有疑問時先避免使用 RPC,若是能夠,考慮使用一個異步管道 - 它相似於 RPC 的阻塞,會經過異步的方式將結果推送到下一個計算場景。
回調隊列
In general doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response we need to send a 'callback' queue address with the request:
通常而言,基於 RabbitMQ 來使用 RPC 是很簡單的,即客戶端發送一個請求消息,而後服務端使用一個響應消息做爲應答。爲了能得到一個響應,咱們須要在請求過程當中發送一個「callback」隊列地址。
var corrId = Guid.NewGuid().ToString(); var props = channel.CreateBasicProperties(); props.ReplyTo = replyQueueName; props.CorrelationId = corrId; var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); // ... then code to read a response message from the callback_queue ...
消息屬性
The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
deliveryMode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
contentType: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
replyTo: Commonly used to name a callback queue.
correlationId: Useful to correlate RPC responses with requests.
AMQP 0-9-1 協議會在消息中預約義包含有 14 個屬性的集合,大部分的屬性用得都比較少,除了如下幾項以外:
- deliveryMode:將消息標記爲持久的(值爲2),或者瞬時的(其餘值),想必你在第二篇教程中還記得這個屬性。
- contentType:常常用來描述編碼的 mime 類型,好比在常見的 JSON 編碼中一個好的實踐即是設置該屬性爲:application/json。
- replyTo:一般用來爲回調隊列命名。
- correlationId:用以將 RPC 響應與請求關聯起來。
CorrelationId
In the method presented above we suggest creating a callback queue for every RPC request. That's pretty inefficient, but fortunately there is a better way - let's create a single callback queue per client.
在上面呈現的方法中咱們建議爲每個 RPC 請求建立一個回調隊列,不過這很低效,幸運的是咱們有更好的辦法 - 讓咱們爲每個客戶端建立一個單獨的回調。
That raises a new issue, having received a response in that queue it's not clear to which request the response belongs. That's when the correlationId property is used. We're going to set it to a unique value for every request. Later, when we receive a message in the callback queue we'll look at this property, and based on that we'll be able to match a response with a request. If we see an unknown correlationId value, we may safely discard the message - it doesn't belong to our requests.
這就又會出現一個問題,即在收到響應的隊列中,並不清楚哪一個請求隸屬於該響應,這即是 correlationId 屬性所用之處。咱們將會對每個請求設置 correlationId 爲惟一值,而後,當咱們在回調隊列中接收到消息時會查看這個屬性,在該屬性的基礎上,咱們可讓請求與響應進行匹配。若是咱們發現有未知的 correlationId 值,則能夠放心地丟棄這些並不屬於咱們的請求的消息。
You may ask, why should we ignore unknown messages in the callback queue, rather than failing with an error? It's due to a possibility of a race condition on the server side. Although unlikely, it is possible that the RPC server will die just after sending us the answer, but before sending an acknowledgment message for the request.
If that happens, the restarted RPC server will process the request again. That's why on the client we must handle the duplicate responses gracefully, and the RPC should ideally be idempotent.
你可能會問,咱們爲何應該在回調隊列中忽略未知的消息,而不是(直接)返回錯誤?這多是因爲服務端存在競態條件。儘管不太可能,可是針對一個請求,RPC 服務器極可能在發送完應答後停止,而不是在發送確認消息以前。若是確實發生,重啓的 RPC 服務將再一次處理這個請求,這就是爲何咱們在客戶端須要優雅地處理重複的響應,以及應該(保持)理想地冪等性。
總結
Our RPC will work like this:
When the Client starts up, it creates an anonymous exclusive callback queue.
For an RPC request, the Client sends a message with two properties: replyTo, which is set to the callback queue and correlationId, which is set to a unique value for every request.
The request is sent to an rpc_queue queue.
The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the replyTo field.
The client waits for data on the callback queue. When a message appears, it checks the correlationId property. If it matches the value from the request it returns the response to the application.
RPC 會像以下這樣運做:
- 當客戶端啓動時,它將建立一個匿名的獨有回調隊列。
- 針對一個 RPC 請求,客戶端會發送一個基於兩個屬性的消息:一個是指向回調隊列的 replyTo,另外一個是爲每個請求標記惟一值的 correlationId。
- 請求將發送至 rpc_queue 隊列。
- RPC 工做單元(或者叫服務端)會在隊列中持續等待請求。當請求出現時,RPC 將完成工做,同時使用來自 replyTo 字段(所指代)的隊列來發送攜帶着結果的消息返回至客戶端。
- 客戶端在回調隊列上等待着數據,當一個消息出現時,客戶端會檢查 correlationId 屬性,若是該值與當前請求的值相匹配,則把響應返回給應用程序。
融合一塊兒
The Fibonacci task:
斐波納契任務(函數)
private static int fib(int n) { if (n == 0 || n == 1) return n; return fib(n - 1) + fib(n - 2); }
We declare our fibonacci function. It assumes only valid positive integer input. (Don't expect this one to work for big numbers, and it's probably the slowest recursive implementation possible).
咱們聲明瞭斐波納契函數,並假定只(容許)輸入正整數。(不要指望輸入過大的數字,由於極可能這個遞歸實現會很是慢)
The code for our RPC server RPCServer.cs looks like this:
針對咱們的 RPC 服務端,RPCServer.cs 類文件的代碼看起來以下:
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class RPCServer { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer); Console.WriteLine(" [x] Awaiting RPC requests"); consumer.Received += (model, ea) => { string response = null; var body = ea.Body; var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine(" [.] fib({0})", message); response = fib(n).ToString(); } catch (Exception e) { Console.WriteLine(" [.] " + e.Message); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } /// /// Assumes only valid positive integer input. /// Don't expect this one to work for big numbers, and it's /// probably the slowest recursive implementation possible. /// private static int fib(int n) { if (n == 0 || n == 1) { return n; } return fib(n - 1) + fib(n - 2); } }
The server code is rather straightforward:
As usual we start by establishing the connection, channel and declaring the queue.
We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set the prefetchCount setting in channel.basicQos.
We use basicConsume to access the queue. Then we register a delivery handler in which we do the work and send the response back.
服務端的代碼是至關簡單的。
- 像往常同樣,咱們先創建鏈接、信道以及聲明隊列。
- 咱們可能想運行不僅一個服務端處理程序,爲了能經過多臺服務器平均地分擔負載,咱們須要設定 channel.basicQos 中 prefetchCount 的值。
- 咱們使用 basicConsume 來訪問隊列,而後註冊一個遞送程序,在這個程序中咱們執行工做並返回響應。
The code for our RPC client RPCClient.cs:
針對咱們的 RPC 客戶端,RPCClient.cs 類文件的代碼以下:
using System; using System.Collections.Concurrent; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; public class RpcClient { private readonly IConnection connection; private readonly IModel channel; private readonly string replyQueueName; private readonly EventingBasicConsumer consumer; private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>(); private readonly IBasicProperties props; public RpcClient() { var factory = new ConnectionFactory() { HostName = "localhost" }; connection = factory.CreateConnection(); channel = connection.CreateModel(); replyQueueName = channel.QueueDeclare().QueueName; consumer = new EventingBasicConsumer(channel); props = channel.CreateBasicProperties(); var correlationId = Guid.NewGuid().ToString(); props.CorrelationId = correlationId; props.ReplyTo = replyQueueName; consumer.Received += (model, ea) => { var body = ea.Body; var response = Encoding.UTF8.GetString(body); if (ea.BasicProperties.CorrelationId == correlationId) { respQueue.Add(response); } }; } public string Call(string message) { var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish( exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); channel.BasicConsume( consumer: consumer, queue: replyQueueName, autoAck: true); return respQueue.Take(); ; } public void Close() { connection.Close(); } } public class Rpc { public static void Main() { var rpcClient = new RpcClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close(); } }
The client code is slightly more involved:
We establish a connection and channel and declare an exclusive 'callback' queue for replies.
We subscribe to the 'callback' queue, so that we can receive RPC responses.
Our call method makes the actual RPC request.
Here, we first generate a unique correlationId number and save it - the while loop will use this value to catch the appropriate response.
Next, we publish the request message, with two properties: replyTo and correlationId.
At this point we can sit back and wait until the proper response arrives.
The while loop is doing a very simple job, for every response message it checks if the correlationId is the one we're looking for. If so, it saves the response.
Finally we return the response back to the user.
客戶端的代碼稍微多一些:
- 咱們創建鏈接和信道,以及針對答覆(響應)聲明一個獨有的「callback」隊列。
- 咱們訂閱這個「callback」隊列,以即可以接收到 RPC 響應。
- 咱們的 call 方法將發起一個實際的 RPC 請求。
- 在此,咱們首先生成一個惟一的 correlationId 編號並保存好它,由於 while 循環會使用該值來捕獲匹配的響應。
- 接下來,咱們發佈請求消息,它包含了兩個屬性:replyTo 和 correlationId。
- 此時,咱們能夠稍微等待一下直到指定的響應到來。
- while 循環所作的事情很是簡單,對於每個響應消息,它都會檢查 correlationId 是否爲咱們正在尋找的那一個,若是是就保存該響應。
- 最終,咱們將響應返回給用戶。
Making the Client request:
客戶端請求
var rpcClient = new RPCClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close();
Now is a good time to take a look at our full example source code (which includes basic exception handling) for RPCClient.cs and RPCServer.cs.
Set up as usual (see tutorial one):
Our RPC service is now ready. We can start the server:
如今是時候來看一下 RPCClient.cs 和 RPCServer.cs 完整的示例代碼了(包含了基本的異常處理)。
像往常一下建立(可參考第一篇):
咱們的 RPC 服務已經就緒,現能夠開啓服務端:
cd RPCServer dotnet run # => [x] Awaiting RPC requests
To request a fibonacci number run the client:
運行客戶端來請求一個斐波納契數:
cd RPCClient dotnet run # => [x] Requesting fib(30)
The design presented here is not the only possible implementation of a RPC service, but it has some important advantages:
If the RPC server is too slow, you can scale up by just running another one. Try running a second RPCServer in a new console.
On the client side, the RPC requires sending and receiving only one message. No synchronous calls like queueDeclare are required. As a result the RPC client needs only one network round trip for a single RPC request.
目前所呈現的設計不只僅是 RPC 服務的可能實現,並且還有一些重要優勢:
- 若是 RPC 服務很慢,你能夠經過運行另外一個來橫向擴展,也就是嘗試在新的控制檯中運行第二個 RPCServer。
- 在客戶端,RPC 只能發送和接收一條消息,必需像 queueDeclare 那樣進行非同步式調用。所以,RPC 客戶端只須要單次請求的一次網絡往返。
Our code is still pretty simplistic and doesn't try to solve more complex (but important) problems, like:
How should the client react if there are no servers running?
Should a client have some kind of timeout for the RPC?
If the server malfunctions and raises an exception, should it be forwarded to the client?
Protecting against invalid incoming messages (eg checking bounds, type) before processing.
咱們的代碼仍然很簡單,也並無嘗試去解決更復雜(但很重要的)問題,好比就像:
- 若是服務端沒有運行,那麼客戶端將如何應對?
- 客戶端針對 RPC 是否應該有某種超時(應對措施)?
- 若是服務端出現故障並引起異常,它是否應該轉發給客戶端?
- 在處理以前防備無效的傳入消息(好比檢查邊界和類型)。