【譯】RabbitMQ:遠程過程調用(RPC)

在教程二中,咱們學習瞭如何使用工做隊列在多個工做線程中分發耗時的任務。但若是咱們須要去執行遠程機器上的方法而且等待結果會怎麼樣呢?那又是另一回事了。這種模式一般被稱爲遠程過程調用(RPC)。html

本教程中咱們將使用RabbitMQ構建一個遠程過程調用系統:一個客戶端和一個可擴展的服務器。因爲沒有什麼耗時的任務值得分發,咱們將建立一個虛擬的RPC服務用於返回斐波那契數列。git

客戶端接口

爲了闡釋如何使用RPC服務咱們將建立一個簡單的客戶端類。類中獎公開一個方法用於發送一個RPC請求,而後阻塞知道收到應答,方法名稱叫作call:程序員

1 var rpcClient = new RPCClient();
2 
3 Console.WriteLine(" [x] Requesting fib(30)");
4 var response = rpcClient.Call("30");
5 Console.WriteLine(" [.] Got '{0}'", response);
6 
7 rpcClient.Close();

 

RPC註記github

儘管RPC在計算機技術中是一種很是常見的模式,可是它卻飽受批判,問題發生在程序員不知道一個調用是本地的仍是一個耗時的RPC。這樣的混亂,致使不可預知的系統,並將沒必要要的複雜性調價到調試過程當中。誤用RPC將致使不可維護的混亂的代碼,而不是簡化軟件。json

銘記這些限制,考慮下面的建議:安全

  • 確保方法是本地調用仍是遠程調用能清晰明瞭
  • 將系統歸檔備案,使組件間的依賴關係足夠清晰
  • 捕獲異常,當RPC服務宕機很長時間客戶端做何響應?

應該在不能肯定的時候避免使用RPC,若是能夠的話,你可使用異步管道,而不是類RPC的阻塞,結果被異步推送到下一個計算階段。服務器

 

回調隊列

通常來講,在RabbitMQ之上構建RPC很是的容易,客戶端發送請求消息,服務返回應答消息。爲了可以接收到應答的消息,咱們須要在請求時指定一個回調隊列地址:網絡

 1 var corrId = Guid.NewGuid().ToString();
 2 var props = channel.CreateBasicProperties();
 3 props.ReplyTo = replyQueueName;
 4 props.CorrelationId = corrId;
 5 
 6 var messageBytes = Encoding.UTF8.GetBytes(message);
 7 channel.BasicPublish(exchange: "",
 8                      routingKey: "rpc_queue",
 9                      basicProperties: props,
10                      body: messageBytes);
11 
12 // ... 而後是從回調隊列中讀取消息的代碼 ... 

消息屬性

AMQP協議預約義了一個包含14個屬性的屬性集做用於消息之上,大多數都不多使用,除了下面這些:app

  • deliveryMode:將消息標記爲持續(使用數值2)或瞬時(其餘任意值)的,經過教程二你應該還記得這個屬性。
  • contentType:用於描述媒體類型編碼,例如:針對經常使用的JSON編碼,最好的作法是把這個屬性設置爲:application/json
  • relayTo:一般用於命名一個回調隊列。
  • correlationId:關聯RPC請求和響應的時候很是有用

關聯ID

在上面準備的方法中,咱們建議爲每個RPC請求建立一個回調隊列。這樣至關低效,辛運的是有更好的方法,讓咱們爲每個客戶端建立一個回調隊列。異步

這樣引出了一個新問題,當收到一個響應的時候,它沒法清楚的知道響應屬於哪個請求。這就是correlationId派上用場的時候。咱們將爲每個請求設置一個惟一的關聯ID,以後當咱們從回調隊列收到一個響應的時候,咱們將檢查這個屬性,基於此,便能將響應和請求關聯起來了。若是發現一個未知的關聯ID值,咱們能夠安全的銷燬消息,由於消息不屬於任何一個請求。

你可能會奇怪,爲何咱們忽略掉未知關聯ID值得消息,而不是用錯誤來標記失敗?這是由於在服務器端可能存在爭用條件。儘管不太可能,可是RPC服務器可能在發送了響應消息而未發送消息確認的狀況下出現故障,若是出現這樣的狀況,在RPC服務器重啓以後將再次處理該請求。這就是爲何咱們必須在客戶端優雅的捕獲重複的請求,而且RPC理論上應該是冪等的。

