RabbitMQ入門學習系列(七) 遠程調用RPC

快速閱讀

生產者和消費者啓動之後,都有一個接收事件,消費者是接收事件是處理調用方法之後等待生產者的返回,生產者的接收事件是處理接收生產者發送的消息,進行處理。消費者發送的時候要在回調隊列中加入一個標識,標明是哪一個方法進行的調用 。生產者接收到消費之後,若是發現有消息標識 ,把消息標識繼續返回去,這樣消費者能夠保證接收到的消息是哪一個方法調用的c#

關於RPC調用的建議

  1. 明確哪一個函數是調用本地的,哪一個函數是調用遠程的
  2. 組合之間的依賴要清晰明瞭
  3. 應該能處理當遠程服務掛掉的時的錯誤

消費者代碼

主方法經過實例化rpcclient,而後調用rpcclient裏面的方法,得到結果之後關閉安全

rpcclient的邏輯以下函數

  1. 聲明鏈接和信道
  2. 建立一個回調的隊列 。
  3. 定義一個消費者的事件,綁定信道。
  4. 爲信道建立一個消息頭,在裏面標識消息id,和回調隊列的名稱
  5. 消費者接收事件處理,當收到消息之後,判斷消息頭,若是是發送的消息id,則加入到返回的消息集合中。
  6. 從消息集合中取值
static void Main(string[] args)
{
    //TopicMessageTest();
    RpcClient rpc = new RpcClient();
    Console.WriteLine("開始啓動");
    var response = rpc.Call("30");
    Console.WriteLine("the result is :"+response);
    rpc.Close();
    Console.WriteLine("調用結束");
    Console.ReadLine();
}
public class RpcClient
 {

     private readonly IConnection connection;
     private readonly IModel channel;
     private readonly string replyQueueName;
     private readonly EventingBasicConsumer consumer;
     private readonly IBasicProperties props;

     private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>(); //線程安全的集合
     public RpcClient()
     {
         var factory = new ConnectionFactory() { HostName="localhost"}; //建立一個實例

         connection = factory.CreateConnection(); //建立鏈接

         channel = connection.CreateModel(); //建立信道

         replyQueueName = channel.QueueDeclare().QueueName; //建立隊列

         consumer = new EventingBasicConsumer(channel);//經過指定的model初臺化消費者

         props = channel.CreateBasicProperties();

         var relationId = Guid.NewGuid().ToString();
         props.CorrelationId = relationId;//應用相關標識 
         props.ReplyTo = replyQueueName;  //回覆隊列指定 

         consumer.Received += (sender,e)=>
         {
             var body = e.Body;
             var response = Encoding.UTF8.GetString(body);
             if (e.BasicProperties.CorrelationId == relationId)
             {
                 respQueue.Add(response);
             }
         };



     }

     public string Call(string message)
     {
         var messageBytes = Encoding.UTF8.GetBytes(message);
         channel.BasicPublish(exchange: "", routingKey: "rpcqueue", basicProperties: props, body: messageBytes);
         channel.BasicConsume(consumer: consumer, queue: replyQueueName, autoAck: true);
         return respQueue.Take();
     }

生產者代碼

  1. 建立連接和信道
  2. 聲明一個隊列,指定隊列名稱。
  3. 配置Qos,每次取幾條消息
  4. 建立消費者在接收事件中對消費者發送的消息進行處理。
  5. 事件處理中,body表示接收到的消息 ,basicProperties是消息頭,對消息進行處理之後,再把消息以及消息的隊列發送給消費者
static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
    {

        channel.QueueDeclare(queue: "rpcqueue",durable:false,exclusive:false,autoDelete:false,arguments:null);
        channel.BasicQos(0, 1, false);

        var consumer = new EventingBasicConsumer(channel);
        channel.BasicConsume(queue: "rpcqueue", autoAck: false,consumer:consumer);
        Console.WriteLine("Waiting rpc requesting");

        consumer.Received += (sender, e) =>
        {
            string response = null;
            var body = e.Body;
            var props = e.BasicProperties;
            var replyProps = channel.CreateBasicProperties();
            replyProps.CorrelationId = props.CorrelationId;

            var message = Encoding.UTF8.GetString(body);
            int n = int.Parse(message);
            Console.WriteLine("request message is :" + message);
            response = fib(n).ToString();

            var responseBytes = Encoding.UTF8.GetBytes(response);
            channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
            channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
        };
        Console.WriteLine("over");
        Console.ReadLine();

    }


    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
}


private static int fib(int n)
{
    if (n == 0 || n == 1) return n;
    return fib(n - 1) + fib(n - 2);
}

測試結果

友情提示

​ 我對個人文章負責,發現好多網上的文章 沒有實踐,都發出來的,讓人走不少彎路,若是你在個人文章中遇到沒法實現,或者沒法走通的問題。能夠直接在公衆號《愛碼農愛生活 》留言。一定會再次複查緣由。讓每一篇 文章的流程都能順利實現。測試

相關文章
相關標籤/搜索