輕鬆搞定RabbitMQ(三)——消息應答與消息持久化

       這個官網的第二個例子中的消息應答和消息持久化部分。我把它摘出來做爲單獨的一起來分享。html


Message acknowledgment(消息應答)

       執行一個任務可能須要花費幾秒鐘,你可能會擔憂若是一個消費者在執行任務過程當中掛掉了。基於如今的代碼,一旦RabbitMQ將消息分發給了消費者,就會從內存中刪除。在這種狀況下,若是殺死正在執行任務的消費者,會丟失正在處理的消息,也會丟失已經分發給這個消費者但還沒有處理的消息。
java

       可是,咱們不想丟失任何任務,若是有一個消費者掛掉了,那麼咱們應該將分發給它的任務交付給另外一個消費者去處理。
服務器

       爲了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收而且處理完畢了。RabbitMQ能夠刪除它了。ide

       若是一個消費者掛掉卻沒有發送應答,RabbitMQ會理解爲這個消息沒有處理徹底,而後交給另外一個消費者去從新處理。這樣,你就能夠確認即便消費者偶爾掛掉也不會不丟失任何消息了。
spa

       沒有任何消息超時限制;只有當消費者掛掉時,RabbitMQ纔會從新投遞。即便處理一條消息會花費很長的時間。
code

       消息應答是默認打開的。咱們明確地把它們關掉了(autoAck=true)。如今將應答打開,一旦咱們完成任務,消費者會自動發送消息應答。
htm

boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

       修改一下Worker.javarabbitmq

channel.basicQos(1);//保證一次只分發一個
		// 建立隊列消費者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");

			    System.out.println(" [x] Received '" + message + "'");
			    System.out.println(" [x] Proccessing... at " +new Date().toLocaleString());
			    try {
			    	for (char ch: message.toCharArray()) {
				        if (ch == '.') {
				        	Thread.sleep(1000);
				        }
				    }
				} catch (InterruptedException e) {
				} finally {
			      System.out.println(" [x] Done! at " +new Date().toLocaleString());
			      channel.basicAck(envelope.getDeliveryTag(), false);
			    }
			  }
			};
       咱們仍是運行1個生產者,2個消費者,在消息處理過程當中,人爲讓一個消費者掛掉,而後會看到剩下的任務都會被另外的消費者執行。

       運行結果以下:隊列


       若是你關閉了自動消息應答,手動也未設置應答,這是一個很簡單錯誤,可是後果倒是極其嚴重的。消息在分發出去之後,得不到迴應,因此不會在內存中刪除,結果RabbitMQ會愈來愈佔用內存,最終的結果,你懂得。。。內存


Message durability(消息持久化)

       咱們已經瞭解瞭如何確保即便消費者死亡,任務也不會丟失。可是若是RabbitMQ服務器中止,咱們的任務仍將失去!
       當RabbitMQ退出或者崩潰,將會丟失隊列和消息。除非你不要隊列和消息。兩件事兒必須保證消息不被丟失:咱們必須把「隊列」和「消息」設爲持久化。 
    

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
       儘管這行代碼是正確的,但他不會在咱們當前的設置中起做用。由於咱們已經定義了一個名叫hello的未持久化的隊列。RabbitMQ不容許使用不一樣的參數設定從新定義已經存在的隊列,而且會向嘗試如此作的程序返回一個錯誤。一個快速的解決方案——就是聲明一個不一樣名字的隊列,好比task_queue。

       (固然,咱們也能夠登陸到RabbitMQ的服務管理頁面,RabbitMQ默認的端口是5672,管理頁面默認端口是15672,頁面地址爲:http://localhost:15672,使用是用戶名和密碼登陸。RabbitMQ的默認密碼和用戶名都是guest。點開「queue」那欄,能夠看到隊列列表,點擊「hello」杜列,會展開隊列的詳細信息。把頁面拉到最後,有一項「Delete / purge」,點開,點擊「Delete」按鈕,就能夠把隊列刪除掉了。 而後再運行代碼的時候,就會建立一個支持持久化的hello隊列。)

       上述的代碼須要在生產者和消費者都要做出一樣的修改。

       在這一點上咱們確信task_queue的隊列不會丟失,即便RabbitMQ服務重啓。如今咱們須要將消息標記爲持久性的——經過設置 MessageProperties(實現BasicProperties)爲PERSISTENT_TEXT_PLAIN。

       如今你能夠啓動RabbitMQ服務器,執行一次生產者NewTask的程序,而後關閉服務器,再從新啓動服務器,運行消費者Work作下實驗。能夠發現消費者依舊能夠讀出消息來。說明在RabbitMQ服務器關閉後,消息和隊列信息都已經作了持久化。再次啓動後,會從新加載到服務器中,消費者運行後,就能夠正常的從隊列中獲取消息了。

相關文章
相關標籤/搜索