RabbitMQ (八) 隊列的參數詳解

代碼中,咱們一般這樣聲明一個隊列:node

                    //聲明隊列
                    channel.QueueDeclare
                    (
                        queue: QueueName, //隊列名稱
                        durable: false, //隊列是否持久化.false:隊列在內存中,服務器掛掉後,隊列就沒了;true:服務器重啓後,隊列將會從新生成.注意:只是隊列持久化,不表明隊列中的消息持久化!!!!
                        exclusive: false, //隊列是否專屬,專屬的範圍針對的是鏈接,也就是說,一個鏈接下面的多個信道是可見的.對於其餘鏈接是不可見的.鏈接斷開後,該隊列會被刪除.注意,不是信道斷開,是鏈接斷開.而且,就算設置成了持久化,也會刪除.
                        autoDelete: true, //若是全部消費者都斷開鏈接了,是否自動刪除.若是尚未消費者從該隊列獲取過消息或者監聽該隊列,那麼該隊列不會刪除.只有在有消費者從該隊列獲取過消息後,該隊列纔有可能自動刪除(當全部消費者都斷開鏈接,無論消息是否獲取完)
                        arguments: null //隊列的配置
                    );

 

對於第5個參數: arguments ,緩存

它的類型是一個鍵值對集合 : 服務器

 

它到底有哪些key呢?app

咱們能夠經過 RabbitMQ 的管理頁面看到:性能

 

 

一共10個:測試

  • Message TTL : 消息生存期
  • Auto expire : 隊列生存期
  • Max length : 隊列能夠容納的消息的最大條數
  • Max length bytes : 隊列能夠容納的消息的最大字節數
  • Overflow behaviour : 隊列中的消息溢出後如何處理
  • Dead letter exchange : 溢出的消息須要發送到綁定該死信交換機的隊列
  • Dead letter routing key : 溢出的消息須要發送到綁定該死信交換機,而且路由鍵匹配的隊列
  • Maximum priority : 最大優先級
  • Lazy mode : 懶人模式
  • Master locator : 

 

Message TTL

官方 : How long a message published to a queue can live before it is discarded (milliseconds). (Sets the "x-message-ttl" argument.) this

翻譯 : 一個隊列中的消息,在被丟棄以前可以存活多少毫秒.( key 爲 "x-message-ttl").通俗講就是,隊列中的消息的生存週期,單位毫秒.編碼

測試 :spa

    internal class Program
    {
        private static void Main(string[] args)
        {
            Dictionary<string, object> arguments = new Dictionary<string, object> { { "x-message-ttl", 10000 } };
            Producer.Send(arguments);
            Console.ReadKey();
        }
    }

    public class Producer
    {
        private const string QueueName = "test_queue";
        public static void Send(IDictionary<string,object> arguments)
        {
            using (IConnection connection = ConnectionHelper.GetConnection())
            using (IModel channel = connection.CreateModel())
            {
                channel.QueueDeclare(QueueName, false, false, false, arguments);
                string msg = "hello world ";
                channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg));
                Console.WriteLine($"{DateTime.Now} : send {msg}");
            }
        }
    }

 

咱們從管理頁面該隊列的消息概述中能夠看出,這條消息只存活了10秒鐘.翻譯

 

爲了加以對比,我還另外建立了一個沒有聲明任何參數的隊列:

能夠看出,測試的這條隊列的特徵(Features)一欄中,被打上了"TTL"標籤.

這裏有個很是重要的知識點須要注意!

隊列一旦聲明,參數將沒法更改,添加,刪除,也就是說,對上述"test_queue"隊列進行以下操做都會拋出異常:

  • 修改該隊列的 "x-message-ttl" 參數爲 "20000" 毫秒 : 
    Dictionary<string, object> arguments = new Dictionary<string, object> { { "x-message-ttl", 20000 } };
  • 試圖經過傳入 null 來刪除該隊列的全部已設置的參數 : 
    Dictionary<string, object> arguments = null;
  • 試圖添加新的參數 :
                Dictionary<string, object> arguments = new Dictionary<string, object> { { "x-message-ttl", 10000 } };
                arguments.Add("x-expires", 12345);

     

統統不行!!!

