【c#】RabbitMQ學習文檔(六)RPC(遠程調用)

遠程過程調用(Remote Proceddure call【RPC】)

(本實例都是使用的Net的客戶端,使用C#編寫)

  在第二個教程中,咱們學習瞭如何使用工做隊列在多個工做實例之間分配耗時的任務。

  可是,若是咱們須要在遠程計算機上運行功能並等待結果怎麼辦? 那是一個不一樣的故事。 此模式一般稱爲遠程過程調用或RPC。

  在本教程中,咱們將使用RabbitMQ構建一個RPC系統:一個客戶機和一個可擴展的RPC服務器。 因爲咱們沒有任何值得分發的耗時任務,咱們將建立一個返回斐波納契數字的虛擬RPC服務。

一、客戶端接口【Client Interface】

  爲了說明如何使用RPC服務,咱們將建立一個簡單的客戶端類。 它將公開一個名爲call的方法,該方法發送RPC請求並阻塞,直到接收到答案:html

複製代碼
var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();
複製代碼


   關於RPC的註釋

   雖然RPC是一個很常見的計算模式,但它常常被批評。 當系統出現問題的時候,程序員不知道函數調用是本地函數仍是緩慢的RPC調用,這樣的混亂致使了系統的不可預測性,並增長了調試的複雜性。 濫用RPC可能致使代碼的可維護性不好,這樣的設計不但沒有簡化軟件,並且只會是系統更糟。

   銘記這一點,請考慮如下建議:

     確保顯而易見哪一個函數調用是本地的,哪一個是遠程的。
     記錄您的系統。 使組件之間的依賴關係清除。
     處理錯誤狀況。 當RPC服務器停機很長時間後,客戶端應該如何反應?

    當有疑問避免RPC。 若是能夠的話,您應該使用異步管道 - 而不是相似RPC的阻塞,將異步推送到下一個計算階段。

二、回調隊列【Callback queue】
 
   通常來講RPC對RabbitMQ來講很容易。 客戶端發送請求消息,服務器回覆一條響應消息。 爲了收到一個響應,咱們須要發送一個請求向'回調'的隊列地址:git

複製代碼
var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;

var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                     routingKey: "rpc_queue",
                     basicProperties: props,
                     body: messageBytes);

// ... then code to read a response message from the callback_queue ...
複製代碼


消息屬性

  AMQP 0-9-1協議預先定義了一組14個隨附消息的屬性。 大多數屬性不多使用,除了如下內容:

  deliveryMode:將消息標記爲persistent(值爲2)或transient(任何其餘值)。 您可能會從第二個教程中記住此屬性。
  contentType:用於描述mime類型的編碼。 例如對於常用的JSON編碼,將此屬性設置爲:application / json是一個很好的作法。
  replyTo:一般用來命名一個回調隊列。
  correlationId:用於將RPC響應與請求相關聯。

三、相關標識【Correlation Id】

  在上面所提出的方法中,咱們建議爲每一個RPC請求建立一個回調隊列。這是很是低效的,但幸運的是有一個更好的方法 - 讓咱們爲每一個客戶端建立一個回調隊列。

  這將引起了一個新問題,在該隊列中收到響應,響應所屬的請求是不知道的。此時正是使用correlationId屬性的時候。咱們將爲每一個請求設置一個惟一的值。稍後,當咱們在回調隊列中收到一條消息時,咱們將查看此屬性,而且基於此,咱們將可以將響應與請求相匹配。若是咱們看到一個未知的correlationId值,咱們能夠安全地丟棄該消息 - 它不屬於咱們的請求。

  您可能會問,爲何咱們應該忽略回調隊列中的未知消息,而不是出現錯誤?這是因爲服務器端可能出現競爭狀況。雖然不太可能,RPC服務器可能會在發送答覆以後,但在發送請求的確認消息以前死亡。若是發生這種狀況,從新啓動的RPC服務器將再次處理該請求。這就是爲何在客戶端上,咱們必須優雅地處理這些重複的響應,而且RPC應該理想地是冪等的。

