相關博文,推薦查看:html
前一篇基本使用篇的博文中,介紹了rabbitmq的三種使用姿式,能夠知道如何向RabbitMQ發送消息以及如何消費,但遺留下幾個疑問,本篇則主要但願弄清楚這幾點java
<!--more-->git
將前面的消息發送代碼撈出來,幹掉Exchange的聲明,以下github
public class DefaultProducer { public static void publishMsg(String queue, String message) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil.getConnectionFactory(); //建立鏈接 Connection connection = factory.newConnection(); //建立消息通道 Channel channel = connection.createChannel(); channel.queueDeclare(queue, true, false, true, null); // 發佈消息 channel.basicPublish("", queue, null, message.getBytes()); channel.close(); connection.close(); } public static void main(String[] args) throws IOException, TimeoutException { for (int i = 0; i < 20; i++) { publishMsg("hello", "msg" + i); } } }
在發佈消息時,傳入的Exchange名爲「」,再到控制檯查看,發現數據被投遞到了(AMQP default)這個交換器,對應的截圖以下api
看一下上面的綁定描述內容,重點以下ide
上面的代碼爲了演示數據的流向,在發佈消息的同時也定義了一個同名的Queue,所以能夠在控制檯上看到同名的 "hello" queue,且內部有20條數據學習
當咱們去掉queue的聲明時,會發現另外一個問題,投入的數據好像並無存下來(由於沒有queue來接收這些數據,而以後再聲明queue時,以前的數據也不會分配過來)測試
首先是將控制檯中的hello這個queue刪掉,而後再次執行下面的代碼(相對於前面的就是註釋了queue的聲明)ui
public class DefaultProducer { public static void publishMsg(String queue, String message) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil.getConnectionFactory(); //建立鏈接 Connection connection = factory.newConnection(); //建立消息通道 Channel channel = connection.createChannel(); // channel.queueDeclare(queue, true, false, true, null); // 發佈消息 channel.basicPublish("", queue, null, message.getBytes()); channel.close(); connection.close(); } public static void main(String[] args) throws IOException, TimeoutException { for (int i = 0; i < 20; i++) { publishMsg("hello", "msg" + i); } } }
而後從控制檯上看,能夠看到有數據寫入Exchange,可是沒有queue來接收這些數據code
而後開啓消費進程,而後再次執行上面的塞入數據,新後面從新塞入的數據能夠被消費;可是以前塞入的數據則沒有,消費消息的代碼以下:
public class MyDefaultConsumer { public void consumerMsg(String queue) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil.getConnectionFactory(); //建立鏈接 Connection connection = factory.newConnection(); //建立消息通道 Channel channel = connection.createChannel(); channel.queueDeclare(queue, true, false, true, null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); try { System.out.println(" [ " + queue + " ] Received '" + message); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 取消自動ack channel.basicConsume(queue, false, consumer); } public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { MyDefaultConsumer consumer = new MyDefaultConsumer(); consumer.consumerMsg("hello"); Thread.sleep(1000 * 60 * 10); } }
小結:
在定義Queue時,能夠指定這兩個參數,這兩個參數的區別是什麼呢?
持久化,保證RabbitMQ在退出或者crash等異常狀況下數據沒有丟失,須要將queue,exchange和Message都持久化。
如果將queue的持久化標識durable設置爲true,則表明是一個持久的隊列,那麼在服務重啓以後,也會存在,由於服務會把持久化的queue存放在硬盤上,當服務重啓的時候,會從新什麼以前被持久化的queue。隊列是能夠被持久化,可是裏面的消息是否爲持久化那還要看消息的持久化設置。也就是說,重啓以前那個queue裏面尚未發出去的消息的話,重啓以後那隊列裏面是否是還存在原來的消息,這個就要取決於發生着在發送消息時對消息的設置
自動刪除,若是該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列
這個比較容易演示了,當一個Queue被設置爲自動刪除時,當消費者斷掉以後,queue會被刪除,這個主要針對的是一些不是特別重要的數據,不但願出現消息積累的狀況
// 倒數第二個參數,true表示開啓自動刪除 // 正數第二個參數,true表示持久化 channel.queueDeclare(queue, true, false, true, null);
durable=true, autoDeleted=false
autoDeleted=true
的隊列,當沒有消費者以後,隊列會自動被刪除執行一個任務可能須要花費幾秒鐘,你可能會擔憂若是一個消費者在執行任務過程當中掛掉了。一旦RabbitMQ將消息分發給了消費者,就會從內存中刪除。在這種狀況下,若是正在執行任務的消費者宕機,會丟失正在處理的消息和分發給這個消費者但還沒有處理的消息。 可是,咱們不想丟失任何任務,若是有一個消費者掛掉了,那麼咱們應該將分發給它的任務交付給另外一個消費者去處理。
爲了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收而且處理完畢了。RabbitMQ就能夠刪除它了。
所以手動ACK的常見手段
// 接收消息以後,主動ack/nak Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); try { System.out.println(" [ " + queue + " ] Received '" + message); channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(envelope.getDeliveryTag(), false, true); } } }; // 取消自動ack channel.basicConsume(queue, false, consumer);
手動ack時,有個multiple
,其含義表示:
能夠理解爲每一個Channel維護一個unconfirm的消息序號集合,每publish一條數據,集合中元素加1,每回調一次handleAck方法,unconfirm集合刪掉相應的一條(multiple=false)或多條(multiple=true)記錄
一灰灰的我的博客,記錄全部學習和工做中的博文,歡迎你們前去逛逛
盡信書則不如,已上內容,純屬一家之言,因我的能力有限,不免有疏漏和錯誤之處,如發現bug或者有更好的建議,歡迎批評指正,不吝感激