要改變一個隊列的參數,只有兩種辦法:

  • 刪除該隊列,從新建立;
  • 換個名字,建立一個新的隊列.

 

 

Auto expire

官方 : How long a queue can be unused for before it is automatically deleted (milliseconds).(Sets the "x-expires" argument.) 

翻譯 : 隊列多長時間(毫秒)沒有被使用(訪問)就會被刪除.換個說法就是,當隊列在指定的時間內沒有被使用(訪問)就會被刪除.

測試 : 

        private static void Main(string[] args)
        {
            Dictionary<string, object> arguments = new Dictionary<string, object>
            {
                {"x-message-ttl", 10000},//設置隊列中的消息存活期爲 10 秒
                { "x-expires", 20000}//設置隊列的存活期 20 秒
            };
            Producer.Send(arguments);
            Console.ReadKey();
        }

能夠看到,隊列特徵一欄中,多了一個"Exp"標籤

 

固然,10秒後,消息會被刪除,20秒後,隊列被刪除

可是,若是咱們同時建立一個消費者,監聽該隊列,以下:

    public class Consumer
    {
        private const string QueueName = "test_queue";
        public static void Receive()
        {
            IConnection connection = ConnectionHelper.GetConnection();
            IModel channel = connection.CreateModel();
 EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (s, e) =>
            {
                byte[] bytes = e.Body;
                string str = Encoding.Default.GetString(bytes);
                Console.WriteLine("consumer receive : " + str); };
            channel.BasicConsume(QueueName, true, "", false, false, null, consumer);
        }
    }

 

        private static void Main(string[] args)
        {
            Dictionary<string, object> arguments = new Dictionary<string, object>
            {
                {"x-message-ttl", 10000},//設置隊列中的消息存活期爲 10 秒
                { "x-expires", 20000}//設置隊列的存活期 20 秒
            };
            Producer.Send(arguments);
            Consumer.Receive();//建立一個消費者監聽該隊列
            Console.ReadKey();
        }

 

那該隊列永遠不會被刪除.由於雖然它裏面沒有消息,但一直有消費者在使用(訪問)它,因此它不會被刪除.

 

 

Max length

官方 : How many (ready) messages a queue can contain before it starts to drop them from its head.(Sets the "x-max-length" argument.) 

翻譯 : 隊列能夠容納的消息的最大條數,超過這個條數,隊列頭部的消息將會被丟棄.

測試 : 咱們設置隊列最多隻能容納 1 條消息,而後一次性發送10條,發送完畢後,建立一個消費者去消費,部分代碼以下:

生產者

                for (int i = 0; i < 10; i++)
                {
                    string msg = "hello world " + i;
                    channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg));
                    Console.WriteLine($"{DateTime.Now} : send {msg}");
                }

 

        private static void Main(string[] args)
        {
            Dictionary<string, object> arguments = new Dictionary<string, object>
            {
                {"x-message-ttl", 10000},//設置隊列中的消息存活期爲 10 秒
                { "x-expires", 20000},//設置隊列的存活期 20 秒
                {"x-max-length",1 },//設置隊列中的消息的最大條數爲 1 條,超過1條,則遵循隊列的"先進先出(丟)"原則.
            };
            Producer.Send(arguments);
            Thread.Sleep(1000);
            Consumer.Receive();//建立一個消費者監聽該隊列
            Console.ReadKey();
        }

運行結果:

能夠看到,消費者消費到的是"hello world 9", "hello world 0" - "hello world 8"都被隊列丟棄了.

標籤以下:

 

 

Max length bytes

官方 : Total body size for ready messages a queue can contain before it starts to drop them from its head.(Sets the "x-max-length-bytes" argument.) 

翻譯 : 隊列能夠容納的消息的最大字節數,超過這個字節數,隊列頭部的消息將會被丟棄.

測試 : 咱們首先設置隊列最多隻能容納12個字節.

            Dictionary<string, object> arguments = new Dictionary<string, object>
            {
                {"x-message-ttl", 10000},//設置隊列中的消息存活期爲 10 秒
                { "x-expires", 20000},//設置隊列的存活期 20 秒
                {"x-max-length-bytes",12 },//設置隊列中的消息的最大字節數
            };

