RabbitMQ學習系列(五): RPC 遠程過程調用

前面講過一些RabbitMQ的安裝和用法,也說了說RabbitMQ在通常的業務場景下如何使用。不知道的能夠看我前面的博客,http://www.cnblogs.com/zhangweizhong/category/855479.htmlhtml

不過,最近有朋友問我,RabbitMQ RPC 是幹嗎的,有什麼用。編程

其實,RabbitMQ RPC 就是經過消息隊列(Message Queue)來實現rpc的功能,就是,客戶端向服務端發送定義好的Queue消息,其中攜帶的消息就應該是服務端將要調用的方法的參數 ,並使用Propertis告訴服務端將結果返回到指定的Queue服務器

1.RabbitMQ RPC的特色異步

  • Message Queue把全部的請求消息存儲起來,而後處理,和客戶端解耦。
  • Message Queue引入新的結點,系統的可靠性會受Message Queue結點的影響。
  • Message Queue是異步單向的消息。發送消息設計成是不須要等待消息處理的完成。

因此對於有同步返回需求,Message Queue是個不錯的方向。post

2.普通PRC的特色ui

  • 同步調用,對於要等待返回結果/處理結果的場景,RPC是能夠很是天然直覺的使用方式。固然RPC也能夠是異步調用。
  • 因爲等待結果,客戶端會有線程消耗。

若是以異步RPC的方式使用,客戶端線程消耗能夠去掉。但不能作到像消息同樣暫存消息請求,壓力會直接傳導到服務端。this

3.適用場合說明spa

  • 但願同步獲得結果的場合,RPC合適。
  • 但願使用簡單,則RPC;RPC操做基於接口,使用簡單,使用方式模擬本地調用。異步的方式編程比較複雜。
  • 不但願客戶端受限於服務端的速度等,可使用Message Queue。

4.RabbitMQ RPC工做流程:pwa

 

基本概念:
線程

Callback queue 回調隊列客戶端向服務器發送請求,服務器端處理請求後,將其處理結果保存在一個存儲體中。而客戶端爲了得到處理結果,那麼客戶在向服務器發送請求時,同時發送一個回調隊列地址reply_to。

Correlation id 關聯標識客戶端可能會發送多個請求給服務器,當服務器處理完後,客戶端沒法辨別在回調隊列中的響應具體和那個請求時對應的。爲了處理這種狀況,客戶端在發送每一個請求時,同時會附帶一個獨有correlation_id屬性,這樣客戶端在回調隊列中根據correlation_id字段的值就能夠分辨此響應屬於哪一個請求。

流程說明

  • 當客戶端啓動的時候,它建立一個匿名獨享的回調隊列。
  • 在 RPC 請求中,客戶端發送帶有兩個屬性的消息:一個是設置回調隊列的 reply_to 屬性,另外一個是設置惟一值的 correlation_id 屬性。
  • 將請求發送到一個 rpc_queue 隊列中。
  • 服務器等待請求發送到這個隊列中來。當請求出現的時候,它執行他的工做而且將帶有執行結果的消息發送給 reply_to 字段指定的隊列。
  • 客戶端等待回調隊列裏的數據。當有消息出現的時候,它會檢查 correlation_id 屬性。若是此屬性的值與請求匹配,將它返回給應用

 5.完整代碼:

  1. 建立兩個控制檯程序,做爲RPC Server和RPC Client, 引用 RabbitMQ.Client,

  2. RPC Server

    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "OrderQueue", UserName = "zhangweizhong", Password = "weizhong1988", Port = 5672 };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "rpc_queue",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                channel.BasicQos(0, 1, false);
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queue: "rpc_queue",
                                     noAck: false,
                                     consumer: consumer);
                Console.WriteLine(" [x] Awaiting RPC requests");

                while (true)
                {
                    string response = null;
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                    var body = ea.Body;
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;

                    try
                    {
                        var message = Encoding.UTF8.GetString(body);
                        int n = int.Parse(message);
                        Console.WriteLine(" [.] fib({0})", message);
                        response = fib(n).ToString();
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(" [.] " + e.Message);
                        response = "";
                    }
                    finally
                    {
                        var responseBytes = Encoding.UTF8.GetBytes(response);
                        channel.BasicPublish(exchange: "",
                                             routingKey: props.ReplyTo,
                                             basicProperties: replyProps,
                                             body: responseBytes);
                        channel.BasicAck(deliveryTag: ea.DeliveryTag,
                                         multiple: false);
                    }
                }
            }
        }

        /// <summary>
        /// Assumes only valid positive integer input.
        /// Don't expect this one to work for big numbers,
        /// and it's probably the slowest recursive implementation possible.
        /// </summary>
        private static int fib(int n)
        {
            if (n == 0 || n == 1)
            {
                return n;
            }

            Thread.Sleep(1000 * 10);

            return n;
        }
    }

 

  3. RPC Client

    class Program
    {
        static void Main(string[] args)
        {
            for (int i = 0; i < 10; i++)
            {
                Stopwatch watch = new Stopwatch();

                watch.Start();

                var rpcClient = new RPCClient();

                Console.WriteLine(string.Format(" [x] Requesting fib({0})", i));

                var response = rpcClient.Call(i.ToString());

                Console.WriteLine(" [.] Got '{0}'", response);

                rpcClient.Close();

                watch.Stop();

                Console.WriteLine(string.Format(" [x] Requesting complete {0} ,cost {1} ms", i, watch.Elapsed.TotalMilliseconds));
            }

            Console.WriteLine(" complete!!!! ");


            Console.ReadLine();
        }
    }

    class RPCClient
    {
        private IConnection connection;
        private IModel channel;
        private string replyQueueName;
        private QueueingBasicConsumer consumer;

        public RPCClient()
        {
            var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "OrderQueue", UserName = "zhangweizhong", Password = "weizhong1988", Port = 5672 };
            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue: replyQueueName,
                                 noAck: true,
                                 consumer: consumer);
        }

        public string Call(string message)
        {
            var corrId = Guid.NewGuid().ToString();
            var props = channel.CreateBasicProperties();
            props.ReplyTo = replyQueueName;
            props.CorrelationId = corrId;

            var messageBytes = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "",
                                 routingKey: "rpc_queue",
                                 basicProperties: props,
                                 body: messageBytes);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                if (ea.BasicProperties.CorrelationId == corrId)
                {
                    return Encoding.UTF8.GetString(ea.Body);
                }
            }
        }

        public void Close()
        {
            connection.Close();
        }
    }

  4.分別運行Server和Client

 

6.最後

  1.參照RabbitMQ官方教程的RPC,地址:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

  2.本文源代碼下載,http://files.cnblogs.com/files/zhangweizhong/Weiz.RabbitMQ.RPC.rar

  3.博客原地址:http://fpeach.com/post/2016/12/01/RabbitMQ%E5%AD%A6%E4%B9%A0%E7%B3%BB%E5%88%97%EF%BC%88%E4%BA%94%EF%BC%89-RPC-%E8%BF%9C%E7%A8%8B%E8%BF%87%E7%A8%8B%E8%B0%83%E7%94%A8.aspx

相關文章
相關標籤/搜索