發佈訂閱模式c#
把消息發送給多個訂閱者。也就是有多個消費端都完整的接收生產者的消息測試
換句話說 把消息廣播給多個消費者this
RabbitMQ不發送消息給隊列,生產者也不知道消息發送到隊列code
生產者只發送消息到exchange 交換器,blog
exchange一方面從生產者接收消息,另外一方面把消息推送到隊列中。隊列
exchange必須知道如何處理接收到的消息 。是加到特定隊列中,仍是添加到多個隊列中,仍是放棄。這個是由他的類型來決定 。ip
而消息的類型有四種,分別是string
Direct,Topic,headers,fanout
fanout消息類型能夠把消息廣播到全部隊列中。it
指定exchange的名稱爲logs,routingkey設置爲空。io
channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
爲隊列的名稱指定一個隨機數
var queueName = channel.QueueDeclare().QueueName; channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
通過對以前代碼的改造
咱們定義了一個能夠廣播類型的exchange和一個隨機名字的隊列 ,
如今咱們須要把他們綁定起來。
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); for (var i = 0; i < 10; i++) { string message = "Hello World!this is message "+i; var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0},id={1}", message,i); Thread.Sleep(1000); } } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); //如下是區別生產者的 var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { var body = e.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine("Received {0}", message); Thread.Sleep(3000);//模擬耗時任務 , Console.WriteLine("Received over"); channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); Console.WriteLine(""); Console.ReadLine(); } } }
啓了一個生產者,兩個消費者,生產者發送10條消息 ,兩個消費者都收到了10條消息
友情提示 我對個人文章負責,發現好多網上的文章 沒有實踐,都發出來的,讓人走不少彎路,若是你在個人文章中遇到沒法實現,或者沒法走通的問題。能夠直接在公衆號《愛碼農愛生活 》留言。一定會再次複查緣由。讓每一篇 文章的流程都能順利實現。