RabbitMQ 實現遠程過程調用RPC

RPC調用的順序
1. 在客戶端初始化的時候,也就是SimpleRpcClient類初始化的時候,它會隨機的建立一個callback隊列,用於存放服務的返回值,這個隊列是exclusive的。鏈接斷開就沒有了。
2. 客戶端在發送Request的時候,會加上兩個參數:ReplyTo和CorrelationId,前者用於告訴服務返回值放在哪一個隊列裏面或路由,後者用於配對每次的Request。這兩個屬性都放在客戶端發送消息的附帶的IBasicProperties字典中。
3. 把消息放入服務的監控隊列裏,消息裏面天然有調用方法的參數。
4. 服務在所監控的隊列中收到數據後,進行運算,並把返回值放入到客戶端指定的callback隊列中去。
5. 客戶端在發送完Request後,便去本身建立的callback隊列監聽,若是得到到數據,則查看裏面的CorrelationId,若是和調用Request一致,則返回結果。ide

Server 端:oop

 class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "192.168.254.40",
                UserName = "admin",
                Password = "admin",
            };

            //第一步:建立connection 
            var connection = factory.CreateConnection();

            //第二步:建立一個channel
            var channel = connection.CreateModel();

            channel.QueueDeclare("rpc_queue", true, false, false, null);

            Subscription subscription = new Subscription(channel, "rpc_queue");

            MySimpleRpcServer server = new MySimpleRpcServer(subscription);

            Console.WriteLine("server 端啓動完畢!!!");

            server.MainLoop();

            Console.Read();
        }
    }

    public class MySimpleRpcServer : SimpleRpcServer
    {
        public MySimpleRpcServer(Subscription subscription) : base(subscription)
        {

        }

        public override byte[] HandleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
        {
            return base.HandleCall(isRedelivered, requestProperties, body, out replyProperties);
        }

        public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
        {
            replyProperties = null;

            var msg = string.Format("當前文字長度爲:{0}", Encoding.UTF8.GetString(body).Length);

            return Encoding.UTF8.GetBytes(msg);
            //return base.HandleSimpleCall(isRedelivered, requestProperties, body, out replyProperties);
        }

        public override void ProcessRequest(BasicDeliverEventArgs evt)
        {
            base.ProcessRequest(evt);
        }
    }

Client 端:spa

static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "192.168.254.40",
                UserName = "admin",
                Password = "admin",
            };

            //第一步:建立connection
            var connection = factory.CreateConnection();

            //第二步:建立一個channel
            var channel = connection.CreateModel();

            SimpleRpcClient client = new SimpleRpcClient(channel, string.Empty, ExchangeType.Direct, "rpc_queue");


            var bytes = client.Call(Encoding.UTF8.GetBytes("hello world!!!!"));

            var result = Encoding.UTF8.GetString(bytes);
        }
相關文章
相關標籤/搜索