RabbitMQ學習之RPC(6)

 在第二個教程中,咱們瞭解到如何在多個worker中使用Work Queues分發費時的任務。html

可是,若是咱們須要在遠程運行一個函數而且等待結果該怎麼辦呢?這個時候,咱們須要另一個模式了。這種模式一般被叫作Remote Procedure Call 或者RPC.json

在這個教程中,咱們將使用RabbitMQ來創建一個RPC系統:a clienta scalable RPC server.安全

Client interface

 爲了說明RPC服務怎樣被使用,咱們將建立一個簡單的Client class(客戶端類)。它會暴露一個發送RPC請求的名叫Call的方法而且會阻塞到接收到answer. 服務器

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

Callback queue

通常來講,經過RabbitMQ來作RPC是簡單的。客戶端(client)發送request message而且服務端(server)返回response message. 爲了接收到response,咱們須要在request上發送一個callback queue address(回調隊列地址). app

var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;  //設置callback queue name
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                     routingKey: "rpc_queue",
                     basicProperties: props,
                     body: messageBytes);
// ... then code to read a response message from the callback_queue ...
Message properties

AMQP協議在message上預約義了14個屬性的集合。 大部分屬性不多使用,下面的使用比較多:函數

  • Persistent : 使message持久化ui

  • DeliveryMode : 那些熟悉這個協議的可能會使用這個屬性而不是Persistent.來作持久化。this

  • ContentType : 用來描述編碼類型。例如,常常使用的JSON編碼,一般設置屬性爲:application/json編碼

  • ReplyTo : 用來命名callback queue(回調隊列)spa

  • CorrelationId : 用來關聯RPC Response 和request

Correlation Id

在以前咱們講的方法中,咱們建議爲每個RPC request創建一個callback queue. 那樣很沒有效率,幸運的,還有一種更好的方法:咱們爲每一個client建立單獨的一個callback queue.

這個時候咱們須要CorrelationId屬性來關聯responserequest. 每一個request都有惟一的correlationId. 當咱們在隊列中收到一個message,咱們看下這個屬性,而且根據它咱們來匹配responserequest. 若是咱們看到一個不知道的CorrelationId值,咱們會安全的丟掉這個message. 它不屬於咱們的requests. 

你可能會問,爲何咱們忽視callback queue中不知道的messageunknow messages),而不是報錯呢?那是服務端有 競態資源 的可能性。儘管不太可能,但它是可能的,RPC服務器在發送給咱們answer以後,但尚未發送an acknowledgement message以前死掉了。若是這種狀況發生了,重啓的RPC服務器將會再處理這個request. 那就是客戶端爲何要優雅的處理兩次responses. (能夠對比第二個教程,會在接收端確認,若是接收端沒有確認,以後隊列會再次發送request,服務端須要再次處理)

Summary(總結)

咱們的RPC像圖中這樣工做:

  • 當一個client啓動時,它建立一個匿名的專用的callback queue.
  • 對於一個RPC request,client將會發送帶有兩個屬性的messageReplyTo,用來設置callback queue;而且CorrelationId,用來爲每一個request設置惟一的值。
  • Request會被髮送到rpc_queue.
  • RPC worker(這裏就是server)將會等待接收rpc_queue隊列裏的requests。當request出現時,它會作這個job,而且發送一個帶結果的messageclient,使用被ReplyTo屬性命名的隊列。
  • Client會等待callback queue的數據。當一個message出現時,它會檢查CorrealtionId屬性。若是它匹配request裏的這個值(CorrealationId),將會返回response到這個應用(client)

代碼

The Fibonacci task:

private static int fib(int n)
{
    if (n == 0 || n == 1) return n;
    return fib(n - 1) + fib(n - 2);
}

RPCServer.cs

using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;
class RPCServer
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "rpc_queue", durable: false,  //聲明queue
              exclusive: false, autoDelete: false, arguments: null);
            channel.BasicQos(0, 1, false); //公平調度策略
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicConsume(queue: "rpc_queue",  //接收消息
              autoAck: false, consumer: consumer);
            Console.WriteLine(" [x] Awaiting RPC requests");

            consumer.Received += (model, ea) =>
            {
                string response = null;

                var body = ea.Body;
                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;//設置返回的CorrealationId

                try
                {
                    var message = Encoding.UTF8.GetString(body);
                    int n = int.Parse(message);
                    Console.WriteLine(" [.] fib({0})", message);
                    response = fib(n).ToString(); //設置響應數據
                }
                catch (Exception e)
                {
                    Console.WriteLine(" [.] " + e.Message);
                    response = "";
                }
                finally
                {
                    var responseBytes = Encoding.UTF8.GetBytes(response);
                    channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,//發送響應到callback queue
                      basicProperties: replyProps, body: responseBytes);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag,
                      multiple: false);
                }
            };

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

    /// 
    /// Assumes only valid positive integer input.
    /// Don't expect this one to work for big numbers, and it's
    /// probably the slowest recursive implementation possible.
    /// 
    private static int fib(int n)
    {
        if (n == 0 || n == 1)
        {
            return n;
        }

        return fib(n - 1) + fib(n - 2);
    }
}

RPCClient.cs

using System;using System.Collections.Concurrent;using System.Text;using RabbitMQ.Client;using RabbitMQ.Client.Events;
public class RpcClient
{
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;
    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
    private readonly IBasicProperties props;
public RpcClient()
{
        var factory = new ConnectionFactory() { HostName = "localhost" };

        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);

        props = channel.CreateBasicProperties();
        var correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = replyQueueName;

        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var response = Encoding.UTF8.GetString(body);
            if (ea.BasicProperties.CorrelationId == correlationId)
            {
                respQueue.Add(response);
            }
        };
    }

    public string Call(string message)
    {
        var messageBytes = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(
            exchange: "",
            routingKey: "rpc_queue",
            basicProperties: props,
            body: messageBytes);

        channel.BasicConsume(
            consumer: consumer,
            queue: replyQueueName,
            autoAck: true);

        return respQueue.Take(); ;
    }

    public void Close()
    {
        connection.Close();
    }
}
public class Rpc
{
    public static void Main()
    {
        var rpcClient = new RpcClient();

        Console.WriteLine(" [x] Requesting fib(30)");
        var response = rpcClient.Call("30");

        Console.WriteLine(" [.] Got '{0}'", response);
        rpcClient.Close();
    }
}

 參考網址:

https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

相關文章
相關標籤/搜索