轉自:http://m.blog.csdn.net/article/details?id=54340711html
上一篇博客咱們介紹了使用RabbitMQ可能會遇到的一個問題,即生產者不知道消息是否真正到達broker代理服務器,隨後經過AMQP協議層面爲咱們提供的事務機制解決了這個問題,可是採用事務機制實現會下降RabbitMQ的消息吞吐量,那麼有沒有更加高效的解決方式呢?RabbitMQ團隊爲咱們拿出了更好的方案,即採用發送方確認模式;編程
生產者確認模式實現原理:服務器
生產者將信道設置成confirm模式,一旦信道進入confirm模式,全部在該信道上面發佈的消息都將會被指派一個惟一的ID(從1開始),一旦消息被投遞到全部匹配的隊列以後,broker就會發送一個確認給生產者(包含消息的惟一ID),這就使得生產者知道消息已經正確到達目的隊列了,若是消息和隊列是可持久化的,那麼確認消息會在將消息寫入磁盤以後發出,broker回傳給生產者的確認消息中delivery-tag域包含了確認消息的序列號,此外broker也能夠設置basic.ack的multiple域,表示到這個序列號以前的全部消息都已經獲得了處理;異步
confirm模式最大的好處在於他是異步的,一旦發佈一條消息,生產者應用程序就能夠在等信道返回確認的同時繼續發送下一條消息,當消息最終獲得確認以後,生產者應用即可以經過回調方法來處理該確認消息,若是RabbitMQ由於自身內部錯誤致使消息丟失,就會發送一條nack消息,生產者應用程序一樣能夠在回調方法中處理該nack消息;ide
開啓confirm模式的方法:性能
生產者經過調用channel的confirmSelect方法將channel設置爲confirm模式,(注意一點,已經在transaction事務模式的channel是不能再設置成confirm模式的,即這兩種模式是不能共存的),若是沒有設置no-wait標誌的話,broker會返回confirm.select-ok表示贊成發送者將當前channel信道設置爲confirm模式(從目前RabbitMQ最新版本3.6來看,若是調用了channel.confirmSelect方法,默認狀況下是直接將no-wait設置成false的,也就是默認狀況下broker是必須回傳confirm.select-ok的,並且我也沒找到咱們本身可以設置no-wait標誌的方法);測試
生產者實現confiem模式有三種編程方式:this
(1):普通confirm模式,每發送一條消息,調用waitForConfirms()方法等待服務端confirm,這其實是一種串行的confirm,每publish一條消息以後就等待服務端confirm,若是服務端返回false或者超時時間內未返回,客戶端進行消息重傳;spa
(2):批量confirm模式,每發送一批消息以後,調用waitForConfirms()方法,等待服務端confirm,這種批量確認的模式極大的提升了confirm效率,可是若是一旦出現confirm返回false或者超時的狀況,客戶端須要將這一批次的消息所有重發,這會帶來明顯的重複消息,若是這種狀況頻繁發生的話,效率也會不升反降;.net
講完了基本的原理以後,代碼級別咱們該怎麼設置channel信道爲confirm模式呢?以及咱們該怎麼獲取broker返回給咱們的確認消息呢?
測試1:普通confirm模式
首先從最簡單的開始,僅僅將channel設置成confirm模式,而且生產者每發送一條消息就等待broker迴應確認消息,至於確認消息是什麼咱們不去作任何處理,爲了測試方便,此處生產者只發送了5條消息,實現代碼以下:
public class ProducerTest { public static void main(String[] args) { String exchangeName = "confirmExchange"; String queueName = "confirmQueue"; String routingKey = "confirmRoutingKey"; String bindingKey = "confirmRoutingKey"; int count = 5; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("172.16.151.74"); factory.setUsername("test"); factory.setPassword("test"); factory.setPort(5672); //建立生產者 Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey); producer.run(); } } class Sender { private ConnectionFactory factory; private int count; private String exchangeName; private String queueName; private String routingKey; private String bindingKey; public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) { this.factory = factory; this.count = count; this.exchangeName = exchangeName; this.queueName = queueName; this.routingKey = routingKey; this.bindingKey = bindingKey; } public void run() { Channel channel = null; try { Connection connection = factory.newConnection(); channel = connection.createChannel(); //建立exchange channel.exchangeDeclare(exchangeName, "direct", true, false, null); //建立隊列 channel.queueDeclare(queueName, true, false, false, null); //綁定exchange和queue channel.queueBind(queueName, exchangeName, bindingKey); channel.confirmSelect(); //發送持久化消息 for(int i = 0;i < count;i++) { //第一個參數是exchangeName(默認狀況下代理服務器端是存在一個""名字的exchange的, //所以若是不建立exchange的話咱們能夠直接將該參數設置成"",若是建立了exchange的話 //咱們須要將該參數設置成建立的exchange的名字),第二個參數是路由鍵 channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes()); if(channel.waitForConfirms()) { System.out.println("發送成功"); } } final long start = System.currentTimeMillis(); System.out.println("執行waitForConfirmsOrDie耗費時間: "+(System.currentTimeMillis()-start)+"ms"); } catch (Exception e) { e.printStackTrace(); } } }
在第50行調用Channel信道的confirmSelect方法將當前信道設置成了confirm模式,第57行經過for循環調用Channel的basicPublish方法發送了5條消息到消息隊列中,第58行調用waitForConfirms方法等待broker服務端返回ack或者nack消息,這種模式每發送一條消息就會等待broker代理服務器返回消息,這點咱們能夠從抓包的角度觀察結果:
能夠看到上面生產者經過Confirm.Select將當前Channel信道設置成confirm模式,broker代理服務器收到以後回傳Confirm.Select-Ok同一將當前Channel設置成confirm模式,此外看到返回5條Basic.Ack消息;
測試2:批量confirm模式
這種模式生產者不是每發送一條就等待broker確認,而是發送一批,實現代碼見下:
public class ProducerTest { public static void main(String[] args) { String exchangeName = "confirmExchange"; String queueName = "confirmQueue"; String routingKey = "confirmRoutingKey"; String bindingKey = "confirmRoutingKey"; int count = 100; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("172.16.151.74"); factory.setUsername("test"); factory.setPassword("test"); factory.setPort(5672); //建立生產者 Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey); producer.run(); } } class Sender { private ConnectionFactory factory; private int count; private String exchangeName; private String queueName; private String routingKey; private String bindingKey; public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) { this.factory = factory; this.count = count; this.exchangeName = exchangeName; this.queueName = queueName; this.routingKey = routingKey; this.bindingKey = bindingKey; } public void run() { Channel channel = null; try { Connection connection = factory.newConnection(); channel = connection.createChannel(); //建立exchange channel.exchangeDeclare(exchangeName, "direct", true, false, null); //建立隊列 channel.queueDeclare(queueName, true, false, false, null); //綁定exchange和queue channel.queueBind(queueName, exchangeName, bindingKey); channel.confirmSelect(); //發送持久化消息 for(int i = 0;i < count;i++) { //第一個參數是exchangeName(默認狀況下代理服務器端是存在一個""名字的exchange的, //所以若是不建立exchange的話咱們能夠直接將該參數設置成"",若是建立了exchange的話 //咱們須要將該參數設置成建立的exchange的名字),第二個參數是路由鍵 channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes()); } long start = System.currentTimeMillis(); channel.waitForConfirmsOrDie(); System.out.println("執行waitForConfirmsOrDie耗費時間: "+(System.currentTimeMillis()-start)+"ms"); } catch (Exception e) { e.printStackTrace(); } } }
第50行調用channel.confirmSelect將當前channel信道設置成confirm模式,接着在第57行經過for循環發送了100條消息,第60行調用了channel的waitForConfirmsOrDie,從waitForConfirmsOrDie方法的註釋上能夠看出,該方法會等到最後一條消息獲得確認或者獲得nack纔會結束,也就是說在waitForConfirmsOrDie處會形成當前程序的阻塞,以測試1程序發送100條消息爲例,阻塞時間是135ms,咱們再來看看對測試1的抓包狀況:
從紅色箭頭的標號1出能夠看到:首先是24向74發送了Confirm.Select消息表示請求將當前信道設置爲confirm模式,接着74向24回送了Confirm.Select-Ok消息表示贊成將信道設置成confirm模式,從紅色標號2處NoWait字段的值爲false也印證了咱們若是直接調用Channel信道的confirmSelect()方法的話,實際上默認是開啓broker回傳Confirm.Select-Ok確認消息的;
接下來咱們看看broker回傳給客戶端的確認消息數據包是什麼樣子的呢?一樣經過抓包看看結果:
你會發現,在上面測試1中咱們經過for循環發送了100條消息,可是在抓包的時候咱們僅僅看到有兩個Basic.Ack確認消息回傳回來,緣由在於上面截圖的標號3處,你會發現Multiple域的值是True的,以前咱們已經講過broker能夠設置Multiple域表示broker已經收到當前確認消息的Delivery-Tag域以前標號的消息,以上面截圖爲例的話表示broker告訴發送者編號4以前的消息已經所有收到了,從這點咱們看出broker端默認狀況下是進行批量回復的,並非針對每條消息都發送一條ack消息;
測試2:
測試1咱們僅僅是測試發送者可以收到broker的確認消息以及知道了broker對消息默認是採用批量回複方式的,那麼在程序中咱們該怎麼獲取到broker回傳回來的確認消息呢,假如咱們有時候須要在收到確認消息以後作一些提示性操做該怎麼辦呢?測試1中,咱們採用的是Channel信道的waitForConfirmsOrDie等待broker端回傳回ack確認消息的,但咱們無法拿到這個ack消息進行後期操做,要想拿到ack消息的話,咱們能夠給當前Channel信道綁定監聽器,具體來講就是調用Channel信道的addConfirmListener方法進行設置,Channel信道在收到broker的ack消息以後會回調設置在該信道監聽器上的handleAck方法,在收到nack消息以後會回調設置在該信道監聽器上的handleNack方法。
實現代碼:
public class ProducerTest { public static void main(String[] args) { String exchangeName = "confirmExchange"; String queueName = "confirmQueue"; String routingKey = "confirmRoutingKey"; String bindingKey = "confirmRoutingKey"; int count = 100; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("172.16.151.74"); factory.setUsername("test"); factory.setPassword("test"); factory.setPort(5672); //建立生產者 Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey); producer.run(); } } class Sender { private ConnectionFactory factory; private int count; private String exchangeName; private String queueName; private String routingKey; private String bindingKey; public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) { this.factory = factory; this.count = count; this.exchangeName = exchangeName; this.queueName = queueName; this.routingKey = routingKey; this.bindingKey = bindingKey; } public void run() { Channel channel = null; try { Connection connection = factory.newConnection(); channel = connection.createChannel(); //建立exchange channel.exchangeDeclare(exchangeName, "direct", true, false, null); //建立隊列 channel.queueDeclare(queueName, true, false, false, null); //綁定exchange和queue channel.queueBind(queueName, exchangeName, bindingKey); channel.confirmSelect(); //發送持久化消息 for(int i = 0;i < count;i++) { //第一個參數是exchangeName(默認狀況下代理服務器端是存在一個""名字的exchange的, //所以若是不建立exchange的話咱們能夠直接將該參數設置成"",若是建立了exchange的話 //咱們須要將該參數設置成建立的exchange的名字),第二個參數是路由鍵 channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes()); } long start = System.currentTimeMillis(); channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("nack: deliveryTag = "+deliveryTag+" multiple: "+multiple); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("ack: deliveryTag = "+deliveryTag+" multiple: "+multiple); } }); System.out.println("執行waitForConfirmsOrDie耗費時間: "+(System.currentTimeMillis()-start)+"ms"); } catch (Exception e) { e.printStackTrace(); } } }
第60行咱們調用了Channel信道的addConfirmListener設置了監聽器,而且在監聽器的handleAck和handleNack方法中打印了信息,運行程序查看輸出:
能夠看到,雖然咱們仍是發送了100條消息,一樣咱們並無收到100個ack消息 ,只收到兩個ack消息,而且這兩個ack消息的multiple域都爲true,這點和測試1是相同的,你屢次運行程序會發現每次發送回來的ack消息中的deliveryTag域的值並非同樣的,說明broker端批量回傳給發送者的ack消息並非以固定的批量大小回傳的;
也就是咱們經過信道Channel的waitForConfirmsOrDie方法或者爲信道設置監聽器均可以保證發送者收到broker回傳的ack或者nack消息,那麼這兩種方式有什麼區別呢?從測試一的第61行代碼以及測試2的第72行代碼處你就能找到答案啦,測試1中調用waitForConfirmsOrDie方法發送100條消息而且所有收到確認須要135ms,測試2中經過監聽器的方式僅僅須要1ms,說明調用waitForConfirmsOrDie會形成程序的阻塞,經過監聽器並不會形成程序的阻塞,下一篇博客我會試着從RabbitMQ的源碼層面來分析這兩種方式形成這種區別的緣由啦啦;
參考資料: