在第二個教程中,咱們瞭解到如何在多個worker中使用Work Queues分發費時的任務。html
可是,若是咱們須要在遠程運行一個函數而且等待結果該怎麼辦呢?這個時候,咱們須要另一個模式了。這種模式一般被叫作Remote Procedure Call 或者RPC.json
在這個教程中,咱們將使用RabbitMQ來創建一個RPC系統:a client和a scalable RPC server.安全
爲了說明RPC服務怎樣被使用,咱們將建立一個簡單的Client class(客戶端類)。它會暴露一個發送RPC請求的名叫Call的方法而且會阻塞到接收到answer. 服務器
var rpcClient = new RPCClient(); Console.WriteLine(" [x] Requesting fib(30)");var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close();
通常來講,經過RabbitMQ來作RPC是簡單的。客戶端(client)發送request message而且服務端(server)返回response message. 爲了接收到response,咱們須要在request上發送一個callback queue address(回調隊列地址). app
var props = channel.CreateBasicProperties(); props.ReplyTo = replyQueueName; //設置callback queue name var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); // ... then code to read a response message from the callback_queue ...
AMQP協議在message上預約義了14個屬性的集合。 大部分屬性不多使用,下面的使用比較多:函數
Persistent : 使message持久化ui
DeliveryMode : 那些熟悉這個協議的可能會使用這個屬性而不是Persistent.來作持久化。this
ContentType : 用來描述編碼類型。例如,常常使用的JSON編碼,一般設置屬性爲:application/json編碼
ReplyTo : 用來命名callback queue(回調隊列)spa
CorrelationId : 用來關聯RPC Response 和request
在以前咱們講的方法中,咱們建議爲每個RPC request創建一個callback queue. 那樣很沒有效率,幸運的,還有一種更好的方法:咱們爲每一個client建立單獨的一個callback queue.
這個時候咱們須要CorrelationId屬性來關聯response和request. 每一個request都有惟一的correlationId. 當咱們在隊列中收到一個message,咱們看下這個屬性,而且根據它咱們來匹配response和request. 若是咱們看到一個不知道的CorrelationId值,咱們會安全的丟掉這個message. 它不屬於咱們的requests.
你可能會問,爲何咱們忽視callback queue中不知道的message(unknow messages),而不是報錯呢?那是服務端有 競態資源 的可能性。儘管不太可能,但它是可能的,RPC服務器在發送給咱們answer以後,但尚未發送an acknowledgement message以前死掉了。若是這種狀況發生了,重啓的RPC服務器將會再處理這個request. 那就是客戶端爲何要優雅的處理兩次responses. (能夠對比第二個教程,會在接收端確認,若是接收端沒有確認,以後隊列會再次發送request,服務端須要再次處理)
咱們的RPC像圖中這樣工做:
The Fibonacci task:
private static int fib(int n) { if (n == 0 || n == 1) return n; return fib(n - 1) + fib(n - 2); }
RPCServer.cs
using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text; class RPCServer { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "rpc_queue", durable: false, //聲明queue exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(0, 1, false); //公平調度策略 var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "rpc_queue", //接收消息 autoAck: false, consumer: consumer); Console.WriteLine(" [x] Awaiting RPC requests"); consumer.Received += (model, ea) => { string response = null; var body = ea.Body; var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId;//設置返回的CorrealationId try { var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine(" [.] fib({0})", message); response = fib(n).ToString(); //設置響應數據 } catch (Exception e) { Console.WriteLine(" [.] " + e.Message); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,//發送響應到callback queue basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } /// /// Assumes only valid positive integer input. /// Don't expect this one to work for big numbers, and it's /// probably the slowest recursive implementation possible. /// private static int fib(int n) { if (n == 0 || n == 1) { return n; } return fib(n - 1) + fib(n - 2); } }
RPCClient.cs
using System;using System.Collections.Concurrent;using System.Text;using RabbitMQ.Client;using RabbitMQ.Client.Events; public class RpcClient { private readonly IConnection connection; private readonly IModel channel; private readonly string replyQueueName; private readonly EventingBasicConsumer consumer; private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>(); private readonly IBasicProperties props; public RpcClient() { var factory = new ConnectionFactory() { HostName = "localhost" }; connection = factory.CreateConnection(); channel = connection.CreateModel(); replyQueueName = channel.QueueDeclare().QueueName; consumer = new EventingBasicConsumer(channel); props = channel.CreateBasicProperties(); var correlationId = Guid.NewGuid().ToString(); props.CorrelationId = correlationId; props.ReplyTo = replyQueueName; consumer.Received += (model, ea) => { var body = ea.Body; var response = Encoding.UTF8.GetString(body); if (ea.BasicProperties.CorrelationId == correlationId) { respQueue.Add(response); } }; } public string Call(string message) { var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish( exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); channel.BasicConsume( consumer: consumer, queue: replyQueueName, autoAck: true); return respQueue.Take(); ; } public void Close() { connection.Close(); } } public class Rpc { public static void Main() { var rpcClient = new RpcClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close(); } }
參考網址: