生產者和消費者啓動之後,都有一個接收事件,消費者是接收事件是處理調用方法之後等待生產者的返回,生產者的接收事件是處理接收生產者發送的消息,進行處理。消費者發送的時候要在回調隊列中加入一個標識,標明是哪一個方法進行的調用 。生產者接收到消費之後,若是發現有消息標識 ,把消息標識繼續返回去,這樣消費者能夠保證接收到的消息是哪一個方法調用的c#
主方法經過實例化rpcclient,而後調用rpcclient裏面的方法,得到結果之後關閉安全
rpcclient的邏輯以下函數
static void Main(string[] args) { //TopicMessageTest(); RpcClient rpc = new RpcClient(); Console.WriteLine("開始啓動"); var response = rpc.Call("30"); Console.WriteLine("the result is :"+response); rpc.Close(); Console.WriteLine("調用結束"); Console.ReadLine(); }
public class RpcClient { private readonly IConnection connection; private readonly IModel channel; private readonly string replyQueueName; private readonly EventingBasicConsumer consumer; private readonly IBasicProperties props; private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>(); //線程安全的集合 public RpcClient() { var factory = new ConnectionFactory() { HostName="localhost"}; //建立一個實例 connection = factory.CreateConnection(); //建立鏈接 channel = connection.CreateModel(); //建立信道 replyQueueName = channel.QueueDeclare().QueueName; //建立隊列 consumer = new EventingBasicConsumer(channel);//經過指定的model初臺化消費者 props = channel.CreateBasicProperties(); var relationId = Guid.NewGuid().ToString(); props.CorrelationId = relationId;//應用相關標識 props.ReplyTo = replyQueueName; //回覆隊列指定 consumer.Received += (sender,e)=> { var body = e.Body; var response = Encoding.UTF8.GetString(body); if (e.BasicProperties.CorrelationId == relationId) { respQueue.Add(response); } }; } public string Call(string message) { var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "rpcqueue", basicProperties: props, body: messageBytes); channel.BasicConsume(consumer: consumer, queue: replyQueueName, autoAck: true); return respQueue.Take(); }
static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "rpcqueue",durable:false,exclusive:false,autoDelete:false,arguments:null); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "rpcqueue", autoAck: false,consumer:consumer); Console.WriteLine("Waiting rpc requesting"); consumer.Received += (sender, e) => { string response = null; var body = e.Body; var props = e.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine("request message is :" + message); response = fib(n).ToString(); var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false); }; Console.WriteLine("over"); Console.ReadLine(); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static int fib(int n) { if (n == 0 || n == 1) return n; return fib(n - 1) + fib(n - 2); }
我對個人文章負責,發現好多網上的文章 沒有實踐,都發出來的,讓人走不少彎路,若是你在個人文章中遇到沒法實現,或者沒法走通的問題。能夠直接在公衆號《愛碼農愛生活 》留言。一定會再次複查緣由。讓每一篇 文章的流程都能順利實現。測試