在正常的服務器運行過程當中,時常會面臨服務器宕機重啓的狀況,那麼咱們的消息此時會如何呢?很不幸的事情就是,咱們的消息可能會消失,這確定不是咱們但願見到的結果。因此咱們但願AMQP服務器崩潰了也能夠將消息恢復,這稱之爲消息持久化。RabbitMQ天然存在這種策略能夠幫助咱們完成這件事情。html
當RabbitMQ服務器重啓後,原先的隊列和交換器會隨同裏面的消息一同消失。緣由在於每一個隊列和交換器都有durable屬性,該屬性默認是false,它決定了RabbitMQ是否須要在崩潰或者重啓以後從新建立隊列或者交換器。將它設置爲true就表明了持久性,在服務器重啓以後就會從新持久的建立隊列和交換器。安全
固然作到這點還不夠,咱們須要的是持久化的消息,因此在消息發佈前,經過將消息的「投遞模式」(delivery mode)屬性設置爲2將消息標記爲持久化。到目前爲止,消息還只是被表示爲持久化,還須要被髮布到持久化的交換器中併到達持久化的隊列中才行。若是不是這樣,包含持久化消息的隊列或者交換器揮着Rabbit崩潰重啓後不復存在,致使消息成爲一個孤兒。所以,總結起來須要作到如下三點:服務器
(1)將消息的投遞模式選項設置爲2(持久);異步
(2)將消息發送到持久化的交換器;性能
(3)消息到達持久化的隊列。spa
注意,若是原先有非持久的交換器或者隊列,須要刪除後纔可從新建立,不然就建立其餘名稱的交換器或者隊列,代碼以下:code
//聲明持久交換器 channel.ExchangeDeclare( "HelloExchange", //交換器名稱 ExchangeType.Direct,//交換器類型 true, //是否持久話 false, //是否自動刪除 null //關於交換器的詳細設置,鍵值對形式 ); //聲明持久隊列 channel.QueueDeclare( "HelloQueue",//隊列名稱 true, //是否持久化 false, //是否只對首次聲明的隊列可見 false, //是否自動刪除 null ////關於隊列和隊列內消息的詳細設置,鍵值對形式 ); //發佈持久消息 string msg_str = "這是生產者第一次發佈的消息"; IBasicProperties msg_pro = channel.CreateBasicProperties(); msg_pro.ContentType = "text/plain";//發佈的數據類型 msg_pro.DeliveryMode = 2;//標記持久化
目前爲止,咱們已經將消息、隊列和交換器設置爲持久化。可是事實上還存在着'最後一英里'的距離,就是在把消息寫入磁盤前,消息因爲服務器宕機而消失該如何?這時候就須要使用到事務,說到事務就會想到SQL中的事務,可是不能搞混了。AMQP中,在把信道設置爲事務模式後,經過信道發送消息後還有多個其餘的AMQP命令,這些命令是執行仍是忽略,取決於消息的發送是否成功,消息發送成功信道會在事務中完成其餘AMQP命令,就能夠提交事務了,發送失敗則其餘AMQP命令將不會執行,咱們也會知道發送失敗,而採起相應的措施。事務保證瞭解決這最後的問題。orm
代碼以下:htm
using (IConnection conn = conn_factory.CreateConnection()) { //2.建立信道 using (IModel channel = conn.CreateModel()) { try { channel.TxSelect();//聲明事務 //3.發佈消息 string msg_str = "這是生產者發佈的消息"; IBasicProperties msg_pro = channel.CreateBasicProperties(); msg_pro.ContentType = "text/plain";//發佈的數據類型 msg_pro.DeliveryMode = 2; channel.BasicPublish( "HelloExchange", //消息發送目標交換器名稱 "hola", //路由鍵 msg_pro, //消息的發佈屬性 Encoding.UTF8.GetBytes(msg_str) //消息 ); channel.TxCommit();//提交事務 } catch(Exception ex) { channel.TxRollback();//回滾事務 } } }
雖然經過事務和持久化的消息、隊列和交換器能夠確保消息不會丟失,可是對消息的吞吐量有着很是嚴重的影響,並且使用消息通訊就是爲了不同步,但是事務卻會致使生產者程序產生同步。因此,有一個更好的方法保證消息投遞:發送方確認模式。和事務相似,咱們須要將信道channel設置爲confirm模式,並且只能經過從新建立信道來關閉該設置。一旦信道進入confirm模式,全部的信道上發佈的消息都會被指派一個惟一的ID。當消息被投遞到隊列後,信道就會發送一個發送方確認模式給生產者程序,使得生產者知道消息安全到達隊列了。blog
發送發確認模式最大的好處是它們是異步的,沒有回滾的概念,更加輕量級,對性能的影響也幾乎忽略不計。
代碼以下:
channel.ConfirmSelect();//開啓發送確認模式 //3.發佈消息 IBasicProperties msg_pro = channel.CreateBasicProperties(); msg_pro.ContentType = "text/plain";//發佈的數據類型 msg_pro.DeliveryMode = 2; for(int i = 0; i < 5; i++) { string msg_str = string.Format("這是生產者發佈的消息{0}", i); channel.BasicPublish( "HelloExchange", //消息發送目標交換器名稱 "hola", //路由鍵 msg_pro, //消息的發佈屬性 Encoding.UTF8.GetBytes(msg_str) //消息 ); if (channel.WaitForConfirms()) Console.WriteLine(i); else Console.WriteLine("發送失敗"); }
能夠看到channel.WaitForConfirms()方法是同步的,這樣的話效率會低一點,咱們能夠發送完全部的消息,而後用channel.WaitForConfirmsOrDie()一次性提交,若是中途有一個消息提交失敗或者超時,就會報錯Exception,須要所有從新提交。
消息持久化的策略大體就是以上幾種,咱們能夠根據本身的實際需求來選擇相應的策略。若是有問題歡迎指出!
原文出處:https://www.cnblogs.com/xwc1996/p/10041116.html