接下來要分兩種狀況了:

.發送一條 15個字節的消息:"新年快樂啊"(咱們採用UTF8編碼,1個漢字佔3個字節),發送完畢後,建立一個消費者去消費.(固然,消費者也要用UTF8來接收)

                string msg = "新年快樂啊";
                channel.BasicPublish("", QueueName, null, Encoding.UTF8.GetBytes(msg));
                Console.WriteLine($"{DateTime.Now} : send {msg}");

 

 

能夠看到,消費者並無收到消息.說明整條消息都被丟棄了,而不是想象中的只丟棄"新年快樂" ,剩個"啊".

.連續發送2條累計15個字節的消息. 

                channel.BasicPublish("", QueueName, null, Encoding.UTF8.GetBytes("新年快樂"));
                channel.BasicPublish("", QueueName, null, Encoding.UTF8.GetBytes(""));
                Console.WriteLine($"{DateTime.Now} : 消息發送完畢 ");

 

 

 

能夠看到, 消費者收到了"啊".

隊列標籤以下 :

 

 

 

Overflow behaviour

官方 : Sets the queue overflow behaviour. This determines what happens to messages when the maximum length of a queue is reached. Valid values are drop-head or reject-publish

翻譯 : 隊列中的消息溢出時,如何處理這些消息.要麼丟棄隊列頭部的消息,要麼拒絕接收後面生產者發送過來的全部消息.( 從上面兩個參數的測試中能夠看出,"drop-head" 應該是默認行爲) ,官方只給了 value,沒給 key . key 爲 "x-overflow".

測試 : 沿用  Max length 參數解釋中的示例:

        private static void Main(string[] args)
        {
            Dictionary<string, object> arguments = new Dictionary<string, object>
            {
                {"x-max-length",1 },//設置隊列中的消息的最大條數爲 1 條,超過1條,則遵循隊列的"先進先出(丟)"原則.         
                {"x-overflow","reject-publish" },//設置隊列中的消息溢出後,該隊列的行爲:"拒絕接收"(全部消息)
            };
            Producer.Send(arguments);
            Thread.Sleep(1000);
            Consumer.Receive();//建立一個消費者監聽該隊列
            Console.ReadKey();
        }

運行結果:

 

能夠看到,此次消費者消費的是" hello world 0" ,再也不是 "hello world 9" 了,由於生產者發送的"hello world 1" - "hello world 9"被隊列拒絕了.

標籤以下:

 

 

Dead letter exchange

官方 : Optional name of an exchange to which messages will be republished if they are rejected or expire.(Sets the "x-dead-letter-exchange" argument.) 

翻譯 : 該參數值爲一個(死信)交換機的名稱,當隊列中的消息的生存期到了,或者因長度限制被丟棄時,消息會被推送到(綁定到)這臺交換機(的隊列中),而不是直接丟掉. 

對於這個參數,有兩點須要特別注意:

一.前面的文章中提到過:

因此,在測試前,咱們須要建立一個隊列,並綁定到該交換機.固然,交換機也須要提早建立好.

爲了方便,咱們在管理頁面建立交換機和隊列,並完成"綁定"動做.

建立交換機

1.先選擇"Exchanges"標籤

 

2.點擊"Add a new exchange"標籤.

 

這裏又要注意了,交換機類型咱們要選擇"fanout"模式.這種模式下,交換機會將生產者發送的消息分發到全部綁定到它的隊列.細心的朋友應該能看出來爲何.

由於這個參數只是傳入了交換機的名稱,沒有傳入"Routing Key".

建立隊列

建立一個新隊列 : "test_dead_letter_queue" ,圖就不上了,開篇就已經提到了.

綁定交換機

有兩種方式:

1.在交換機詳情頁面輸入要綁定的隊列.

2.在隊列詳情頁面輸入要綁定的交換機.

 

上述工做都完成後,咱們開始正式測試該參數.