總結

                                 

咱們的RPC將這樣工做:

  • 當客戶端啓動時,它會建立一個匿名的獨佔回調隊列。
  • 對於一個RPC請求,客戶端經過兩個屬性發送一條消息:relayTo,設置回調隊列;correlationId,爲每一個請求設置一個惟一值。
  • 消息將被髮送到一個rpc_queue隊列。
  • RPC工做線程(即,服務器)在該隊列上等待請求。當請求出現,他將處理請求並把結果發回給客戶端,使用的隊列是在replayTo中設置的。
  • 客戶端在回調隊列上等待響應,當消息出現,它檢查關聯ID,若是匹配來自請求的關聯ID值,返回消息到該應用程序。

組合在一塊兒

斐波那契任務:

1 private static int fib(int n)
2 {
3     if (n == 0 || n == 1) return n;
4     return fib(n - 1) + fib(n - 2);
5 }

咱們定義斐波那契函數,它只採用正整數做爲輸入。(別期望它能在大數值的狀況下工做,並且這多是最慢的一種遞歸實現)

RPC服務器RPCServer.cs中的代碼看起來是這樣的:

 1 using System;
 2 using RabbitMQ.Client;
 3 using RabbitMQ.Client.Events;
 4 using System.Text;
 5 
 6 class RPCServer
 7 {
 8     public static void Main()
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using(var connection = factory.CreateConnection())
12         using(var channel = connection.CreateModel())
13         {
14             channel.QueueDeclare(queue: "rpc_queue",
15                                  durable: false,
16                                  exclusive: false,
17                                  autoDelete: false,
18                                  arguments: null);
19             channel.BasicQos(0, 1, false);
20             var consumer = new QueueingBasicConsumer(channel);
21             channel.BasicConsume(queue: "rpc_queue",
22                                  noAck: false,
23                                  consumer: consumer);
24             Console.WriteLine(" [x] Awaiting RPC requests");
25 
26             while(true)
27             {
28                 string response = null;
29                 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
30 
31                 var body = ea.Body;
32                 var props = ea.BasicProperties;
33                 var replyProps = channel.CreateBasicProperties();
34                 replyProps.CorrelationId = props.CorrelationId;
35 
36                 try
37                 {
38                     var message = Encoding.UTF8.GetString(body);
39                     int n = int.Parse(message);
40                     Console.WriteLine(" [.] fib({0})", message);
41                     response = fib(n).ToString();
42                 }
43                 catch(Exception e)
44                 {
45                     Console.WriteLine(" [.] " + e.Message);
46                     response = "";
47                 }
48                 finally
49                 {
50                     var responseBytes = Encoding.UTF8.GetBytes(response);
51                     channel.BasicPublish(exchange: "",
52                                          routingKey: props.ReplyTo,
53                                          basicProperties: replyProps,
54                                          body: responseBytes);
55                     channel.BasicAck(deliveryTag: ea.DeliveryTag,
56                                      multiple: false);
57                 }
58             }
59         }
60     }
61 
62     /// <summary>
63     /// Assumes only valid positive integer input.
64     /// Don't expect this one to work for big numbers,
65     /// and it's probably the slowest recursive implementation possible.
66     /// </summary>
67     private static int fib(int n)
68     {
69         if(n == 0 || n == 1)
70         {
71             return n;
72         }
73 
74         return fib(n - 1) + fib(n - 2);
75     }
76 }

服務端代碼至關簡單:

  • 一般狀況下,咱們都會以建立連接、信道和申明隊列做爲開始。
  • 咱們可能但願運行不止一個服務器進程。爲了將加載均勻分佈到多個服務器,咱們須要將prefetchCount設置爲channel.basicQos
  • 咱們使用basicConsume來訪問隊列。以後進入While循環,等待請求消息,完成工做,而後發回響應。

RPC客戶端RPCClient.cs中的代碼:

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using RabbitMQ.Client;
 7 using RabbitMQ.Client.Events;
 8 
 9 class RPCClient
