代碼中,咱們一般這樣聲明一個隊列:node
//聲明隊列 channel.QueueDeclare ( queue: QueueName, //隊列名稱 durable: false, //隊列是否持久化.false:隊列在內存中,服務器掛掉後,隊列就沒了;true:服務器重啓後,隊列將會從新生成.注意:只是隊列持久化,不表明隊列中的消息持久化!!!! exclusive: false, //隊列是否專屬,專屬的範圍針對的是鏈接,也就是說,一個鏈接下面的多個信道是可見的.對於其餘鏈接是不可見的.鏈接斷開後,該隊列會被刪除.注意,不是信道斷開,是鏈接斷開.而且,就算設置成了持久化,也會刪除. autoDelete: true, //若是全部消費者都斷開鏈接了,是否自動刪除.若是尚未消費者從該隊列獲取過消息或者監聽該隊列,那麼該隊列不會刪除.只有在有消費者從該隊列獲取過消息後,該隊列纔有可能自動刪除(當全部消費者都斷開鏈接,無論消息是否獲取完) arguments: null //隊列的配置 );
對於第5個參數: arguments ,緩存
它的類型是一個鍵值對集合 : 服務器
它到底有哪些key呢?app
咱們能夠經過 RabbitMQ 的管理頁面看到:性能
一共10個:測試
官方 : How long a message published to a queue can live before it is discarded (milliseconds). (Sets the "x-message-ttl" argument.) this
翻譯 : 一個隊列中的消息,在被丟棄以前可以存活多少毫秒.( key 爲 "x-message-ttl").通俗講就是,隊列中的消息的生存週期,單位毫秒.編碼
測試 :spa
internal class Program { private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { { "x-message-ttl", 10000 } }; Producer.Send(arguments); Console.ReadKey(); } } public class Producer { private const string QueueName = "test_queue"; public static void Send(IDictionary<string,object> arguments) { using (IConnection connection = ConnectionHelper.GetConnection()) using (IModel channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, false, false, false, arguments); string msg = "hello world "; channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg)); Console.WriteLine($"{DateTime.Now} : send {msg}"); } } }
咱們從管理頁面該隊列的消息概述中能夠看出,這條消息只存活了10秒鐘.翻譯
爲了加以對比,我還另外建立了一個沒有聲明任何參數的隊列:
能夠看出,測試的這條隊列的特徵(Features)一欄中,被打上了"TTL"標籤.
這裏有個很是重要的知識點須要注意!
隊列一旦聲明,參數將沒法更改,添加,刪除,也就是說,對上述"test_queue"隊列進行以下操做都會拋出異常:
Dictionary<string, object> arguments = new Dictionary<string, object> { { "x-message-ttl", 20000 } };
Dictionary<string, object> arguments = null;
Dictionary<string, object> arguments = new Dictionary<string, object> { { "x-message-ttl", 10000 } }; arguments.Add("x-expires", 12345);
統統不行!!!
要改變一個隊列的參數,只有兩種辦法:
官方 : How long a queue can be unused for before it is automatically deleted (milliseconds).(Sets the "x-expires" argument.)
翻譯 : 隊列多長時間(毫秒)沒有被使用(訪問)就會被刪除.換個說法就是,當隊列在指定的時間內沒有被使用(訪問)就會被刪除.
測試 :
private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-message-ttl", 10000},//設置隊列中的消息存活期爲 10 秒 { "x-expires", 20000}//設置隊列的存活期 20 秒 }; Producer.Send(arguments); Console.ReadKey(); }
能夠看到,隊列特徵一欄中,多了一個"Exp"標籤
固然,10秒後,消息會被刪除,20秒後,隊列被刪除
可是,若是咱們同時建立一個消費者,監聽該隊列,以下:
public class Consumer { private const string QueueName = "test_queue"; public static void Receive() { IConnection connection = ConnectionHelper.GetConnection(); IModel channel = connection.CreateModel(); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (s, e) => { byte[] bytes = e.Body; string str = Encoding.Default.GetString(bytes); Console.WriteLine("consumer receive : " + str); }; channel.BasicConsume(QueueName, true, "", false, false, null, consumer); } }
private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-message-ttl", 10000},//設置隊列中的消息存活期爲 10 秒 { "x-expires", 20000}//設置隊列的存活期 20 秒 }; Producer.Send(arguments); Consumer.Receive();//建立一個消費者監聽該隊列 Console.ReadKey(); }
那該隊列永遠不會被刪除.由於雖然它裏面沒有消息,但一直有消費者在使用(訪問)它,因此它不會被刪除.
官方 : How many (ready) messages a queue can contain before it starts to drop them from its head.(Sets the "x-max-length" argument.)
翻譯 : 隊列能夠容納的消息的最大條數,超過這個條數,隊列頭部的消息將會被丟棄.
測試 : 咱們設置隊列最多隻能容納 1 條消息,而後一次性發送10條,發送完畢後,建立一個消費者去消費,部分代碼以下:
for (int i = 0; i < 10; i++) { string msg = "hello world " + i; channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg)); Console.WriteLine($"{DateTime.Now} : send {msg}"); }
private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-message-ttl", 10000},//設置隊列中的消息存活期爲 10 秒 { "x-expires", 20000},//設置隊列的存活期 20 秒 {"x-max-length",1 },//設置隊列中的消息的最大條數爲 1 條,超過1條,則遵循隊列的"先進先出(丟)"原則. }; Producer.Send(arguments); Thread.Sleep(1000); Consumer.Receive();//建立一個消費者監聽該隊列 Console.ReadKey(); }
運行結果:
能夠看到,消費者消費到的是"hello world 9", "hello world 0" - "hello world 8"都被隊列丟棄了.
標籤以下:
官方 : Total body size for ready messages a queue can contain before it starts to drop them from its head.(Sets the "x-max-length-bytes" argument.)
翻譯 : 隊列能夠容納的消息的最大字節數,超過這個字節數,隊列頭部的消息將會被丟棄.
測試 : 咱們首先設置隊列最多隻能容納12個字節.
Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-message-ttl", 10000},//設置隊列中的消息存活期爲 10 秒 { "x-expires", 20000},//設置隊列的存活期 20 秒 {"x-max-length-bytes",12 },//設置隊列中的消息的最大字節數 };
接下來要分兩種狀況了:
string msg = "新年快樂啊"; channel.BasicPublish("", QueueName, null, Encoding.UTF8.GetBytes(msg)); Console.WriteLine($"{DateTime.Now} : send {msg}");
能夠看到,消費者並無收到消息.說明整條消息都被丟棄了,而不是想象中的只丟棄"新年快樂" ,剩個"啊".
channel.BasicPublish("", QueueName, null, Encoding.UTF8.GetBytes("新年快樂")); channel.BasicPublish("", QueueName, null, Encoding.UTF8.GetBytes("啊")); Console.WriteLine($"{DateTime.Now} : 消息發送完畢 ");
能夠看到, 消費者收到了"啊".
隊列標籤以下 :
官方 : Sets the queue overflow behaviour. This determines what happens to messages when the maximum length of a queue is reached. Valid values are drop-head
or reject-publish
.
翻譯 : 隊列中的消息溢出時,如何處理這些消息.要麼丟棄隊列頭部的消息,要麼拒絕接收後面生產者發送過來的全部消息.( 從上面兩個參數的測試中能夠看出,"drop-head" 應該是默認行爲) ,官方只給了 value,沒給 key . key 爲 "x-overflow".
測試 : 沿用 Max length 參數解釋中的示例:
private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-max-length",1 },//設置隊列中的消息的最大條數爲 1 條,超過1條,則遵循隊列的"先進先出(丟)"原則. {"x-overflow","reject-publish" },//設置隊列中的消息溢出後,該隊列的行爲:"拒絕接收"(全部消息) }; Producer.Send(arguments); Thread.Sleep(1000); Consumer.Receive();//建立一個消費者監聽該隊列 Console.ReadKey(); }
運行結果:
能夠看到,此次消費者消費的是" hello world 0" ,再也不是 "hello world 9" 了,由於生產者發送的"hello world 1" - "hello world 9"被隊列拒絕了.
標籤以下:
官方 : Optional name of an exchange to which messages will be republished if they are rejected or expire.(Sets the "x-dead-letter-exchange" argument.)
翻譯 : 該參數值爲一個(死信)交換機的名稱,當隊列中的消息的生存期到了,或者因長度限制被丟棄時,消息會被推送到(綁定到)這臺交換機(的隊列中),而不是直接丟掉.
對於這個參數,有兩點須要特別注意:
因此,在測試前,咱們須要建立一個隊列,並綁定到該交換機.固然,交換機也須要提早建立好.
爲了方便,咱們在管理頁面建立交換機和隊列,並完成"綁定"動做.
1.先選擇"Exchanges"標籤
2.點擊"Add a new exchange"標籤.
這裏又要注意了,交換機類型咱們要選擇"fanout"模式.這種模式下,交換機會將生產者發送的消息分發到全部綁定到它的隊列.細心的朋友應該能看出來爲何.
由於這個參數只是傳入了交換機的名稱,沒有傳入"Routing Key".
建立一個新隊列 : "test_dead_letter_queue" ,圖就不上了,開篇就已經提到了.
有兩種方式:
1.在交換機詳情頁面輸入要綁定的隊列.
2.在隊列詳情頁面輸入要綁定的交換機.
上述工做都完成後,咱們開始正式測試該參數.
咱們設置隊列能夠容納的消息最多1條,多的拒絕接收.
private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-max-length",1 },//設置隊列中的消息的最大條數爲 1 條,超過1條,則遵循隊列的"先進先出(丟)"原則. {"x-overflow","reject-publish" },//設置隊列中的消息溢出後,隊列的行爲:"拒絕接收"(任何消息) {"x-dead-letter-exchange","test_dead_letter_exchange" }, }; Producer.Send(arguments); Thread.Sleep(1000); Consumer.Receive(); Console.ReadKey(); }
生產者的代碼咱們沿用 Max length 參數解釋中的代碼.
運行後,咱們去看後臺.
咦!?"test_dead_letter_queue"隊列怎麼一條消息都沒有呢?
爲何交換機沒有把 "test_queue" 丟棄的9條消息推送到該隊列呢?
這是什麼狀況?
這就是這個參數第2個須要注意的點了.
Optional name of an exchange to which messages will be republished if they are rejected or expire.
我的以爲,官方這裏用 rejected 不是太準確,固然,也多是我理解得還不夠深刻,再加上英語太差.
爲何這麼說呢?
我最開始的理解是 : 被拒絕或到期的消息會被推送到新的交換機,因此我在參數中傳入了 {"x-overflow","reject-publish" }
我認爲隊列拒絕的這9條消息,就應該被交換機推送到綁定到它的隊列去!
實際上,官方用詞 "rejected" 應該被翻譯成 : "丟棄"或者"拋棄",而不是"拒絕",也就是說,這裏的 rejected 和 expire 是隊列裏面的消息的狀態.而不是隊列的動做.
當咱們註釋掉 {"x-overflow","reject-publish" } ,改用默認的 "drop-head" 行爲時,運行一切正常:
關於到期( expire )的測試,這裏就不上圖了.
這個參數的標籤是 : DLX
官方 : Optional replacement routing key to use when a message is dead-lettered. If this is not set, the message's original routing key will be used.(Sets the "x-dead-letter-routing-key" argument.)
翻譯 : 略......
測試 :
咱們先刪除上述 "Dead letter exchange" 參數解釋中建立的交換機及隊列.
而後從新建立交換機 "test_dead_letter_exchange",並將其類型設置爲"direct".(不少帖子叫它"直連模式"或者"路由模式",叫"topic"爲"主題模式",但我更新喜歡叫"direct"爲"精確匹配模式",叫"topic"爲"模糊匹配模式",感受好理解一些)
接着從新建立兩個隊列 "test_dead_letter_queue1" 和 "test_dead_letter_queue2",並將它們綁定到新的交換機.綁定的時候,將隊列"test_dead_letter_queue1"的路由鍵設置爲"test".
private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-max-length",1 },//設置隊列中的消息的最大條數爲 1 條,超過1條,則遵循隊列的"先進先出(丟)"原則. {"x-dead-letter-exchange","test_dead_letter_exchange" }, {"x-dead-letter-routing-key","test" }, }; Producer.Send(arguments); Console.ReadKey(); } }
運行結果及標籤:
官方 : Maximum number of priority levels for the queue to support; if not set, the queue will not support message priorities.(Sets the "x-max-priority" argument.)
翻譯 : 設置該隊列中的消息的優先級最大值.發佈消息的時候,能夠指定消息的優先級,優先級高的先被消費.若是沒有設置該參數,那麼該隊列不支持消息優先級功能.也就是說,就算髮布消息的時候傳入了優先級的值,也不會起什麼做用.
測試 :
public class Producer { private const string QueueName = "test_queue"; public static void Send(IDictionary<string, object> arguments) { using (IConnection connection = ConnectionHelper.GetConnection()) using (IModel channel = connection.CreateModel()) {
var pros = channel.CreateBasicProperties();//構造消息的屬性 channel.QueueDeclare(QueueName, false, false, false, arguments); for (byte i = 0; i < 10; i++) { string msg = "hello world " + i; pros.Priority = i; channel.BasicPublish("", QueueName, pros, Encoding.Default.GetBytes(msg)); Console.WriteLine($"{DateTime.Now} : send {msg}"); } } } }
上述代碼代表,"hello world 0-9" 的優先級爲 9 - 0.
須要注意的是,優先級屬性的類型爲 byte.
private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-max-priority",255 }, }; Producer.Send(arguments); Thread.Sleep(1000); Consumer.Receive();//建立一個消費者監聽該隊列 Console.ReadKey(); }
運行結果:
問題又來了,若是聲明隊列時,優先級最大值設置的是 5 ,那麼這10條消息的消費順序應該是怎樣的呢?咱們直接看結果:
這個有點意思......
該參數的標籤爲 : Pri
官方 : Set the queue into lazy mode, keeping as many messages as possible on disk to reduce RAM usage; if not set, the queue will keep an in-memory cache to deliver messages as fast as possible.(Sets the "x-queue-mode" argument.)
翻譯 : 設置隊列爲懶人模式.該模式下的隊列會先將交換機推送過來的消息(儘量多的)保存在磁盤上,以減小內存的佔用.當消費者開始消費的時候才加載到內存中;若是沒有設置懶人模式,隊列則會直接利用內存緩存,以最快的速度傳遞消息.
測試 : 使用簡單隊列公平分發測試.
public class Producer { private const string QueueName = "test_queue"; public static void Send(IDictionary<string, object> arguments) { using (IConnection connection = ConnectionHelper.GetConnection()) using (IModel channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, false, false, false, arguments); channel.BasicQos(0, 1, false); for (byte i = 0; i < 10; i++) { string msg = "hello world " + i; channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg)); Console.WriteLine($"{DateTime.Now} : send {msg}"); } } } }
public class Consumer { private const string QueueName = "test_queue"; public static void Receive() { IConnection connection = ConnectionHelper.GetConnection(); IModel channel = connection.CreateModel(); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (s, e) => { byte[] bytes = e.Body; string str = Encoding.Default.GetString(bytes); Console.WriteLine("consumer receive : " + str); channel.BasicAck(e.DeliveryTag, false); Thread.Sleep(5000);//5秒確認一條消息 }; channel.BasicConsume(QueueName, false, "", false, false, null, consumer); } }
private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-queue-mode","lazy" }, }; Producer.Send(arguments); Thread.Sleep(10000); Consumer.Receive();//生產者消息發送完畢10秒後再建立消費者 Console.ReadKey(); }
運行結果:
當10條消息發送完畢後,咱們看管理後臺的隊列詳情:
能夠很是清楚看到,內存中是沒有任何消息的.總數和已準備都是130B.
130B怎麼來的? "hello world 0" 一共13個字節,一共10條,13*10=130.
10秒後,消費者建立,並開始消費.
參數的標籤爲 : Args (這個標籤有點特別...)
其實關於懶人模式和默認模式還有不少細節,各自的優缺點等.好比上面的例子,我故意讓生產者一次性發了10條消息到隊列,而且隊列每次只傳遞一條到消費者,可是消費者開始消費的時候,隊列就一次性把10條消息都讀取到了內存.
再好比,持久化的消息與非持久化的消息,結合懶人模式等等.
還有默認模式和懶人模式的效率,性能比較等.
估計須要單獨寫個帖子了.
終於寫到最後一個參數了
官方 : Set the queue into master location mode, determining the rule by which the queue master is located when declared on a cluster of nodes.(Sets the "x-queue-master-locator" argument.)
集羣相關設置,暫時放一邊去!