咱們設置隊列能夠容納的消息最多1條,多的拒絕接收.

        private static void Main(string[] args)
        {
            Dictionary<string, object> arguments = new Dictionary<string, object>
            {
                {"x-max-length",1 },//設置隊列中的消息的最大條數爲 1 條,超過1條,則遵循隊列的"先進先出(丟)"原則. {"x-overflow","reject-publish" },//設置隊列中的消息溢出後,隊列的行爲:"拒絕接收"(任何消息)
                {"x-dead-letter-exchange","test_dead_letter_exchange" },
            };
            Producer.Send(arguments);
            Thread.Sleep(1000);
            Consumer.Receive();
            Console.ReadKey();
        }

 

生產者的代碼咱們沿用 Max length 參數解釋中的代碼.

運行後,咱們去看後臺.

咦!?"test_dead_letter_queue"隊列怎麼一條消息都沒有呢?

爲何交換機沒有把 "test_queue" 丟棄的9條消息推送到該隊列呢?

這是什麼狀況?

這就是這個參數第2個須要注意的點了.

二.咱們再讀一次官方對該參數的解釋:

Optional name of an exchange to which messages will be republished if they are rejected or expire.

我的以爲,官方這裏用 rejected 不是太準確,固然,也多是我理解得還不夠深刻,再加上英語太差.

爲何這麼說呢?

我最開始的理解是 : 被拒絕到期的消息會被推送到新的交換機,因此我在參數中傳入了  {"x-overflow","reject-publish" } 

我認爲隊列拒絕的這9條消息,就應該被交換機推送到綁定到它的隊列去!

實際上,官方用詞 "rejected" 應該被翻譯成 : "丟棄"或者"拋棄",而不是"拒絕",也就是說,這裏的 rejected 和 expire 是隊列裏面的消息的狀態.而不是隊列的動做.

當咱們註釋掉 {"x-overflow","reject-publish" } ,改用默認的  "drop-head" 行爲時,運行一切正常: 

 

關於到期( expire )的測試,這裏就不上圖了.

這個參數的標籤是 : DLX

 

 

Dead letter routing key

官方 : Optional replacement routing key to use when a message is dead-lettered. If this is not set, the message's original routing key will be used.(Sets the "x-dead-letter-routing-key" argument.) 

翻譯 : 略......

測試 : 

咱們先刪除上述 "Dead letter exchange" 參數解釋中建立的交換機及隊列.

而後從新建立交換機 "test_dead_letter_exchange",並將其類型設置爲"direct".(不少帖子叫它"直連模式"或者"路由模式",叫"topic"爲"主題模式",但我更新喜歡叫"direct"爲"精確匹配模式",叫"topic"爲"模糊匹配模式",感受好理解一些)

接着從新建立兩個隊列 "test_dead_letter_queue1" 和 "test_dead_letter_queue2",並將它們綁定到新的交換機.綁定的時候,將隊列"test_dead_letter_queue1"的路由鍵設置爲"test".

 

        private static void Main(string[] args)
        {
            Dictionary<string, object> arguments = new Dictionary<string, object>
            {
                {"x-max-length",1 },//設置隊列中的消息的最大條數爲 1 條,超過1條,則遵循隊列的"先進先出(丟)"原則.
                {"x-dead-letter-exchange","test_dead_letter_exchange" },
                {"x-dead-letter-routing-key","test" },
            };
            Producer.Send(arguments);
            Console.ReadKey();
        }
    }

 

運行結果及標籤:

 

 

Maximum priority

官方 : Maximum number of priority levels for the queue to support; if not set, the queue will not support message priorities.(Sets the "x-max-priority" argument.) 

翻譯 : 設置該隊列中的消息的優先級最大值.發佈消息的時候,能夠指定消息的優先級,優先級高的先被消費.若是沒有設置該參數,那麼該隊列不支持消息優先級功能.也就是說,就算髮布消息的時候傳入了優先級的值,也不會起什麼做用.

測試 : 

生產者

    public class Producer
    {
        private const string QueueName = "test_queue";
        public static void Send(IDictionary<string, object> arguments)
        {        
          using (IConnection connection = ConnectionHelper.GetConnection())
            using (IModel channel = connection.CreateModel())
            {
          var pros = channel.CreateBasicProperties();//構造消息的屬性 channel.QueueDeclare(QueueName,
false, false, false, arguments); for (byte i = 0; i < 10; i++) { string msg = "hello world " + i; pros.Priority = i; channel.BasicPublish("", QueueName, pros, Encoding.Default.GetBytes(msg)); Console.WriteLine($"{DateTime.Now} : send {msg}"); } } } }

