前面咱們介紹了RabbitMQ的安裝、各大消息中間件的對比、AMQP核心概念、管控臺的使用、快速入門RabbitMQ。本章將介紹RabbitMQ的高級特性。分兩篇(上/下)進行介紹。html
前三步不必定能保障消息可以100%投遞成功。所以要加上第四步java
BAT/TMD 互聯網大廠的解決方案: - 消息落庫,對消息狀態進行打標 在發送消息的時候,須要將消息持久化到數據庫中,並給這個消息設置一個狀態(未發送、發送中、到達)。當消息狀態發生了變化,須要對消息作一個變動。針對沒有到達的消息作一個輪訓操做,從新發送。對輪訓次數也須要作一個限制3-5次。確保消息可以成功的發送.mysql
具體採用哪一種方案,還須要根據業務與消息的併發量而定。git
生產端-可靠性投遞github
圖解:面試
藍色部分表示:生產者負責發送消息發送至Broker端 Biz DB:訂單數據庫 MSG DB: 消息數據 面對小規模的應用能夠採用加事務的方式,保證事務的一致性。但在大廠中面對高併發,並無加事務,事務的性能拼接很是嚴重,而是作補償。算法
好比:以下發一條訂單消息。sql
step1:存儲訂單消息(建立訂單),業務數據入庫,消息也入庫。缺點:須要持久化兩次。(status:0) step2:在step1成功的前提下,發送消息 step3:Broker收到消息後,confirm給咱們的生產端。Confirm Listener異步監聽Broker回送的消息。 step4:抓取出指定的消息,更新(status=1),表示消息已經投遞成功。數據庫
step5:分佈式定時任務獲取消息狀態,若是等於0則抓取數據出來。 step6:從新發送消息 step7:重試限制設置3次。若是消息重試了3次仍是失敗,那麼(status=2),認爲這個消息就是失敗的。編程
查詢這些消息爲何失敗,可能須要人工去查詢。
假設step2執行成功,step3因爲網絡閃斷。那麼confirm將永遠收不到消息,那麼咱們須要設定一個規則: 例如:在消息入庫的時候,設置一個臨界值 timeout=5min,當超過5min以後,就將這條數據抓取出來。 或者寫一個定時任務每隔5分鐘就將status=0的消息抓取出來。可能存在小問題:消息發送出去,定時任務又正好剛執行,Confirm還未收到,定時任務就會執行,會致使消息執行兩次。 更精細化操做:消息超時容忍限制。confirm在2-3分鐘內未收到消息,則從新發送。
第一種方案對數據有兩次入庫,一次業務數據入庫,一次消息入庫。這樣對數據的入庫是一個瓶頸。 其實咱們只須要對業務進行入庫。
這種方式並不必定能保證100%成功,可是也能保證99.99%的消息成功。若是遇到特別極端的狀況,那麼就只能須要人工去補償,或者定時任務去作。 第二種方式主要是爲了減小對數據庫的操做。
看下第二種方式:
圖解:
Upstream service:生產端 DownStream service:消費端 Callback service:回調服務
step1:業務消息入庫成功後,第一次消息發送。 step2:一樣在消息入庫成功後,發送第二次消息,這兩條消息是同時發送的。第二條消息是延遲檢查,能夠設置2min、5min 延遲發送。 step3:消費端監聽指定隊列。 step4:消費端處理完消息後,內部生成新的消息send confirm。投遞到MQ Broker。 step5: Callback Service 回調服務監聽MQ Broker,若是收到Downstream service發送的消息,則能夠肯定消息發送成功,執行消息存儲到MSG DB。 step6:Check Detail檢查監聽step2延遲投遞的消息。此時兩個監聽的隊列不是同一個,5分鐘後,Callback service收到消息,檢查MSG DB。若是發現以前的消息已經投遞成功,則不須要作其餘事情。若是檢查發現失敗,則Callback 進行補償,主動發送RPC 通訊。通知上游生產端從新發送消息。
這樣作的目的:少作了一次DB存儲。關注點並非百分百的投遞成功,而是性能。
冪等(idempotent、idempotence)是一個數學與計算機學概念,常見於抽象代數中,即f(f(x)) = f(x)。簡單的來講就是一個操做屢次執行產生的結果與一次執行產生的結果一致。
利用加版本號Version的方式來保證冪等性。
推薦文章:面試必備的數據庫悲觀鎖與樂觀鎖
在海量訂單產生的業務高峯期,如何避免消息的重複消費問題?
在高併發的狀況下,會有大量的消息到達MQ,消費端須要監聽大量的消息。這樣的狀況下,不免會出現消息的重複投遞,網絡閃斷等等。若是不去作冪等,則會出現消息的重複消費。 -消費端實現冪等性,就意味着,咱們的消息永遠不會被消費屢次,即便咱們收到了多條同樣的消息,也只會執行一次。
看下互聯網大廠主流的冪等性操做: -惟一ID+指紋嗎機制,利用數據庫主鍵去重。 -利用Redis的原子性實現 -其餘的技術實現冪等性
最簡單使用Redis的自增。
理解Confirm 消息確認機制:
藍色:producer 生產者 紅色:MQ Broker 服務器
生產者把消息發送到Broker端,Broker收到消息以後回送給producer。Confirm Listener 監聽應答。
操做是異步操做,當生產者發送完消息以後,就不須要管了。Confirm Listener 監聽MQ Broker的應答。
第一步:在channel上開啓確認模式:channel.confirmSelect() 第二步;在chanel上 添加監聽:addConfirmListener,監聽成功和失敗的返回結果,根據具體的結果對消息進行從新發送、或記錄日誌等後續處理!
生產者:
/** * * @ClassName: Producer * @Description: 生產者 * @author Coder編程 * @date 2019年7月30日 上午21:27:02 * */ public class Producer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2 經過Connection建立一個新的Channel Channel channel = connection.createChannel(); //3 指定咱們的消息投遞模式: 消息的確認模式 channel.confirmSelect(); String exchangeName = "test_confirm_exchange"; String routingKey = "confirm.save"; //4 發送一條消息 String msg = "Hello RabbitMQ Send confirm message!"; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); //5 添加一個確認監聽 用於發送消息到Broker端以後,回送消息的監聽 channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------no ack!-----------"); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------ack!-----------"); } }); } }
消費者:
/** * * @ClassName: Consumer * @Description: 消費者 * @author Coder編程 * @date 2019年7月30日 上午21:32:02 * */ public class Consumer { public static void main(String[] args) throws Exception { //1 獲取一個鏈接 Connection connection = ConnectionUtils.getConnection(); //2經過Connection建立一個新的Channel Channel channel = connection.createChannel(); String exchangeName = "test_confirm_exchange"; String routingKey = "confirm.#"; String queueName = "test_confirm_queue"; //3 聲明交換機和隊列 而後進行綁定設置, 最後制定路由Key channel.exchangeDeclare(exchangeName, "topic", true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //4 建立消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, queueingConsumer); while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消費端: " + msg); } } }
工具類:
/** * * @ClassName: ConnectionUtils * @Description: 鏈接工具類 * @author Coder編程 * @date 2019年6月21日 上午22:28:22 * */ public class ConnectionUtils { public static Connection getConnection() throws IOException, TimeoutException { //定義鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務地址 factory.setHost("127.0.0.1"); //端口 factory.setPort(5672);//amqp協議 端口 相似與mysql的3306 //設置帳號信息,用戶名、密碼、vhost factory.setVirtualHost("/vhost_cp"); factory.setUsername("user_cp"); factory.setPassword("123456"); // 經過工程獲取鏈接 Connection connection = factory.newConnection(); return connection; } }
先啓動消費端=》再啓動生產端
能夠觀察到消費端先接收到消息,以後生產端再接收到回調信息。若是出現磁盤已滿、RabbitMQ出現異常、queue容量到達上限均可能接收到no ack
若是ack和no ack消息都未接收到,這就是以前所說的。RabbitMQ出現網絡閃斷,能夠採用上面所說的消息補償。
在基礎API中有一個關鍵的配置項:
Producer生產端將消息發送到MQ Broker端,可是出現NotFind Exchange,發送的消息的Exchange,在Broker端未能找到。或者找到了,可是路由key路由不到指定的隊列。所以是一個錯誤的消息。 這個時候,生產端應該知道發送的這條消息,並不會被處理。所以MQ Broker提供了這種Return機制,將這些不可達的消息發送給生產端,這時候生產端就須要設置Return Listener去接收這些不可達的消息。而後及時記錄日誌,去處理這些消息。
生產者:
/** * * @ClassName: Producer * @Description: 生產者 * @author Coder編程 * @date 2019年7月30日 上午22:03:22 * */ public class Producer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); String exchange = "test_return_exchange"; String routingKey = "return.save"; String routingKeyError = "abc.save"; String msg = "Hello RabbitMQ Return Message"; channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("---------handle return----------"); //響應碼 System.err.println("replyCode: " + replyCode); //響應文本 System.err.println("replyText: " + replyText); System.err.println("exchange: " + exchange); System.err.println("routingKey: " + routingKey); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }); //第三個參數mandatory=true,意味着路由不到的話mq也不會刪除消息,false則會自動刪除 channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); //修改routingkey,測試是否可以收到消息 //channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes()); } }
消費者:
/** * * @ClassName: Consumer * @Description: 消費者 * @author Coder編程 * @date 2019年7月30日 上午22:33:34 * */ public class Consumer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_return_exchange"; String routingKey = "return.#"; String queueName = "test_return_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, queueingConsumer); while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消費者: " + msg); } } }
ConnectionUtils 工具代碼在上面。
啓動消費端,並查看管控臺。
放開消費端代碼:channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); 消費端打印結果:
能夠看到打印結果正常,此時再改代碼爲: channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
能夠看到生產端接收到了不可達的消息。
歡迎關注我的微信公衆號:Coder編程 獲取最新原創技術文章和免費學習資料,更有大量精品思惟導圖、面試資料、PMP備考資料等你來領,方便你隨時隨地學習技術知識! 新建了一個qq羣:315211365,歡迎你們進羣交流一塊兒學習。謝謝了!也能夠介紹給身邊有須要的朋友。
文章收錄至 Github: https://github.com/CoderMerlin/coder-programming Gitee: https://gitee.com/573059382/coder-programming 歡迎關注並star~
參考文章:
《RabbitMQ消息中間件精講》
推薦文章:
消息中間件——RabbitMQ(四)命令行與管控臺的基本操做!
消息中間件——RabbitMQ(五)快速入門生產者與消費者,SpringBoot整合RabbitMQ!
消息中間件——RabbitMQ(六)理解Exchange交換機核心概念!
原文出處:https://www.cnblogs.com/coder-programming/p/11412048.html