10 {
11     private IConnection connection;
12     private IModel channel;
13     private string replyQueueName;
14     private QueueingBasicConsumer consumer;
15 
16     public RPCClient()
17     {
18         var factory = new ConnectionFactory() { HostName = "localhost" };
19         connection = factory.CreateConnection();
20         channel = connection.CreateModel();
21         replyQueueName = channel.QueueDeclare().QueueName;
22         consumer = new QueueingBasicConsumer(channel);
23         channel.BasicConsume(queue: replyQueueName,
24                              noAck: true,
25                              consumer: consumer);
26     }
27 
28     public string Call(string message)
29     {
30         var corrId = Guid.NewGuid().ToString();
31         var props = channel.CreateBasicProperties();
32         props.ReplyTo = replyQueueName;
33         props.CorrelationId = corrId;
34 
35         var messageBytes = Encoding.UTF8.GetBytes(message);
36         channel.BasicPublish(exchange: "",
37                              routingKey: "rpc_queue",
38                              basicProperties: props,
39                              body: messageBytes);
40 
41         while(true)
42         {
43             var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
44             if(ea.BasicProperties.CorrelationId == corrId)
45             {
46                 return Encoding.UTF8.GetString(ea.Body);
47             }
48         }
49     }
50 
51     public void Close()
52     {
53         connection.Close();
54     }
55 }
56 
57 class RPC
58 {
59     public static void Main()
60     {
61         var rpcClient = new RPCClient();
62 
63         Console.WriteLine(" [x] Requesting fib(30)");
64         var response = rpcClient.Call("30");
65         Console.WriteLine(" [.] Got '{0}'", response);
66 
67         rpcClient.Close();
68     }
69 }

客戶端的代碼要稍微複雜一些:

  • 建立一個連接、信道、爲響應申明獨佔的回調隊列。
  • 訂閱回調隊列,以便接收RPC響應。
  • call方法完成實際的RPC調用。
  • 首先建立一個惟一的關聯Id而且保存它,while循環使用它去匹配合適的應答。
  • 接下來,咱們發佈請求消息,使用了兩個屬性:replyTocorrelationId
  • 這時咱們就能夠坐等正確的響應到達了。
  • While循環作的事情很是簡單,檢測每個響應,若是correlactionId是咱們須要的,就保存該響應。
  • 最後,把響應返回給用戶。

構建客戶端請求:

1 RPCClient fibonacciRpc = new RPCClient();
2 
3 System.out.println(" [x] Requesting fib(30)");
4 String response = fibonacciRpc.call("30");
5 System.out.println(" [.] Got '" + response + "'");
6 
7 fibonacciRpc.close();

如今是時候來看看完整示例的源代碼了(包含基本的異常處理)。RPCClient.csRPCServer.cs

編譯(參見教程一):

1 $ csc /r:"RabbitMQ.Client.dll" RPCClient.cs
2 $ csc /r:"RabbitMQ.Client.dll" RPCServer.cs

如今RPC服務已經準備就緒,能夠啓動服務了:

1 $ RPCServer.exe
2  [x] Awaiting RPC requests

運行客戶端去請求斐波那契數列:

1 $ RPCClient.exe
2  [x] Requesting fib(30)

這裏介紹的設計並不是RPC服務的惟一實現方式,可是它有一些重要的優點:

  • 若是RPC服務太慢,你能夠經過運行另一個實例來對其進行橫向擴展,試着在一個新的控制檯裏面運行另外一個服務器。
  • 在客戶端,RPC只要求發送和接收一條消息,沒有如同declareQueue的同步調用被要求。做爲結果,RPC客戶端對於一個RPC請求,只須要一個網絡往返。

咱們的代碼依然很是簡單,並無嘗試去解決一些複雜(可是重要)的問題,好比:

  • 若是沒有運行中的服務器,客戶端將做何響應?
  • 客戶端對於RPC是否能夠有某種形式的超時?
  • 若是服務器發生故障,引起異常,是否應當被轉發給客戶端?
  • 在處理以前,避免無效的輸入數據,好比:檢查邊界、類型等。

 

若是你想嘗試,你能夠找到有用的RabbitMQ管理插件去瀏覽隊列。

 

原文連接:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

相關文章
相關標籤/搜索