上述代碼代表,"hello world 0-9" 的優先級爲 9 - 0.

須要注意的是,優先級屬性的類型爲 byte.

        private static void Main(string[] args)
        {
            Dictionary<string, object> arguments = new Dictionary<string, object>
            {
                {"x-max-priority",255 },
            };
            Producer.Send(arguments);
            Thread.Sleep(1000);
            Consumer.Receive();//建立一個消費者監聽該隊列
            Console.ReadKey();
        }

 

運行結果:

 

問題又來了,若是聲明隊列時,優先級最大值設置的是 5 ,那麼這10條消息的消費順序應該是怎樣的呢?咱們直接看結果:

 

這個有點意思......

該參數的標籤爲 : Pri

 

 

Lazy mode

官方 : Set the queue into lazy mode, keeping as many messages as possible on disk to reduce RAM usage; if not set, the queue will keep an in-memory cache to deliver messages as fast as possible.(Sets the "x-queue-mode" argument.) 

翻譯 : 設置隊列爲懶人模式.該模式下的隊列會先將交換機推送過來的消息(儘量多的)保存在磁盤上,以減小內存的佔用.當消費者開始消費的時候才加載到內存中;若是沒有設置懶人模式,隊列則會直接利用內存緩存,以最快的速度傳遞消息.

測試 : 使用簡單隊列公平分發測試.

生產者

    public class Producer
    {
        private const string QueueName = "test_queue";
        public static void Send(IDictionary<string, object> arguments)
        {
            using (IConnection connection = ConnectionHelper.GetConnection())
            using (IModel channel = connection.CreateModel())
            {
                channel.QueueDeclare(QueueName, false, false, false, arguments);
                channel.BasicQos(0, 1, false);
                for (byte i = 0; i < 10; i++)
                {
                    string msg = "hello world " + i;
                    channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg));
                    Console.WriteLine($"{DateTime.Now} : send {msg}");
                }
            }
        }
    }

 

消費者

    public class Consumer
    {
        private const string QueueName = "test_queue";
        public static void Receive()
        {
            IConnection connection = ConnectionHelper.GetConnection();
            IModel channel = connection.CreateModel();
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (s, e) =>
            {
                byte[] bytes = e.Body;
                string str = Encoding.Default.GetString(bytes);
                Console.WriteLine("consumer receive : " + str);
                channel.BasicAck(e.DeliveryTag, false);
                Thread.Sleep(5000);//5秒確認一條消息
            };
            channel.BasicConsume(QueueName, false, "", false, false, null, consumer);
        }
    }

 

控制檯

        private static void Main(string[] args)
        {
            Dictionary<string, object> arguments = new Dictionary<string, object>
            {
                {"x-queue-mode","lazy" },
            };
            Producer.Send(arguments);
            Thread.Sleep(10000);
            Consumer.Receive();//生產者消息發送完畢10秒後再建立消費者
            Console.ReadKey();
        }

 

運行結果:

當10條消息發送完畢後,咱們看管理後臺的隊列詳情:

 

能夠很是清楚看到,內存中是沒有任何消息的.總數和已準備都是130B.

130B怎麼來的? "hello world 0" 一共13個字節,一共10條,13*10=130.

10秒後,消費者建立,並開始消費.

 

參數的標籤爲 : Args (這個標籤有點特別...)

其實關於懶人模式和默認模式還有不少細節,各自的優缺點等.好比上面的例子,我故意讓生產者一次性發了10條消息到隊列,而且隊列每次只傳遞一條到消費者,可是消費者開始消費的時候,隊列就一次性把10條消息都讀取到了內存.

再好比,持久化的消息與非持久化的消息,結合懶人模式等等.

還有默認模式和懶人模式的效率,性能比較等.

估計須要單獨寫個帖子了.

 

 

終於寫到最後一個參數了

Master locator

官方 : Set the queue into master location mode, determining the rule by which the queue master is located when declared on a cluster of nodes.(Sets the "x-queue-master-locator" argument.) 

集羣相關設置,暫時放一邊去!

相關文章
相關標籤/搜索