一、什麼是消息確認ACK。html
答:若是在處理消息的過程當中,消費者的服務器在處理消息的時候出現異常,那麼可能這條正在處理的消息就沒有完成消息消費,數據就會丟失。爲了確保數據不會丟失,RabbitMQ支持消息肯定-ACK。web
二、ACK的消息確認機制。spring
答:ACK機制是消費者從RabbitMQ收到消息並處理完成後,反饋給RabbitMQ,RabbitMQ收到反饋後纔將此消息從隊列中刪除。瀏覽器
若是一個消費者在處理消息出現了網絡不穩定、服務器異常等現象,那麼就不會有ACK反饋,RabbitMQ會認爲這個消息沒有正常消費,會將消息從新放入隊列中。
若是在集羣的狀況下,RabbitMQ會當即將這個消息推送給這個在線的其餘消費者。這種機制保證了在消費者服務端故障的時候,不丟失任何消息和任務。
消息永遠不會從RabbitMQ中刪除,只有當消費者正確發送ACK反饋,RabbitMQ確認收到後,消息纔會從RabbitMQ服務器的數據中刪除。
消息的ACK確認機制默認是打開的。服務器
三、ACK機制的開發注意事項。
網絡
答:若是忘記了ACK,那麼後果很嚴重。當Consumer退出時候,Message會一直從新分發。而後RabbitMQ會佔用愈來愈多的內容,因爲RabbitMQ會長時間運行,所以這個"內存泄漏"是致命的。 app
四、結合項目實例進行,理解一下ACK機制。以前寫過RabbitMQ的交換器Exchange之direct(發佈與訂閱 徹底匹配),這裏藉助這個進行消息持久化測試。生產者的代碼不發生改變。控制層的觸發生產者生產消息,這裏只生產一條消息。方便觀察現象。ide
1 package com.example.bie.controller; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.stereotype.Controller; 5 import org.springframework.web.bind.annotation.RequestMapping; 6 import org.springframework.web.bind.annotation.ResponseBody; 7 8 import com.example.bie.provider.RabbitMqLogErrorProduce; 9 import com.example.bie.provider.RabbitMqLogInfoProduce; 10 11 /** 12 * 13 * @author biehl 14 * 15 */ 16 @Controller 17 public class RabbitmqController { 18 19 @Autowired 20 private RabbitMqLogInfoProduce rabbitMqLogInfoProduce; 21 22 @Autowired 23 private RabbitMqLogErrorProduce rabbitMqLogErrorProduce; 24 25 @RequestMapping(value = "/logInfo") 26 @ResponseBody 27 public String rabbitmqSendLogInfoMessage() { 28 String msg = "生產者===>生者的LogInfo消息message: "; 29 for (int i = 0; i < 1; i++) { 30 rabbitMqLogInfoProduce.producer(msg + i); 31 } 32 return "生產===> LogInfo消息message ===> success!!!"; 33 } 34 35 @RequestMapping(value = "/logError") 36 @ResponseBody 37 public String rabbitmqSendLogErrorMessage() { 38 String msg = "生產者===>生者的LogError消息message: "; 39 for (int i = 0; i < 1; i++) { 40 rabbitMqLogErrorProduce.producer(msg + i); 41 } 42 return "生產===> LogError消息message ===> success!!!"; 43 } 44 45 }
消費者消費消息,打印輸出後面手動拋出運行時異常,觀察現象。測試
1 package com.example.bie.consumer; 2 3 import org.springframework.amqp.core.ExchangeTypes; 4 import org.springframework.amqp.rabbit.annotation.Exchange; 5 import org.springframework.amqp.rabbit.annotation.Queue; 6 import org.springframework.amqp.rabbit.annotation.QueueBinding; 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler; 8 import org.springframework.amqp.rabbit.annotation.RabbitListener; 9 import org.springframework.stereotype.Component; 10 11 /** 12 * 13 * @author biehl 14 * 15 * 消息接收者 16 * 17 * 一、@RabbitListener bindings:綁定隊列 18 * 19 * 二、@QueueBinding 20 * value:綁定隊列的名稱、exchange:配置交換器、key:路由鍵routing-key綁定隊列和交換器 21 * 22 * 三、@Queue value:配置隊列名稱、autoDelete:是不是一個可刪除的臨時隊列 23 * 24 * 四、@Exchange value:爲交換器起個名稱、type:指定具體的交換器類型 25 * 26 * 27 */ 28 @Component 29 @RabbitListener(bindings = @QueueBinding( 30 31 value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"), 32 33 exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT), 34 35 key = "${rabbitmq.config.queue.error.routing.key}")) 36 public class LogErrorConsumer { 37 38 /** 39 * 接收消息的方法,採用消息隊列監聽機制. 40 * 41 * @RabbitHandler意思是將註解@RabbitListener配置到類上面 42 * 43 * @RabbitHandler是指定這個方法能夠進行消息的接收而且消費. 44 * 45 * @param msg 46 */ 47 @RabbitHandler 48 public void consumer(String msg) { 49 // 打印消息 50 System.out.println("ERROR消費者===>消費<===消息message: " + msg); 51 throw new RuntimeException(); 52 } 53 54 }
觀察現象,以下所示:spa
在RabbitMQ的瀏覽器界面,能夠看到一條消息未被進行ACK的消息確認機制,這條消息被鎖定Unacked,因此一直在控制檯進行報錯。
控制檯效果以下所示,一直進行消息的發送,由於消費方一直沒有返回ACK確認,RabbitMQ認爲消息未進行正常的消費,會將消息再次放入到隊列中,再次讓你消費,可是仍是沒有返回ACK確認,依次循環,造成了死循環。
如何解決問題呢,若是消息發送的時候,程序出現異常,後果很嚴重的,會致使內存泄漏的,因此在程序處理中能夠進行異常捕獲,保證消費者的程序正常執行,這裏不進行介紹了。第二種方式可使用RabbitMQ的ack確認機制。開啓重試,而後重試次數,默認爲3次。這裏設置爲5次。
1 # 給當前項目起名稱. 2 spring.application.name=rabbitmq-ack-direct-consumer 3 4 # 配置端口號 5 server.port=8080 6 7 # 配置rabbitmq的參數. 8 # rabbitmq服務器的ip地址. 9 spring.rabbitmq.host=192.168.110.133 10 # rabbitmq的端口號5672,區別於瀏覽器訪問界面的15672端口號. 11 spring.rabbitmq.port=5672 12 # rabbitmq的帳號. 13 spring.rabbitmq.username=guest 14 # rabbitmq的密碼. 15 spring.rabbitmq.password=guest 16 17 # 設置交換器的名稱,方便修改. 18 # 路由鍵是將交換器和隊列進行綁定的,隊列經過路由鍵綁定到交換器. 19 rabbitmq.config.exchange=log.exchange.direct 20 21 # info級別的隊列名稱. 22 rabbitmq.config.queue.info=log.info.queue 23 # info的路由鍵. 24 rabbitmq.config.queue.info.routing.key=log.info.routing.key 25 26 # error級別的隊列名稱. 27 rabbitmq.config.queue.error=log.error.queue 28 # error的路由鍵. 29 rabbitmq.config.queue.error.routing.key=log.error.routing.key 30 31 # 開啓重試 32 spring.rabbitmq.listener.simple.retry.enabled=true 33 # 重試次數,默認爲3次 34 spring.rabbitmq.listener.simple.retry.max-attempts=5
效果以下所示:
能夠看到控制檯嘗試了5次之後就再也不進行重試了。
RabbitMQ的界面能夠看到,開始的效果和上面的一致,可是5次嘗試之後,就變成了0條。RabbitMQ將這條消息丟棄了。
做者:別先生
博客園:https://www.cnblogs.com/biehongli/
若是您想及時獲得我的撰寫文章以及著做的消息推送,能夠掃描上方二維碼,關注我的公衆號哦。