目錄java
什麼是生產端的可靠性投遞?redis
若是想保障消息百分百投遞成功,只作到前三步不必定可以保障。有些時候或者說有些極端狀況,好比生產端在投遞消息時可能就失敗了,或者說生產端投遞了消息,MQ也收到了,MQ在返回確認應答時,因爲網絡閃斷致使生產端沒有收到應答,此時這條消息就不知道投遞成功了仍是失敗了,因此針對這些狀況咱們須要作一些補償機制。算法
(confirm)
給生產端生產端有一個Confirm Listener
,去異步的監聽Broker
回送的響應,從而判斷消息是否投遞成功,若是成功,去數據庫查詢該消息,並將消息狀態更新爲1,表示消息投遞成功
假設第二步OK了,在第三步回送響應時,網絡忽然出現了閃斷,致使生產端的Listener就永遠收不到這條消息的confirm應答了,也就是說這條消息的狀態就一直爲0了數據庫
(Retry Send)
,也就是從第二步開始繼續往下走固然有些消息可能就是因爲一些實際的問題沒法路由到Broker,好比routingKey設置不對,對應的隊列被誤刪除了,那麼這種消息即便重試屢次也仍然沒法投遞成功,因此須要對重試次數作限制,好比限制3次,若是投遞次數大於三次,那麼就將消息狀態更新爲2,表示這個消息最終投遞失敗。網絡
針對這種狀況如何去作補償呢,能夠有一個補償系統去查詢這些最終失敗的消息,而後給出失敗的緣由,固然這些可能都須要人工去操做。併發
第一種可靠性投遞,在高併發的場景下是否適合?異步
對於第一種方案,咱們須要作兩次數據庫的持久化操做,在高併發場景下顯然數據庫存在着性能瓶頸。其實在咱們的核心鏈路中只須要對業務進行入庫就能夠了,消息就不必先入庫了,咱們能夠作消息的延遲投遞,作二次確認,回調檢查。分佈式
固然這種方案不必定能保障百分百投遞成功,可是基本上能夠保障大概99.9%的消息是OK的,有些特別極端的狀況只能是人工去作補償了,或者使用定時任務去作均可以。ide
Upstream Service
上游服務也就是生產端,Downstream service
下游服務也就是消費端,Callback service
就是回調服務。高併發
(Second Send Delay Check)
,即延遲消息投遞檢查,這裏須要設置一個延遲時間,好比5分鐘以後進行投遞。confirm
消息,也就是回送響應,可是這裏響應不是正常的ACK,而是從新生成一條消息,投遞到MQ中。Callback service
是一個單獨的服務,其實它扮演了第一種方案的存儲消息的DB角色,它經過MQ去監聽下游服務發送的confirm
消息,若是Callback service
收到confirm
消息,那麼就對消息作持久化存儲,即將消息持久化到DB中。Callback service
仍是去監聽延遲消息所對應的隊列,收到Check消息後去檢查DB中是否存在消息,若是存在,則不須要作任何處理,若是不存在或者消費失敗了,那麼Callback service
就須要主動發起RPC通訊給上游服務,告訴它延遲檢查的這條消息我沒有找到,你須要從新發送,生產端收到信息後就會從新查詢業務消息而後將消息發送出去。這麼作的目的是少作了一次DB的存儲,在高併發場景下,最關心的不是消息100%投遞成功,而是必定要保證性能,保證能抗得住這麼大的併發量。因此能節省數據庫的操做就儘可能節省,能夠異步的進行補償。
其實在主流程裏面是沒有這個Callback service的,它屬於一個補償的服務,整個核心鏈路就是生產端入庫業務消息,發送消息到MQ,消費端監聽隊列,消費消息。其餘的步驟都是一個補償機制。
第二種方案也是互聯網大廠更爲經典和主流的解決方案。可是若對性能要求不是那麼高,第一種方案要更簡單
簡單來講就是用戶對於同一操做發起的一次請求或者屢次請求的結果是一致的。
咱們能夠借鑑數據庫的樂觀鎖機制來舉個例子:
首先爲表添加一個版本字段version
在執行更新操做前呢,會先去數據庫查詢這個version
而後執行更新語句,以version做爲條件,例如:
UPDATE T_REPS SET COUNT = COUNT -1,VERSION = VERSION + 1 WHERE VERSION = 1
若是執行更新時有其餘人先更新了這張表的數據,那麼這個條件就不生效了,也就不會執行操做了,經過這種樂觀鎖的機制來保障冪等性。
重複消費問題:
當消費者消費完消息時,在給生產端返回ack時因爲網絡中斷,致使生產端未收到確認信息,該條消息會從新發送並被消費者消費,但實際上該消費者已成功消費了該條消息,這就是重複消費問題。
惟一ID:業務表惟一的主鍵,如商品ID
指紋碼:爲了區別每次正常操做的碼,每次操做時生成指紋碼;能夠用時間戳+業務編號或者標誌位(具體視業務場景而定)
整個思路就是首先咱們須要根據消息生成一個全局惟一的ID,而後還須要加上一個指紋碼。這個指紋碼它並不必定是系統去生成的,而是一些外部的規則或者內部的業務規則去拼接,它的目的就是爲了保障此次操做是絕對惟一的。
將ID + 指紋碼拼接好的值做爲數據庫主鍵,就能夠進行去重了。即在消費消息前呢,先去數據庫查詢這條消息的指紋碼標識是否存在,沒有就執行insert操做,若是有就表明已經被消費了,就不須要管了。
這裏只提用Redis的原子性去解決MQ冪等性重複消費的問題
注意:MQ的冪等性問題 根本在於的是生產端未正常接收ACK,多是網絡抖動、網絡中斷致使
個人方案:
MQ消費端在消費開始時 將 ID放入到Redis的BitMap中,MQ生產端每次生產數據時,從Redis的BitMap對應位置若不能取出ID,則生產消息發送,不然不進行消息發送。
可是有人可能會說,萬一消費端,生產端Redis命令執行失敗了怎麼辦,雖然又出現重複消費又出現Redis非正常執行命令的可能性極低,可是萬一呢?
OK,咱們能夠在Redis命令執行失敗時,將消息落庫,每日用定時器,對這種極特殊的消息進行處理。
的核心保障
確認機制流程圖
生產端發送消息到Broker,而後Broker接收到了消息後,進行回送響應,生產端有一個Confirm Listener
,去監聽應答,固然這個操做是異步進行的,生產端將消息發送出去就能夠不用管了,讓內部監聽器去監聽Broker給咱們的響應。
channel.confirmSelect()
addConfirmListener
,監聽成功和失敗的返回結果,根據具體的結果對消息進行從新發送、或記錄日誌等後續處理!public class Producer { public static void main(String[] args) throws Exception { //建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //獲取Connection Connection connection = connectionFactory.newConnection(); //經過connection建立一個新的Channel Channel channel = connection.createChannel(); //指定咱們的消息投遞模式 channel.confirmSelect(); String exchangeName = "test_confirm_exchange"; String routingkey = "confirm.save"; //發送一條信息 String msg = "Hello RabbitMQ Send confirm message!"; channel.basicPublish(exchangeName, routingkey, null, msg.getBytes()); //添加一個確認監聽 channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("-------no ack!---------"); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("--------ack!----------"); } }); } }
public class Consumer { public static void main(String[] args) throws Exception{ //建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //獲取Connection Connection connection = connectionFactory.newConnection(); //經過connection建立一個新的Channel Channel channel = connection.createChannel(); String exchangeName = "test_confirm_exchange"; String routingkey = "confirm.#"; String queueName = "test_confirm_queue"; //聲明交換機和隊列 而後進行綁定和 設置 最後制定路由key channel.exchangeDeclare(exchangeName, "topic",true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingkey); //建立消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true,queueingConsumer); while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("消費端:" + msg); } } }
運行說明
先啓動消費端,訪問管控臺:http://ip:15672,檢查Exchange和Queue是否設置OK,而後啓動生產端,消息被消費端消費,生產端也成功監聽到了ACK響應。
Return Listener
用於處理一些不可路由的消息!Return Listener
!
addReturnListener
,生產端去監聽這些不可達的消息,作一些後續處理,好比說,記錄下消息日誌,或者及時去跟蹤記錄,有可能從新設置一下就行了Mandatory
:若是爲true,則監聽器會接收到路由不可達的消息,而後進行後續處理,若是爲false,那麼broker端自動刪除該消息!
public class ReturnProducer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2 獲取Connection Connection connection = connectionFactory.newConnection(); //3 經過Connection建立一個新的Channel Channel channel = connection.createChannel(); String exchange = "test_return_exchange"; //String routingKey = "return.save"; String routingKeyError = "abc.save"; String msg = "Hello RabbitMQ Return Message"; //添加return監聽 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, BasicProperties properties, byte[] body) throws IOException { //replyCode:響應碼 replyText:響應信息 System.err.println("---------handle return----------"); System.err.println("replyCode: " + replyCode); System.err.println("replyText: " + replyText); System.err.println("exchange: " + exchange); System.err.println("routingKey: " + routingKey); //System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }); //5 發送一條消息,第三個參數mandatory:必須設置爲true channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes()); } }
public class ReturnConsumer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2 獲取Connection Connection connection = connectionFactory.newConnection(); //3 經過Connection建立一個新的Channel Channel channel = connection.createChannel(); String exchangeName = "test_return_exchange"; String routingKey = "return.#"; String queueName = "test_return_queue"; //4 聲明交換機和隊列,而後進行綁定設置路由Key channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //5 建立消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, queueingConsumer); while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消費者: " + msg); } } }
運行說明
先啓動消費端,訪問管控臺:http://ip:15672,檢查Exchange和Queue是否設置OK,而後啓動生產端。
因爲生產端設置的是一個錯誤的路由key,因此消費端沒有任何打印,而生產端打印了以下內容
若是咱們將 Mandatory
屬性設置爲false,對於不可達的消息會被Broker直接刪除,那麼生產端就不會進行任何打印了。若是咱們的路由key設置爲正確的,那麼消費端可以正確消費,生產端也不會進行任何打印。