RabbitMQ九:遠程過程調用RPC

定義

RPC(Remote Procedure Call Protocol)——遠程過程調用協議:它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,爲通訊程序之間攜帶信息數據。在OSI網絡通訊模型中,RPC跨越了傳輸層和應用層。RPC使得開發包括網絡分佈式多程序在內的應用程序更加容易。編程

 

PRC採用客戶端/服務端模式,請求程序就是一個客戶機,而服務提供就是一個服務器。首先,客戶機調用進程發送一個有進程參數的調用信息到服務進程,而後等待應答信息,在服務器,進程保持睡眠狀態直到調用信息到達爲止。當一個調用信息到達,服務器得到進程參數,計算結果,發送答覆消息,而後等待下一個調用信息,最後,客戶端調用進程接收答覆信息,得到進程結果,而後調用執行繼續前進。(整個過程有點相似:你到某大醫院看病,你先到櫃檯交錢拿卡(醫師費),拿卡去找醫生(卡表明你的認證至關參數),醫生根據卡給你把脈看病進行詳談溝通,醫診結束後給你開藥,下一位患者進入。。。。。,醫生有是服務端,患者是客戶端,舉例可能有點牽強,就是表達那個意思,)服務器

 

RPC是在計算機中一種常見的模式,是一般我要用消息隊列3個關鍵點:網絡

1、服務的尋址;

2、消息的接受;

3、消息的關聯。

RPC調用的順序簡述:

 

 

 1、當客戶端啓動時,它會建立一個匿名的獨佔會回調隊列;

 2、對於一個RPC請求,客戶端經過兩個屬性發送一條消息(從圖中咱們也能夠看到):relayTo 設置回調隊列;correlationId,爲每一個請求設置惟一的標識ID;

 3、消息將發送到一個Rpc_queue  隊列;

 4、RPC工程線程(服務器)在該隊列上等待請求,當請求出現,他將處理請求並把結果發回到客戶端,使用隊列在replayTo中設置;

 5、客戶端在回調隊列上等待響應,當消息出現,它檢查關聯ID,若是匹配來自請求的關聯ID值,返回隊列消息到該應用程序。

重點解釋

correlationId 和 relayTo 參數異步

首先客戶端經過RPC向服務端發送請求分佈式

我這裏有一堆東西須要你給我處理一下,correlationId :這是個人請求標識,relayTo :你處理完事後把結果返回到這個隊列中。

服務端拿到請求,並開始處理並返回結果ui

correlationId :這是你的請求標識 ,原封不動的給你。 這時候客戶端用本身的correlationId 與服務端返回的id進行對比。是個人,就接收。

適合RPC場合說明

但願同步獲得數據的場合,RPC合適;

但願使用簡單,則RPC;RPC操做基於接口,使用簡單,使用方式模式本地調用。異步的方式編程比較複雜。

不但願客戶端受限於服務端的速度等,可使用Message Queue

RabbitMQ RPC的特色spa

Message Queue  把全部的請求消息存儲起來,而後處理,和客戶端解耦;

Message Queue  引用新的結點,系統的可靠性會受Message Queue 結點的影響;

Meaage  Queue 是異步單向的消息,發送消息設計成是不須要等待消息處理的完成。

因此對於有同步返回需求,Message Queue 是個不錯的方向

普通RPC的特色

同步調用,對於要等待返回結果、處理結果的場景,RPC是能夠很是天然直覺的使用方式,固然RPC也能夠異步調用。

因爲等待結果,客戶端會有線程消耗。

若是以異步RPC的方式使用,客戶端線程消耗能夠去掉,但不能作到像消息同樣暫存消息請求,壓力會直接傳導到服務端。

 

代碼塊

