RabbitMQ.Net 應用(2)

//生產者

using
RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Producter { class Program { private static void Main() { //創建RabbitMQ鏈接和通道 var connectionFactory = new ConnectionFactory { HostName = "192.168.142.128", Port = 5672, //默認爲這個端口 開始誤覺得是 管理接口的 15672
UserName = "superrd", Password = "superrd", Protocol = Protocols.DefaultProtocol, AutomaticRecoveryEnabled = true, //自動重連 RequestedFrameMax = UInt32.MaxValue, RequestedHeartbeat = UInt16.MaxValue //心跳超時時間  }; try { using (var connection = connectionFactory.CreateConnection()) { using (var channel = connection.CreateModel()) { //建立一個新的,持久的交換區 channel.ExchangeDeclare("SISOExchange", ExchangeType.Direct, true, false, null); //建立一個新的,持久的隊列, 沒有排他性,與不自動刪除 channel.QueueDeclare("SISOqueue", true, false, false, null); // 綁定隊列到交換區 channel.QueueBind("SISOqueue", "SISOExchange", "optionalRoutingKey"); // 設置消息屬性 var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //消息是持久的,存在並不會受服務器重啓影響 //準備開始推送 //發佈的消息能夠是任何一個(能夠被序列化的)字節數組,如序列化對象,一個實體的ID,或只是一個字符串 var encoding = new UTF8Encoding(); for (var i = 0; i < 10; i++) { var msg = string.Format("這是消息 #{0}?", i + 1); var msgBytes = encoding.GetBytes(msg); //RabbitMQ消息模型的核心思想就是,生產者不把消息直接發送給隊列。實際上,生產者在不少狀況下都不知道消息是否會被髮送到一個隊列中。取而代之的是,生產者將消息發送到交換區。交換區是一個很是簡單的東西,它一端接受生產者的消息,另外一端將他們推送到隊列中。交換區必需要明確的指導如何處理它接受到的消息。是放到一個隊列中,仍是放到多個隊列中,亦或是被丟棄。這些規則能夠經過交換區的類型來定義。 //可用的交換區類型有:direct,topic,headers,fanout。 //Exchange:用於接收消息生產者發送的消息,有三種類型的exchange:direct, fanout,topic,不一樣類型實現了不一樣的路由算法; //RoutingKey:是RabbitMQ實現路由分發到各個隊列的規則,並結合Binging提供於Exchange使用將消息推送入隊列; //Queue:是消息隊列,能夠根據須要定義多個隊列,設置隊列的屬性,好比:消息移除、消息緩存、回調機制等設置,實現與Consumer通訊; channel.BasicPublish("SISOExchange", "optionalRoutingKey", properties, msgBytes); } channel.Close(); } } } catch (Exception ex) { Console.WriteLine(ex.Message); } Console.WriteLine("消息發佈!"); Console.ReadKey(true); } } }
 
 
// 消費者

using
RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.MessagePatterns; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Comsumer { class Program { private static void Main() { // 創建RabbitMQ鏈接和通道 var connectionFactory = new ConnectionFactory { HostName = "192.168.142.128", Port = 5672, //默認爲這個端口 開始誤覺得是 管理接口的 15672 UserName = "superrd", Password = "superrd", Protocol = Protocols.AMQP_0_9_1, RequestedFrameMax = UInt32.MaxValue, RequestedHeartbeat = UInt16.MaxValue }; using (var connection = connectionFactory.CreateConnection()) using (var channel = connection.CreateModel()) { // 這指示通道不預取超過1個消息 channel.BasicQos(0, 1, false); //建立一個新的,持久的交換區 channel.ExchangeDeclare("SISOExchange", ExchangeType.Direct, true, false, null); //建立一個新的,持久的隊列 channel.QueueDeclare("sample-queue", true, false, false, null); //綁定隊列到交換區 channel.QueueBind("SISOqueue", "SISOExchange", "optionalRoutingKey"); using (var subscription = new Subscription(channel, "SISOqueue", false)) { Console.WriteLine("等待消息..."); var encoding = new UTF8Encoding(); while (channel.IsOpen) { BasicDeliverEventArgs eventArgs; var success = subscription.Next(2000, out eventArgs); if (success == false) continue; var msgBytes = eventArgs.Body; var message = encoding.GetString(msgBytes); Console.WriteLine(message); channel.BasicAck(eventArgs.DeliveryTag, false); } } } } } }
相關文章
相關標籤/搜索