四、概要【Summary】

 
  咱們的RPC將像這樣工做:

     當客戶端啓動時,它建立一個匿名獨佔回調隊列。
     對於RPC請求,客戶端發送一個具備兩個屬性的消息:replyTo,它被設置爲回調隊列和correlationId,它被設置爲每一個請求的惟一值。
     請求被髮送到rpc_queue隊列。
     RPC worker(aka:server)正在等待隊列上的請求。 當請求出現時,它將執行該做業,並使用replyTo字段中的隊列將結果發送回客戶端。
     客戶端等待回呼隊列中的數據。 當信息出現時,它檢查correlationId屬性。 若是它與請求中的值相匹配,則返回對應用程序的響應。

五、整合

  斐波納契【Fibonacci】任務:程序員

private static int fib(int n)
  {
    if (n == 0 || n == 1) return n;
    return fib(n - 1) + fib(n - 2);
  }


  咱們聲明斐波那契函數。 它只假設有效的正整數輸入。 (不要期望這一個能爲大數字工做,並且這多是最慢的遞歸實現)

   咱們的RPC服務器RPCServer.cs的代碼以下所示:github

複製代碼
 1  using System;
 2 using RabbitMQ.Client;
 3 using RabbitMQ.Client.Events;
 4 using System.Text;
 5 
 6 class RPCServer
 7 {
 8     public static void Main()
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using (var connection = factory.CreateConnection())
12         using (var channel = connection.CreateModel())
13         {
14             channel.QueueDeclare(queue: "rpc_queue", durable: false,
15               exclusive: false, autoDelete: false, arguments: null);
16             channel.BasicQos(0, 1, false);
17             var consumer = new EventingBasicConsumer(channel);
18             channel.BasicConsume(queue: "rpc_queue",
19               noAck: false, consumer: consumer);
20             Console.WriteLine(" [x] Awaiting RPC requests");
21 
22             consumer.Received += (model, ea) =>
23             {
24                 string response = null;
25 
26                 var body = ea.Body;
27                 var props = ea.BasicProperties;
28                 var replyProps = channel.CreateBasicProperties();
29                 replyProps.CorrelationId = props.CorrelationId;
30 
31                 try
32                 {
33                     var message = Encoding.UTF8.GetString(body);
34                     int n = int.Parse(message);
35                     Console.WriteLine(" [.] fib({0})", message);
36                     response = fib(n).ToString();
37                 }
38                 catch (Exception e)
39                 {
40                     Console.WriteLine(" [.] " + e.Message);
41                     response = "";
42                 }
43                 finally
44                 {
45                     var responseBytes = Encoding.UTF8.GetBytes(response);
46                     channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
47                       basicProperties: replyProps, body: responseBytes);
48                     channel.BasicAck(deliveryTag: ea.DeliveryTag,
49                       multiple: false);
50                 }
51             };
52 
53             Console.WriteLine(" Press [enter] to exit.");
54             Console.ReadLine();
55         }
56     }
57 
58     ///
59 
60     /// Assumes only valid positive integer input.
61     /// Don't expect this one to work for big numbers, and it's
62     /// probably the slowest recursive implementation possible.
63     ///
64 
65     private static int fib(int n)
66     {
67         if (n == 0 || n == 1)
68         {
69             return n;
70         }
71 
72         return fib(n - 1) + fib(n - 2);
73     }
74 }
複製代碼


 服務器代碼至關簡單:

     像往常同樣,咱們開始創建鏈接,通道並聲明隊列。
     咱們可能想要運行多個服務器進程。 爲了在多個服務器上平均分配負載,咱們須要在channel.basicQos中設置prefetchCount設置。
     咱們使用basicConsume訪問隊列。 而後咱們註冊一個交付處理程序,咱們在其中進行工做併發迴響應。

咱們的RPC客戶端的代碼RPCClient.csjson

複製代碼
 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using RabbitMQ.Client;
 7 using RabbitMQ.Client.Events;
 8 
 9 class RPCClient
