上一篇最後提到了mandatory這個參數,對於設置mandatory參數我的感受仍是很重要的,尤爲在RabbitMQ鏡像隊列發生故障轉移時。html
模擬個測試環境以下:app
首先在集羣隊列中增長兩個鏡像隊列的策略:測試
對於ha-promote-on-shutdown這個參數,能夠參考文檔,其做用就是當集羣中master出現故障時強制進行故障轉移從而選出新的master節點,這裏的master出現故障表示的是人爲的故障好比經過命令行rabbitmqctl.bat start_app之類的關閉RabbitMQ實例或者說是關閉電腦之類的。由於這種強制切換master節點的狀況一般發生在斷電之類的非可控因素上,因此經過設置這個參數爲always模擬非可控因素。spa
固然設置這個參數會存在必定風險,文檔裏也說了,會發生消息不一樣步也就是會丟消息。命令行
而後建立四個隊列和兩個Echange,採用綁定Exchange的topic模式3d
而後先貼一下測試代碼在進行說明調試
C#代碼code
List<string> hosts = new List<string>(); hosts.Add("192.168.1.1"); hosts.Add("192.168.1.2"); int curHostIndex = 0; string exchange = "always.exchange"; string touteKey = "yu.1"; byte[] msg = Encoding.UTF8.GetBytes("hello"); ConnectAgain: ConnectionFactory factory = new ConnectionFactory(); factory.UserName = "admin"; factory.Password = "admin"; factory.VirtualHost = "/"; factory.HostName = hosts[curHostIndex]; IConnection conn = factory.CreateConnection(); IModel channel = conn.CreateModel(); IBasicProperties props = channel.CreateBasicProperties(); props.ContentType = "text/plain"; props.DeliveryMode = 2; for (int i = 0; i < 5000000; i++) { try { channel.ConfirmSelect(); channel.BasicAcks += (sender, eventArgs) => { }; channel.BasicReturn += (sender, eventArgs) => Console.WriteLine("消息投遞失敗 " + eventArgs.ReplyText); channel.BasicPublish(exchange, touteKey, true, props, msg); bool success = channel.WaitForConfirms(new TimeSpan(0, 0, 0, 0, 1)); if (!success) Console.WriteLine("表示消息投遞失敗 "); } catch (Exception ex) { //發生連接異常時換個IP進行鏈接 channel.Close(); conn.Close(); if (curHostIndex == 0) curHostIndex = 1; else curHostIndex = 0; goto ConnectAgain; } }
Java代碼:orm
public static void publish() throws Exception { List<String> hosts = new ArrayList<String>(); hosts.add("192.168.1.1"); hosts.add("192.168.1.2"); int curHostIndex = 1; String exchange = "common.exchange"; String routeKey = "yu.1"; byte[] msg = "hello".getBytes("UTF-8"); ConnectAgain: while (true) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(hosts.get(curHostIndex)); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); // 建立一個新的鏈接 Connection connection = factory.newConnection(); // 建立一個頻道 Channel channel = connection.createChannel(); channel.addConfirmListener(new ConfirmListener() { public void handleAck(long l, boolean b) throws IOException { System.out.println(l); } public void handleNack(long l, boolean b) throws IOException { System.out.println(l); } }); channel.addReturnListener(new ReturnListener() { public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException { System.out.println("響應狀態碼-ReplyCode:" + i); System.out.println("響應內容-ReplyText:" + s); System.out.println("Exchange:" + s1); System.out.println("RouteKey" + s2); System.out.println("投遞失敗的消息:" + new String(bytes, "UTF-8")); } }); for (int i = 0; i < 5000000; i++) { try { channel.confirmSelect(); channel.basicPublish(exchange, routeKey, true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg); boolean sucess = channel.waitForConfirms(10); System.out.println(sucess); } catch (Exception ex) { try { connection.abort(); } catch (Exception e) { e.printStackTrace(); } if (curHostIndex == 0) curHostIndex = 1; else curHostIndex = 0; continue ConnectAgain; } } } }
先測試下always.exchange也就是非人爲因素致使的故障轉移的狀況,先開啓客戶端讓客戶端跑着,而後經過命令行中止master節點(也就是Node爲WinServer12-1),中止時消息隊列的消息狀態爲htm
圖片消息的總數雖然不許確(頁面存在延遲) ,但截取的是master中止時刻的消息狀態,也夠用了,這時候發現slaver節點會切換爲master節點(也就是Node爲DESKTOP-078UA43),並繼續接受消息,客戶點也沒有發生異常通知(由於訂閱了BasicReturn事件而且開啓了madartory,若是消息投遞失敗,咱們能夠獲得通知,待會也會測試到)。
而後咱們讓集羣多跑會而後在消息有明顯變化的時候在開啓老的掛掉的當前爲slaver的節點,當前隊列消息的狀態以下,master爲DESKTOP-078UA43
在開啓slaver後咱們在當即中止當前的master節點(也就是Node爲WinServer12-1),這時候發現集羣的master又切回到了DESKTOP-078UA43同時隊列中的消息也跟着清除了。。也就是說在DESKTOP-078UA43以前掛掉到重啓啓動期間WinServer12-1接收到的消息所有丟掉了。。。由此咱們可知,RabbitMQ鏡像集羣發生非可控因素形成的master故障爲了保證可用性,會丟消息。
而對於客戶端而已,消息都是可靠投遞的,因此監聽事件並不會觸發。
固然也能夠經過設置ha-sync-mode參數進行調整,默認狀況下,新加入的節點不會同步已存在節點內的消息,設置爲automatic後會進行同步。不過若是沒同步完master掛掉的話消息仍是會丟掉的
而後測試下common.exchange會發生的狀況,測試這個的時候就是體現mandatory做用的時刻了!
仍是先在集羣正常的狀況下選取個時間點關掉主節點,當前master爲DESKTOP-078UA43
而後WinServer12-1變爲新的master,此刻發現正常接收消息,並且對客戶端而言,消息也是正常投遞的。而後打開被關閉的DESKTOP-078UA43節點,它會以slaver身份迴歸集羣,開啓前觀察下當前隊列狀態
而後開始操做!發現隊列狀態以下,NaN,難道說隊列中止接受數據了麼!!!(若是中止接受數據,客戶端同步調用發送時會發送失敗麼?)並且沒法將master進行切換了。
這時候若是在啓動WinServer12-1會發現,消息仍是WinServer12-1關閉時刻的消息,WinServer12-1關閉期間DESKTOP-078UA43儘管在接受消息,但實際消息並無被RabbitMQ可靠存儲(比較master都沒有了。。);
觀察下調試的代碼,發現消息仍是在正常向RabbitMQ投遞。
客戶端爲了保證向RabbitMQ投遞消息的可靠,及開啓了Conform模式,但此刻同步返回的RabbitMQ處理結果是消息處理完成。那豈不是NaN期間RabbitMQ把消息都吞了?而客戶端還傻傻的覺得發送成功了。。
這時候就體現開啓mandatory同時訂閱 channel.BasicReturn += (sender, eventArgs) => Console.WriteLine("消息投遞失敗 " + eventArgs.ReplyText);事件的做用了。。由於這時候RabbitMQ會反饋給你消息實際上並無投遞成功的信息。
這裏包含了持久化失敗的緣由,同時包含發送消息的詳細信息,方便客戶端對消息進行在處理。
其實說了這麼多,最後想說的是對於消息的一致性,最好仍是不要所有依賴於RabbitMQ,實現最終一致性並保證冪等性纔是相對可靠的方案。