在應用一中,基本的消息隊列使用已經完成了,在實際項目中,必定會出現各類各樣的需求和問題,rabbitmq內置的不少強大機制和功能會幫助咱們解決不少的問題,下面就一個一個的一塊兒學習一下。數組
消息響應機制服務器
應用一的列子,在消費者從指定隊列獲取消息的時候,把通知參數no_ack給設成true了,這樣就不須要給rabbitMq服務發送已經處理完畢的通知,rabbitmq把消息發出去後,就會直接刪除掉,不去管消費者是否處理成功,這樣在實際項目中存在很大的風險,出現代碼的健壯性不好的錯誤。因此必定要把no_ack參數設成false:負載均衡
channel.BasicConsume("newQueue", false, customer);
在接受邏輯所有處理成功後加上一句代碼,通知rabbitmq,接到通知後纔會刪除學習
var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue(); //將消息二進制轉回字符串 var msg = Encoding.UTF8.GetString(ea.Body); //通知隊列,已經處理完成 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine(msg);
消息持久化測試
響應保證了消息不會被錯誤刪除,假如rabbitmq掛了,全部消息所有會丟掉,rabbitmq一個普遍使用的機制就是能夠持久化,作持久化要兩步spa
1.隊列持久化code
//隊列是否持久化 bool durable = true; channel.QueueDeclare("firstQueue",durable,false,false,null);
2.消息持久化,經過設置IBasicProperties.SetPersistent來作對象
//消息持久化 var properties = channel.CreateBasicProperties(); properties.SetPersistent(true); properties.DeliveryMode = 2; //消息是持久的,存在並不會受服務器重啓影響
上面的持久化,大部分時候不會出現問題,可是假如在寫入隊列的時候rabbitmq掛了,仍是不會持久上,這種狀況,咱們就要用到咱們代碼的邏輯來強制進行持久化了。。。。blog
負載均衡分發消息rabbitmq
若是有兩個接收端消費者同時訂閱一個隊列,會出現不固定的分發流程,某個消費者可能會出現過多的消息流入形成壓力,而另外一個空閒的蛋疼。因此,若是能公平的接受消息,處理完一個,接受另外一個,同時保證壓力的均衡。代碼在消費者端設置:
channel.BasicQos(0, 1, false);
上面是幾個rabbitmq比較重要的機制,下面開始是rabbitmq的核心牛逼的東西路由
這裏涉及2個概念:
1.exchange,這是交換機,也叫路由器,在消息生產者發送消息的時候,實際上不是直接發送到queue隊列中,由於他不知道發送到哪一個隊列,他會先發送到路由器中exchange裏,exchange再經過路由匹配把消息發送到匹配的隊列當中。
2.routingKey這個是路由的匹配規則,當消息發送到exchange裏後,會根據routingkey來匹配到底發送到哪一個隊列,若是沒匹配到,則消息丟失
exchange的四種類型:
1.direct:按routingkey的名稱匹配
2.fanout:廣播,無需匹配routingkey消息會發送到全部隊列
3.topic:這個是貪婪匹配,也是最靈活的匹配方式,有兩種符號#,*.,......*號的意思是
#符號的意思是好比a_#,能夠匹配的隊列能夠是a_a,a_aa,a_aaaaaa,a_a_b.......多詞
*符號的意識是好比a_*,能夠匹配的隊列能夠是a_a,a_b,a_c.......單詞
這個是應用一中發送消息給隊列的代碼,
channel.BasicPublish("", "firstQueue", null, body);
經過查看這個方法的參數中可看到第一個參數是exchange路由,第二個是routingkey匹配規則,而發送的代碼第一個參數是"",第二個參數是firstQueue,開始覺得是隊列實際並非,緣由是若是用空字符串去申明一個exchange,那麼系統就會使用"amq.direct"這個exchange。咱們在建立一個queue的時候,默認的都會有一個和新建queue同名的routingKey綁定到這個默認的exchange上去,由於在第一個參數選擇了默認的exchange,而咱們申明的隊列叫firstQueue,因此默認的,它在新建一個也叫firstQueue的routingKey,並綁定在默認的exchange上,致使了咱們能夠在第二個參數routingKey中寫firstQueue,這樣它就會找到定義的同名的queue,並把消息放進去。
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
基本概念已經差很少了,我是很擅長排版解釋,往下是各類匹配規則的代碼和運行狀況,直接上代碼:
1.路由類型direct,匹配規則rroutingKey相同,一個生產者,兩個消費者,採用負載均衡方式分發:
生產者
//建立連接工廠,設置目標,用戶,密碼 var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "feiyang", Password = "123456", AutomaticRecoveryEnabled = true, //自動重連 RequestedHeartbeat = UInt16.MaxValue//心跳超時時間 }; //開啓當前服務設置的用戶的連接 using (var connection = factory.CreateConnection()) { //開啓一個頻道 using (var channel = connection.CreateModel()) { //建立一個隊列 //隊列是否持久化 bool durable = true; //已經存在的隊列,不能再定義持久化 // channel.QueueDeclare("firstQueue",false,false,false,null); //建立一個新的,持久的交換區 channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null); //持久的隊列, 沒有排他性,與不自動刪除 channel.QueueDeclare("newQueue", durable, false, false, null); // 綁定隊列到交換區 channel.QueueBind("newQueue", "NewExchange", "newRoutingKey"); //消息持久化 var properties = channel.CreateBasicProperties(); properties.SetPersistent(true); properties.DeliveryMode = 2; //消息是持久的,存在並不會受服務器重啓影響 byte[] body = null; //消息是以二進制數組的形式傳輸的,因此若是消息是實體對象的話,須要序列化和而後轉化爲二進制數組。 for (int i = 0; i < 100; i++) { body = Encoding.UTF8.GetBytes("這是第-----"+i+"-----條消息"); //channel.BasicPublish("", "firstQueue", null, body); channel.BasicPublish("NewExchange", "newRoutingKey", properties, body); Console.Write("成功發送第-----"+i+"-----條消息!"); } Console.ReadKey(); } }
消費者a:
static void Main(string[] args) { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "feiyang"; factory.Password = "123456"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //建立一個新的,持久的交換區 channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null); //仍是鏈接到哪一個隊列 channel.QueueDeclare("newQueue",true,false,false,null); // 綁定隊列到交換區 channel.QueueBind("newQueue", "NewExchange", "newRoutingKey"); //定義消息接受者 var customer = new QueueingBasicConsumer(channel); //從指定隊列獲取消息, //中間這個參數實際必須打開,爲false,意思是是否不通知rabbitm已經處理完畢,咱們這裏要設成false,要通知 //channel.BasicConsume("firstQueue",true,customer); channel.BasicConsume("newQueue", false, customer); //因爲隊列分發不公平致使一個壓力很大,一個很小,在這設置下,公平q分發,也就是一個消費者處理完通知隊列後,纔會繼續分發一個 channel.BasicQos(0, 1, false); //開始不斷循環出隊列的消息 while (true) { var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue(); //將消息二進制轉回字符串 var msg = Encoding.UTF8.GetString(ea.Body); //通知隊列,已經處理完成 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine(msg); } //sw.Stop(); //Console.WriteLine("共用時" + sw.ElapsedTicks + "毫秒"); //Console.ReadKey(); } } }
消費者b:
var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "feiyang"; factory.Password = "123456"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //建立一個新的,持久的交換區 //channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null); //仍是鏈接到哪一個隊列 channel.QueueDeclare("newQueue", true, false, false, null); // 綁定隊列到交換區 //channel.QueueBind("newQueue", "NewExchange", "newRoutingKey"); //定義消息接受者 var customer = new QueueingBasicConsumer(channel); //從指定隊列獲取消息, //中間這個參數實際必須打開,爲false,意思是是否不通知rabbitm已經處理完畢,咱們這裏要設成false,要通知 //channel.BasicConsume("firstQueue",true,customer); channel.BasicConsume("newQueue", false, customer); //因爲隊列分發不公平致使一個壓力很大,一個很小,在這設置下,公平q分發,也就是一個消費者處理完通知隊列後,纔會繼續分發一個 channel.BasicQos(0, 1, false); //開始不斷循環出隊列的消息 while (true) { var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue(); //將消息二進制轉回字符串 var msg = Encoding.UTF8.GetString(ea.Body); //通知隊列,已經處理完成 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine(msg); } //sw.Stop(); //Console.WriteLine("共用時" + sw.ElapsedTicks + "毫秒"); //Console.ReadKey(); } }
運行結果
2.1個生產者,2個消費者,路由類型direct,匹配規則routingKey相同,匹配不一樣的隊列,一次發送到2個隊列各個消費者取出各自的隊列消息。
生產者,建立一個交換區,建立一個隊列,
//建立一個隊列 //隊列是否持久化 bool durable = true; //已經存在的隊列,不能再定義持久化 // channel.QueueDeclare("firstQueue",false,false,false,null); //建立一個新的,持久的交換區 channel.ExchangeDeclare("queueExchange", ExchangeType.Direct, true, false, null); //持久的隊列, 沒有排他性,與不自動刪除 channel.QueueDeclare("queue_a", durable, false, false, null); // 綁定隊列到交換區 channel.QueueBind("queue_a", "queueExchange", "queueRoutingKey"); //消息持久化 var properties = channel.CreateBasicProperties(); properties.SetPersistent(true); properties.DeliveryMode = 2; //消息是持久的,存在並不會受服務器重啓影響 byte[] body = null; //消息是以二進制數組的形式傳輸的,因此若是消息是實體對象的話,須要序列化和而後轉化爲二進制數組。 for (int i = 0; i < 100; i++) { body = Encoding.UTF8.GetBytes("這是第-----"+i+"-----條消息"); //channel.BasicPublish("", "firstQueue", null, body); channel.BasicPublish("queueExchange", "queueRoutingKey", properties, body); Console.Write("成功發送第-----"+i+"-----條消息!"); }
消費者a,建立一個新隊列,綁定到和生產者同一個交換區,讀取剛剛建立的新隊列數據。
//建立一個新的,持久的交換區 //channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null); //仍是鏈接到哪一個隊列 channel.QueueDeclare("queue_a_b", true, false, false, null); // 綁定隊列到交換區 channel.QueueBind("queue_a_b", "queueExchange", "queueRoutingKey"); //定義消息接受者 var customer = new QueueingBasicConsumer(channel); //從指定隊列獲取消息, //中間這個參數實際必須打開,爲false,意思是是否不通知rabbitm已經處理完畢,咱們這裏要設成false,要通知 //channel.BasicConsume("firstQueue",true,customer); channel.BasicConsume("queue_a_b", false, customer); //因爲隊列分發不公平致使一個壓力很大,一個很小,在這設置下,公平q分發,也就是一個消費者處理完通知隊列後,纔會繼續分發一個 channel.BasicQos(0, 1, false); //開始不斷循環出隊列的消息 while (true) { var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue(); //將消息二進制轉回字符串 var msg = Encoding.UTF8.GetString(ea.Body); //通知隊列,已經處理完成 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine(msg); }
消費b,直接讀取生產者建立的queue_a隊列消息
//建立一個新的,持久的交換區 //channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null); //仍是鏈接到哪一個隊列 //channel.QueueDeclare("newQueue", true, false, false, null); // 綁定隊列到交換區 //channel.QueueBind("newQueue", "NewExchange", "newRoutingKey"); //定義消息接受者 var customer = new QueueingBasicConsumer(channel); //從指定隊列獲取消息, //中間這個參數實際必須打開,爲false,意思是是否不通知rabbitm已經處理完畢,咱們這裏要設成false,要通知 //channel.BasicConsume("firstQueue",true,customer); channel.BasicConsume("queue_a", false, customer); //因爲隊列分發不公平致使一個壓力很大,一個很小,在這設置下,公平q分發,也就是一個消費者處理完通知隊列後,纔會繼續分發一個 channel.BasicQos(0, 1, false); //開始不斷循環出隊列的消息 while (true) { var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue(); //將消息二進制轉回字符串 var msg = Encoding.UTF8.GetString(ea.Body); //通知隊列,已經處理完成 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine(msg); }
運行結果
能夠看見,經過路由匹配,一次發送消息,發送到匹配到的兩個隊列中,兩個消費者各自讀取各自的隊列。
3.篇幅有限,再來一個路由類型爲Topic的代碼例子。
生產者,因爲已經建立了一個queueexChange類型爲direct的交換區,不能更改類型,因此從新建立一個交換區
//建立一個隊列 //隊列是否持久化 bool durable = true; //已經存在的隊列,不能再定義持久化 // channel.QueueDeclare("firstQueue",false,false,false,null); //建立一個新的,持久的交換區 channel.ExchangeDeclare("queueTopicExchange", ExchangeType.Topic, true, false, null); //持久的隊列, 沒有排他性,與不自動刪除 channel.QueueDeclare("queue.a", durable, false, false, null); // 綁定隊列到交換區 channel.QueueBind("queue.a", "queueTopicExchange", "queue.#"); //消息持久化 var properties = channel.CreateBasicProperties(); properties.SetPersistent(true); properties.DeliveryMode = 2; //消息是持久的,存在並不會受服務器重啓影響 byte[] body = null; //消息是以二進制數組的形式傳輸的,因此若是消息是實體對象的話,須要序列化和而後轉化爲二進制數組。 for (int i = 0; i < 100; i++) { body = Encoding.UTF8.GetBytes("這是第-----"+i+"-----條消息"); //channel.BasicPublish("", "firstQueue", null, body); channel.BasicPublish("queueTopicExchange", "queue.#", properties, body); Console.Write("成功發送第-----"+i+"-----條消息!"); }
消費者a:
//建立一個新的,持久的交換區 //channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null); //仍是鏈接到哪一個隊列 channel.QueueDeclare("queue.a.b", true, false, false, null); // 綁定隊列到交換區 channel.QueueBind("queue.a.b", "queueTopicExchange", "queue.#"); //定義消息接受者 var customer = new QueueingBasicConsumer(channel); //從指定隊列獲取消息, //中間這個參數實際必須打開,爲false,意思是是否不通知rabbitm已經處理完畢,咱們這裏要設成false,要通知 //channel.BasicConsume("firstQueue",true,customer); channel.BasicConsume("queue.a.b", false, customer); //因爲隊列分發不公平致使一個壓力很大,一個很小,在這設置下,公平q分發,也就是一個消費者處理完通知隊列後,纔會繼續分發一個 channel.BasicQos(0, 1, false); //開始不斷循環出隊列的消息 while (true) { var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue(); //將消息二進制轉回字符串 var msg = Encoding.UTF8.GetString(ea.Body); //通知隊列,已經處理完成 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine(msg); }
消費者b:
//建立一個新的,持久的交換區 //channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null); //仍是鏈接到哪一個隊列 //channel.QueueDeclare("newQueue", true, false, false, null); // 綁定隊列到交換區 //channel.QueueBind("newQueue", "NewExchange", "newRoutingKey"); //定義消息接受者 var customer = new QueueingBasicConsumer(channel); //從指定隊列獲取消息, //中間這個參數實際必須打開,爲false,意思是是否不通知rabbitm已經處理完畢,咱們這裏要設成false,要通知 //channel.BasicConsume("firstQueue",true,customer); channel.BasicConsume("queue.a", false, customer); //因爲隊列分發不公平致使一個壓力很大,一個很小,在這設置下,公平q分發,也就是一個消費者處理完通知隊列後,纔會繼續分發一個 channel.BasicQos(0, 1, false); //開始不斷循環出隊列的消息 while (true) { var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue(); //將消息二進制轉回字符串 var msg = Encoding.UTF8.GetString(ea.Body); //通知隊列,已經處理完成 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine(msg); }
運行結果:
代碼例子就不一一寫出來了,還有不少種狀況,實際項目根據不一樣的需求靈活運用,有興趣的能夠本身搭配測試一下。