10 {
11     private IConnection connection;
12     private IModel channel;
13     private string replyQueueName;
14     private QueueingBasicConsumer consumer;
15 
16     public RPCClient()
17     {
18         var factory = new ConnectionFactory() { HostName = "localhost" };
19         connection = factory.CreateConnection();
20         channel = connection.CreateModel();
21         replyQueueName = channel.QueueDeclare().QueueName;
22         consumer = new QueueingBasicConsumer(channel);
23         channel.BasicConsume(queue: replyQueueName,
24                              noAck: true,
25                              consumer: consumer);
26     }
27 
28     public string Call(string message)
29     {
30         var corrId = Guid.NewGuid().ToString();
31         var props = channel.CreateBasicProperties();
32         props.ReplyTo = replyQueueName;
33         props.CorrelationId = corrId;
34 
35         var messageBytes = Encoding.UTF8.GetBytes(message);
36         channel.BasicPublish(exchange: "",
37                              routingKey: "rpc_queue",
38                              basicProperties: props,
39                              body: messageBytes);
40 
41         while(true)
42         {
43             var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
44             if(ea.BasicProperties.CorrelationId == corrId)
45             {
46                 return Encoding.UTF8.GetString(ea.Body);
47             }
48         }
49     }
50 
51     public void Close()
52     {
53         connection.Close();
54     }
55 }
56 
57 class RPC
58 {
59     public static void Main()
60     {
61         var rpcClient = new RPCClient();
62 
63         Console.WriteLine(" [x] Requesting fib(30)");
64         var response = rpcClient.Call("30");
65         Console.WriteLine(" [.] Got '{0}'", response);
66 
67         rpcClient.Close();
68     }
69 }
複製代碼


客戶端代碼稍微複雜一些:

     咱們創建一個鏈接和通道,併爲回覆聲明一個獨佔的'回調'隊列。
     咱們訂閱'回調'隊列,這樣咱們能夠收到RPC響應。
     咱們的調用方法使得實際的RPC請求。
     在這裏,咱們首先生成一個惟一的correlationId數字並保存它 - while循環將使用此值來捕獲適當的響應。
     接下來,咱們發佈請求消息,此請求消息具備兩個屬性:replyTo和correlationId。
     在這一點上,咱們能夠坐下來等待適當的響應到達。
     while循環正在作一個很是簡單的工做,對於每一個響應消息,它檢查correlationId是不是咱們正在尋找的。 若是是這樣,它會保存響應。
     最後,咱們將響應返回給用戶。

讓客戶端發送請求:

安全

複製代碼
var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();
複製代碼


如今是看看咱們的RPCClient.csRPCServer.cs的完整示例源代碼(包括基本異常處理)的好時機。

照常設置(參見教程一):

咱們的RPC服務如今已經準備好了。 咱們能夠啓動服務器:服務器

cd RPCServer
dotnet run
# => [x] Awaiting RPC requests

要請求運行客戶端的fibonacci號碼:網絡

cd RPCClient
dotnet run
# => [x] Requesting fib(30)


這裏提出的設計不是RPC服務的惟一可能的實現,而是具備一些重要的優勢:

     若是RPC服務器太慢,能夠經過運行另外一個RPC服務器進行擴展。 嘗試在新的控制檯中運行第二個RPCServer。
     在客戶端,RPC須要發送和接收一條消息。 不須要像queueDeclare這樣的同步調用。 所以,RPC客戶端只須要一個網絡往返單個RPC請求。

咱們的代碼仍然很是簡單,沒有嘗試解決更復雜(但重要的)問題,例如:

     若是沒有服務器運行,客戶端應該如何反應?
     客戶端是否須要RPC的某種超時時間?
     若是服務器發生故障並引起異常,應該將其轉發給客戶端?
     在處理以前防止無效的傳入消息(例如檢查邊界,類型)。
好了,這個系列也快結束了。

在把原地址貼出來,讓你們瞭解更多。地址以下:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html併發

相關文章
相關標籤/搜索