,若是要保證消息的可靠性,須要對消息進行持久化處理,然而消息持久化除了須要代碼的設置以外,還有一個重要步驟是相當重要的,那就是保證你的消息順利進入Broker(代理服務器),如圖所示:java
正常狀況下,若是消息通過交換器進入隊列就能夠完成消息的持久化,但若是消息在沒有到達broker以前出現意外,那就形成消息丟失,有沒有辦法能夠解決這個問題?服務器
RabbitMQ有兩種方式來解決這個問題:異步
事務的實現主要是對信道(Channel)的設置,主要的方法有三個:ide
channel.txSelect()聲明啓動事務模式;性能
channel.txComment()提交事務;測試
channel.txRollback()回滾事務;spa
從上面的能夠看出事務都是以tx開頭的,tx應該是transaction extend(事務擴展模塊)的縮寫,若是有準確的解釋歡迎在博客下留言。代理
咱們來看具體的代碼實現:code
// 建立鏈接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(_queueName, true, false, false, null); String message = String.format("時間 => %s", new Date().getTime()); try { channel.txSelect(); // 聲明事務 // 發送消息 channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); channel.txCommit(); // 提交事務 } catch (Exception e) { channel.txRollback(); } finally { channel.close(); conn.close(); }
注意:用戶需把config.xx配置成本身Rabbit的信息。orm
從上面的代碼咱們能夠看出,在發送消息以前的代碼和以前介紹的都是同樣的,只是在發送消息以前,須要聲明channel爲事務模式,提交或者回滾事務便可。
瞭解了事務的實現以後,那麼事務到底是怎麼執行的,讓咱們來使用wireshark抓個包看看,如圖所示:
輸入ip.addr==rabbitip && amqp查看客戶端和rabbit之間的通信,能夠看到交互流程:
以上就完成了事務的交互流程,若是其中任意一個環節出現問題,就會拋出IoException移除,這樣用戶就能夠攔截異常進行事務回滾,或決定要不要重複消息。
那麼,既然已經有事務了,沒什麼還要使用發送方確認模式呢,緣由是由於事務的性能是很是差的。事務性能測試:
事務模式,結果以下:
非事務模式,結果以下:
從上面能夠看出,非事務模式的性能是事務模式的性能高149倍,個人電腦測試是這樣的結果,不一樣的電腦配置略有差別,但結論是同樣的,事務模式的性能要差不少,那有沒有既能保證消息的可靠性又能兼顧性能的解決方案呢?那就是接下來要講的Confirm發送方確認模式。
咱們知道,消費者可使用消息自動或手動發送來確認消費消息,那若是咱們在消費者模式中使用事務(固然若是使用了手動確認消息,徹底用不到事務的),會發生什麼呢?
消費者模式使用事務
假設消費者模式中使用了事務,而且在消息確認以後進行了事務回滾,那麼RabbitMQ會產生什麼樣的變化?
結果分爲兩種狀況:
Confirm發送方確認模式使用和事務相似,也是經過設置Channel進行發送方確認的。
Confirm的三種實現方式:
方式一:channel.waitForConfirms()普通發送方確認模式;
方式二:channel.waitForConfirmsOrDie()批量確認模式;
方式三:channel.addConfirmListener()異步監聽發送方確認模式;
// 建立鏈接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(config.QueueName, false, false, false, null); // 開啓發送方確認模式 channel.confirmSelect(); String message = String.format("時間 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); if (channel.waitForConfirms()) { System.out.println("消息發送成功" ); }
看代碼能夠知道,咱們只須要在推送消息以前,channel.confirmSelect()聲明開啓發送方確認模式,再使用channel.waitForConfirms()等待消息被服務器確認便可。
// 建立鏈接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(config.QueueName, false, false, false, null); // 開啓發送方確認模式 channel.confirmSelect(); for (int i = 0; i < 10; i++) { String message = String.format("時間 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); } channel.waitForConfirmsOrDie(); //直到全部信息都發布,只要有一個未確認就會IOException System.out.println("所有執行完成");
以上代碼能夠看出來channel.waitForConfirmsOrDie(),使用同步方式等全部的消息發送以後纔會執行後面代碼,只要有一個消息未被確認就會拋出IOException異常。
// 建立鏈接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(config.QueueName, false, false, false, null); // 開啓發送方確認模式 channel.confirmSelect(); for (int i = 0; i < 10; i++) { String message = String.format("時間 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); } //異步監聽確認和未確認的消息 channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("未確認消息,標識:" + deliveryTag); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println(String.format("已確認消息,標識:%d,多個消息:%b", deliveryTag, multiple)); } });
異步模式的優勢,就是執行效率高,不須要等待消息執行完,只須要監聽消息便可,以上異步返回的信息以下:
能夠看出,代碼是異步執行的,消息確認有多是批量確認的,是否批量確認在於返回的multiple的參數,此參數爲bool值,若是true表示批量執行了deliveryTag這個值之前的全部消息,若是爲false的話表示單條確認。
Confirm性能測試
測試前提:與事務同樣,咱們發送1w條消息。
方式一:Confirm普通模式
方式二:Confirm批量模式
方式三:Confirm異步監聽方式
綜合整體測試狀況來看:Confirm批量肯定和Confirm異步模式性能相差不大,Confirm模式要比事務快10倍左右