RabbitMq解決分佈式事務

一、分佈式事務的經常使用解決方案

(1)、基於數據庫XA/JTA協議的方式;(須要數據庫廠商的支持;java組件有atomikos等)java

(2)、異步校對數據的方式;(支付寶、微信支付主動查詢支付轉態、對帳單的形式)spring

(3)、基於可靠消息(MQ)的解決方案;(異步場景;通用性強;拓展性較高)數據庫

(4)、TCC編程式解決方案。(嚴選、阿里、螞蟻金服本身封裝的DTX)編程

二、使用rabbitmq解決分佈式事務

(1)、總體設計思路微信

要求:①、可靠生產:保證消息必定要發送到rabbitmq服務中運維

           ②、可靠消費:保證消息取出來必定正確消費掉。異步

最終多方數據達到一致性。分佈式

 發送消息到rabbitmq以後的回調方法修改消息發送的狀態測試

設置發送成功的回調配置:spring.rabbitmq.publisher-confirm-type=CORRELATED微信支付

private static boolean IS_END = false; @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void setUp(){ System.out.println("回調方法"); //回調方法 rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{ if (!ack){ //TODO 執行失敗的邏輯 System.out.println("執行失敗:"+cause); }else{ //TODO 執行成功的邏輯 System.out.println("執行成功"); } }); IS_END = true; } @Test public void test0(){ //發送消息 rabbitTemplate.convertAndSend("OrderDispathExchange",null,"測試sa"); System.out.println("發送完成"); do{}while (!IS_END); }

備註:若是出現回執沒有收到或者狀態修改失敗等特殊狀況,能夠採用一下方案:定時檢查消息表,超時沒有發送成功的,再次發送。


冪等:根據ID或業務數據,判斷數據是否重複(能夠查看對應的數據表),重複則忽略。

消費者正常消費後,返回ACK狀態到rabbitMq的服務端,進行數據的清除

消費者處理消息失敗以後,須要MQ再次重發給消費者(通常會重試幾回,由消費者自身記錄重試次數,並進行次數控制)。

 

開啓手動ack,控制消息在MQ中的刪除、重發、丟棄

spring.rabbitmq.listener.simple.acknowledge-mode=manual
/** * * @param message 消息內容 * @param channel java和mq創建的鏈接通道 * @param tag 當前消息的標籤 * @throws Exception */ @RabbitListener(queues = "OrderDispath") public void messageConsumer(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG)long tag)throws Exception{ try { //取出信息,執行相關業務,同一個數據不一樣屢次處理,能夠根據業務狀況去重, // 保證冪等性(如:可使用readis記錄每條數據處理次數或者使用主鍵進行數據庫的插入)  System.err.println(message); //..... //ack -反饋給mq,數據已經正常處理 channel.basicAck(tag,false); }catch (Exception e){ //捕獲異常,通知mq重發 //可是必定要記錄該條消息處理的次數,防止重試屢次,致使死循環 //對於重試屢次,或者明確不在繼續重發的異常,通知mq丟棄,經過運維告急機制,通知人工處理 channel.basicNack(tag,false,false); } //生產環境,重要的數據出現異常,必須人工干預 //若是不回覆,就等這個consumer斷開鏈接以後,mq-server會繼續推送信息 }

三、方案優缺點

優勢:通用性強,拓展性強,方案成熟。

缺點:基於消息中間件,只適合異步場景,不支持事務回滾(要求必須成功);消息處理會有延遲,須要業務上可以容忍。

備註:儘可能避免分佈式事務;儘可能將非核心的事務作成異步。

相關文章
相關標籤/搜索