上篇文章,咱們作了一個簡單的Demo,一個生產者對應一個消費者,本篇文章就介紹 生產者-隊列-多個消費者,下面簡單示意圖測試
需求背景:工廠某部門須要生產n個零件,部門下面有2個小組,每一個小組須要生產n/2個spa
每一個小組的狀況下,當全部奇怪的信息都很重,甚至信息很輕的時候,一個工做人員將不斷忙碌,另外一個工做人員幾乎不會作任何工做。那麼,RabbitMQ不知道什麼,還會平均分配消息。code
這是由於當消息進入隊列時,RabbitMQ只會分派消息。它不看消費者的未確認消息的數量。它只是盲目地向第n個消費者發送每一個第n個消息。blog
下面就由咱們擼代碼實現,這一需求::::隊列
static void Main(string[] args) { using (var channel = HelpConnection.GetConnection().CreateModel()) { //聲明隊列 channel.QueueDeclare("firstQueue", true, false, false, null); //聲明路由 channel.ExchangeDeclare("firstExchange", "direct", true, false, null); //綁定 創建關係 channel.QueueBind("firstQueue", "firstExchange", "firstQueue_Exchange"); //內容的基本屬性 var properties=channel.CreateBasicProperties(); //設置消息內容持久化 properties.Persistent = true; int j = 0; for (int i = 0; i < 100; i++) { var msg = Encoding.UTF8.GetBytes("生產者-隊列-多個消費者" + i); channel.BasicPublish(exchange: "firstExchange", routingKey: "firstQueue_Exchange", basicProperties: properties, body: msg); j = i; Console.WriteLine( i); } Console.WriteLine("添加成功" + j + "條"); Console.ReadKey(); } }
成功添加100條路由
/// <summary> /// /// </summary> /// <param name="args"></param> static void Main(string[] args) { using (var channel = HelpConnection.GetConnection().CreateModel()) { //聲明隊列 channel.QueueDeclare("firstQueue", true, false, false, null); //聲明路由 channel.ExchangeDeclare("firstExchange", "direct", true, false, null); //綁定 創建關係 channel.QueueBind("firstQueue", "firstExchange", "firstQueue_Exchange"); //公平分發 同一時間只處理一個消息 channel.BasicQos(0, 1, true); var conSumer = new EventingBasicConsumer(channel); conSumer.Received += (moede, e) => { var body = e.Body; var msg = Encoding.UTF8.GetString(body); Console.WriteLine("顯示結果:"+msg); //進行交付,肯定此消息已經處理完成 // channel.BasicAck( e.DeliveryTag, false); }; //確認收到消息 進行消費 channel.BasicConsume("firstQueue", true, conSumer);//false 手動應答;true:自動應答 Console.ReadKey(); } }
效果圖(特地創建好幾個項目,同事啓動進行測試)文檔
本章總結注意幾點:::博客
一、即便RabbitMQ從新啓動,task_queue隊列也不會丟失。如今咱們須要將咱們的消息標記爲持久性 - 將IBasicProperties.SetPersistent設置爲true。string
var properties = channel.CreateBasicProperties();it
properties.Persistent = true;
二、公平分發同一時間只處理一個消息
channel.BasicQos(0,1,false)