在教程二中,咱們學習瞭如何使用工做隊列在多個工做線程中分發耗時的任務。但若是咱們須要去執行遠程機器上的方法而且等待結果會怎麼樣呢?那又是另一回事了。這種模式一般被稱爲遠程過程調用(RPC)。html
本教程中咱們將使用RabbitMQ構建一個遠程過程調用系統:一個客戶端和一個可擴展的服務器。因爲沒有什麼耗時的任務值得分發,咱們將建立一個虛擬的RPC服務用於返回斐波那契數列。git
爲了闡釋如何使用RPC服務咱們將建立一個簡單的客戶端類。類中獎公開一個方法用於發送一個RPC請求,而後阻塞知道收到應答,方法名稱叫作call:程序員
1 var rpcClient = new RPCClient(); 2 3 Console.WriteLine(" [x] Requesting fib(30)"); 4 var response = rpcClient.Call("30"); 5 Console.WriteLine(" [.] Got '{0}'", response); 6 7 rpcClient.Close();
RPC註記github
儘管RPC在計算機技術中是一種很是常見的模式,可是它卻飽受批判,問題發生在程序員不知道一個調用是本地的仍是一個耗時的RPC。這樣的混亂,致使不可預知的系統,並將沒必要要的複雜性調價到調試過程當中。誤用RPC將致使不可維護的混亂的代碼,而不是簡化軟件。json
銘記這些限制,考慮下面的建議:安全
應該在不能肯定的時候避免使用RPC,若是能夠的話,你可使用異步管道,而不是類RPC的阻塞,結果被異步推送到下一個計算階段。服務器
通常來講,在RabbitMQ之上構建RPC很是的容易,客戶端發送請求消息,服務返回應答消息。爲了可以接收到應答的消息,咱們須要在請求時指定一個回調隊列地址:網絡
1 var corrId = Guid.NewGuid().ToString(); 2 var props = channel.CreateBasicProperties(); 3 props.ReplyTo = replyQueueName; 4 props.CorrelationId = corrId; 5 6 var messageBytes = Encoding.UTF8.GetBytes(message); 7 channel.BasicPublish(exchange: "", 8 routingKey: "rpc_queue", 9 basicProperties: props, 10 body: messageBytes); 11 12 // ... 而後是從回調隊列中讀取消息的代碼 ...
AMQP協議預約義了一個包含14個屬性的屬性集做用於消息之上,大多數都不多使用,除了下面這些:app
在上面準備的方法中,咱們建議爲每個RPC請求建立一個回調隊列。這樣至關低效,辛運的是有更好的方法,讓咱們爲每個客戶端建立一個回調隊列。異步
這樣引出了一個新問題,當收到一個響應的時候,它沒法清楚的知道響應屬於哪個請求。這就是correlationId派上用場的時候。咱們將爲每個請求設置一個惟一的關聯ID,以後當咱們從回調隊列收到一個響應的時候,咱們將檢查這個屬性,基於此,便能將響應和請求關聯起來了。若是發現一個未知的關聯ID值,咱們能夠安全的銷燬消息,由於消息不屬於任何一個請求。
你可能會奇怪,爲何咱們忽略掉未知關聯ID值得消息,而不是用錯誤來標記失敗?這是由於在服務器端可能存在爭用條件。儘管不太可能,可是RPC服務器可能在發送了響應消息而未發送消息確認的狀況下出現故障,若是出現這樣的狀況,在RPC服務器重啓以後將再次處理該請求。這就是爲何咱們必須在客戶端優雅的捕獲重複的請求,而且RPC理論上應該是冪等的。
咱們的RPC將這樣工做:
斐波那契任務:
1 private static int fib(int n) 2 { 3 if (n == 0 || n == 1) return n; 4 return fib(n - 1) + fib(n - 2); 5 }
咱們定義斐波那契函數,它只採用正整數做爲輸入。(別期望它能在大數值的狀況下工做,並且這多是最慢的一種遞歸實現)
RPC服務器RPCServer.cs中的代碼看起來是這樣的:
1 using System; 2 using RabbitMQ.Client; 3 using RabbitMQ.Client.Events; 4 using System.Text; 5 6 class RPCServer 7 { 8 public static void Main() 9 { 10 var factory = new ConnectionFactory() { HostName = "localhost" }; 11 using(var connection = factory.CreateConnection()) 12 using(var channel = connection.CreateModel()) 13 { 14 channel.QueueDeclare(queue: "rpc_queue", 15 durable: false, 16 exclusive: false, 17 autoDelete: false, 18 arguments: null); 19 channel.BasicQos(0, 1, false); 20 var consumer = new QueueingBasicConsumer(channel); 21 channel.BasicConsume(queue: "rpc_queue", 22 noAck: false, 23 consumer: consumer); 24 Console.WriteLine(" [x] Awaiting RPC requests"); 25 26 while(true) 27 { 28 string response = null; 29 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 30 31 var body = ea.Body; 32 var props = ea.BasicProperties; 33 var replyProps = channel.CreateBasicProperties(); 34 replyProps.CorrelationId = props.CorrelationId; 35 36 try 37 { 38 var message = Encoding.UTF8.GetString(body); 39 int n = int.Parse(message); 40 Console.WriteLine(" [.] fib({0})", message); 41 response = fib(n).ToString(); 42 } 43 catch(Exception e) 44 { 45 Console.WriteLine(" [.] " + e.Message); 46 response = ""; 47 } 48 finally 49 { 50 var responseBytes = Encoding.UTF8.GetBytes(response); 51 channel.BasicPublish(exchange: "", 52 routingKey: props.ReplyTo, 53 basicProperties: replyProps, 54 body: responseBytes); 55 channel.BasicAck(deliveryTag: ea.DeliveryTag, 56 multiple: false); 57 } 58 } 59 } 60 } 61 62 /// <summary> 63 /// Assumes only valid positive integer input. 64 /// Don't expect this one to work for big numbers, 65 /// and it's probably the slowest recursive implementation possible. 66 /// </summary> 67 private static int fib(int n) 68 { 69 if(n == 0 || n == 1) 70 { 71 return n; 72 } 73 74 return fib(n - 1) + fib(n - 2); 75 } 76 }
服務端代碼至關簡單:
RPC客戶端RPCClient.cs中的代碼:
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 using RabbitMQ.Client; 7 using RabbitMQ.Client.Events; 8 9 class RPCClient 10 { 11 private IConnection connection; 12 private IModel channel; 13 private string replyQueueName; 14 private QueueingBasicConsumer consumer; 15 16 public RPCClient() 17 { 18 var factory = new ConnectionFactory() { HostName = "localhost" }; 19 connection = factory.CreateConnection(); 20 channel = connection.CreateModel(); 21 replyQueueName = channel.QueueDeclare().QueueName; 22 consumer = new QueueingBasicConsumer(channel); 23 channel.BasicConsume(queue: replyQueueName, 24 noAck: true, 25 consumer: consumer); 26 } 27 28 public string Call(string message) 29 { 30 var corrId = Guid.NewGuid().ToString(); 31 var props = channel.CreateBasicProperties(); 32 props.ReplyTo = replyQueueName; 33 props.CorrelationId = corrId; 34 35 var messageBytes = Encoding.UTF8.GetBytes(message); 36 channel.BasicPublish(exchange: "", 37 routingKey: "rpc_queue", 38 basicProperties: props, 39 body: messageBytes); 40 41 while(true) 42 { 43 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 44 if(ea.BasicProperties.CorrelationId == corrId) 45 { 46 return Encoding.UTF8.GetString(ea.Body); 47 } 48 } 49 } 50 51 public void Close() 52 { 53 connection.Close(); 54 } 55 } 56 57 class RPC 58 { 59 public static void Main() 60 { 61 var rpcClient = new RPCClient(); 62 63 Console.WriteLine(" [x] Requesting fib(30)"); 64 var response = rpcClient.Call("30"); 65 Console.WriteLine(" [.] Got '{0}'", response); 66 67 rpcClient.Close(); 68 } 69 }
客戶端的代碼要稍微複雜一些:
構建客戶端請求:
1 RPCClient fibonacciRpc = new RPCClient(); 2 3 System.out.println(" [x] Requesting fib(30)"); 4 String response = fibonacciRpc.call("30"); 5 System.out.println(" [.] Got '" + response + "'"); 6 7 fibonacciRpc.close();
如今是時候來看看完整示例的源代碼了(包含基本的異常處理)。RPCClient.cs和RPCServer.cs。
編譯(參見教程一):
1 $ csc /r:"RabbitMQ.Client.dll" RPCClient.cs 2 $ csc /r:"RabbitMQ.Client.dll" RPCServer.cs
如今RPC服務已經準備就緒,能夠啓動服務了:
1 $ RPCServer.exe 2 [x] Awaiting RPC requests
運行客戶端去請求斐波那契數列:
1 $ RPCClient.exe 2 [x] Requesting fib(30)
這裏介紹的設計並不是RPC服務的惟一實現方式,可是它有一些重要的優點:
咱們的代碼依然很是簡單,並無嘗試去解決一些複雜(可是重要)的問題,好比:
若是你想嘗試,你能夠找到有用的RabbitMQ管理插件去瀏覽隊列。
原文連接:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html