RabbitMQ五:生產者--隊列--多消費者

1、生成者-隊列-多消費者(前言)

 

 上篇文章,咱們作了一個簡單的Demo,一個生產者對應一個消費者,本篇文章就介紹 生產者-隊列-多個消費者,下面簡單示意圖測試

 

 P 生產者    C 消費者  中間隊列

 需求背景:工廠某部門須要生產n個零件,部門下面有2個小組,每一個小組須要生產n/2個spa

公平派遣

每一個小組的狀況下,當全部奇怪的信息都很重,甚至信息很輕的時候,一個工做人員將不斷忙碌,另外一個工做人員幾乎不會作任何工做。那麼,RabbitMQ不知道什麼,還會平均分配消息。code

這是由於當消息進入隊列時,RabbitMQ只會分派消息。它不看消費者的未確認消息的數量。它只是盲目地向第n個消費者發送每一個第n個消息。blog

 

下面就由咱們擼代碼實現,這一需求::::隊列

2、代碼

P 生產者代碼::: 

 static void Main(string[] args)
        {
            using (var channel = HelpConnection.GetConnection().CreateModel())
            {
                //聲明隊列  
                channel.QueueDeclare("firstQueue", true, false, false, null);
                //聲明路由
                channel.ExchangeDeclare("firstExchange", "direct", true, false, null);
                //綁定 創建關係
                channel.QueueBind("firstQueue", "firstExchange", "firstQueue_Exchange");
                //內容的基本屬性
               var properties=channel.CreateBasicProperties();
                //設置消息內容持久化
               properties.Persistent = true;
                int j = 0;
                for (int i = 0; i < 100; i++)
                {
                    var msg = Encoding.UTF8.GetBytes("生產者-隊列-多個消費者" + i);
                    channel.BasicPublish(exchange: "firstExchange",
                                         routingKey: "firstQueue_Exchange",
                                         basicProperties: properties,
                                         body: msg);
                    j = i;
                    Console.WriteLine( i);
                }
                Console.WriteLine("添加成功" + j + "");
                Console.ReadKey();
            }
        }

成功添加100條路由

 

 C 消費者代碼::: 

 /// <summary>
        /// 
        /// </summary>
        /// <param name="args"></param>
        static void Main(string[] args)
        {
            using (var channel = HelpConnection.GetConnection().CreateModel())
            {
                //聲明隊列
                channel.QueueDeclare("firstQueue", true, false, false, null);
                //聲明路由
                channel.ExchangeDeclare("firstExchange", "direct", true, false, null);
                //綁定 創建關係
                channel.QueueBind("firstQueue", "firstExchange", "firstQueue_Exchange");

                //公平分發 同一時間只處理一個消息
                channel.BasicQos(0, 1, true);
                var conSumer = new EventingBasicConsumer(channel);
                conSumer.Received += (moede, e) =>
                {
                    var body = e.Body;
                    var msg = Encoding.UTF8.GetString(body);
                    Console.WriteLine("顯示結果:"+msg);
                    //進行交付,肯定此消息已經處理完成
                   // channel.BasicAck( e.DeliveryTag,  false);
                };
                //確認收到消息    進行消費
                channel.BasicConsume("firstQueue", true, conSumer);//false 手動應答;true:自動應答
              
                Console.ReadKey();
            }
        }

 

效果圖(特地創建好幾個項目,同事啓動進行測試)文檔

 

 

3、總結

 本章總結注意幾點:::博客

一、即便RabbitMQ從新啓動,task_queue隊列也不會丟失。如今咱們須要將咱們的消息標記爲持久性 - 將IBasicProperties.SetPersistent設置爲true。string

   var properties = channel.CreateBasicProperties();it

  properties.Persistent = true;

二、公平分發同一時間只處理一個消息

channel.BasicQos(0,1,false)
  • 博主是利用讀書、參考、引用、抄襲、複製和粘貼等多種方式打形成本身的純鍍 24k 文章,請原諒博主成爲一個無恥的文檔搬運工!
  • 小弟剛邁入博客編寫,文中若有不對,歡迎用板磚扶正,但願給你有所幫助。
相關文章
相關標籤/搜索