備註(建立兩個解決方案:服務端和客戶端)pwa

 服務端

 static void Main(string[] args)
        {
            using (var channel = GetConnection().CreateModel())
            {
                channel.QueueDeclare("rpc_queue", true, false, false, null);
                channel.BasicQos(0, 1, false);
                var consumer = new EventingBasicConsumer(channel);
                // var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume("rpc_queue", false, consumer);
                Console.WriteLine("等待 RPC 隊列");
                consumer.Received += (model, ea) =>
                {
                    // while (true)
                    // {
                    string response = null;
                    //出列
                    // var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    var body = ea.Body;
                    var props = ea.BasicProperties;
//內容的基本屬性
var replyProps = channel.CreateBasicProperties();
//注意這裏的correlationId replyProps.CorrelationId
= props.CorrelationId; try { var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine("顯示內容" + message); response = fib(n).ToString(); } catch (Exception e) { Console.WriteLine("報錯" + e.ToString()); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish("", props.ReplyTo, replyProps, responseBytes); channel.BasicAck(ea.DeliveryTag, false); } // }; }; Console.WriteLine("發佈成功!!!"); Console.ReadLine(); } } /// <summary> /// 私有方法 /// </summary> /// <param name="n"></param> /// <returns></returns> private static int fib(int n) { if (n == 0 || n == 1) { return n; } //Thread.Sleep()方法用於將當前線程休眠必定時間 時間單位是毫秒 1000毫秒= 1秒 //System.Threading.Thread.Sleep(2000);當前休眠2秒 //suspen()掛起當前線程。也能夠指定掛起時間。 //close() 關閉當前線程。 Thread.Sleep(100 * 10); return n; // return fib(n - 1) + fib(n - 2); }

客戶端(兩個類:Consumer,HelpConnection)

Consumer代碼塊:

  static void Main(string[] args)
        {
            for (int i = 0; i < 30; i++)
            {
                Stopwatch watch = new Stopwatch();
                watch.Start();
                var rpcClient = new HelpConnection();
                Console.WriteLine("顯示內容" + i.ToString());
                var response = rpcClient.Call(i);
                Console.WriteLine("顯示內容" + response);
                //當前鏈接關閉
                rpcClient.Close();
                watch.Stop();
                Console.WriteLine(string.Format(" [x] Requesting complete {0} ,cost {1} ms", i, watch.Elapsed.TotalMilliseconds));
            }
            Console.WriteLine(" complete!!!! ");
            Console.ReadLine();
        }

HelpConnection代碼塊:

        /// <summary>
        /// 成員變量
        /// </summary>
        private static IConnection connection { get; set; }
        private IModel channel { get; set; }
        private string replyQueueName { get; set; }
        private QueueingBasicConsumer consumer { get; set; }

        /// <summary>
        /// 構造方法:鏈接配置
        /// </summary>
        public HelpConnection()
        {
            var factory = new ConnectionFactory()
            {
                //計算機名稱,帳號,密碼,
                HostName = "localhost",
                UserName = "zhangguangpo",
                Password = "guangpo1992",
                //RequestedHeartbeat = 60,
                AutomaticRecoveryEnabled = true   //要啓用自動鏈接恢復
            };
            //建立鏈接
            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            //而客戶端爲了得到處理結果,那麼客戶在向服務器發送請求時,同時發送一個回調隊列地址reply_to
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue: replyQueueName,
                                 noAck: true,
                                 consumer: consumer);
            //  return Connection;
        }

        /// <summary>
        /// 消息判斷
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        public string Call(int message)
        {
            var corrId = Guid.NewGuid().ToString();
            var props = channel.CreateBasicProperties();
            props.ReplyTo = replyQueueName;
            props.CorrelationId = corrId;
            var messageBates = Encoding.UTF8.GetBytes(message.ToString());
            channel.BasicPublish("", "rpc_queue", props, messageBates);
            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                if (ea.BasicProperties.CorrelationId == corrId)
                {
                    var body = Encoding.UTF8.GetString(ea.Body);
                    return body;
                }
            }
        }

        /// <summary>
        /// 當前鏈接關閉
        /// </summary>
        public void Close()
        {
            connection.Close();
        }

效果圖線程

 

 

 

  • 博主是利用讀書、參考、引用、抄襲、複製和粘貼等多種方式打形成本身的純鍍 24k 文章,請原諒博主成爲一個無恥的文檔搬運工!
  • 小弟剛邁入博客編寫,文中若有不對,歡迎用板磚扶正,但願給你有所幫助。
相關文章
相關標籤/搜索