(微服務)分佈式事務-最大努力交付 && 消息最終一致性方案

本文對比 二階段事務、最大努力交付以及消息最終一致性,並給出部分解決方案,最終一致性方案參考阿里RockMQ事務消息:http://blog.csdn.net/chunlong...
image.pnghtml

image.png

項目git地址:https://github.com/vvsuperman...git

一 2階段事務

分佈式系統最終一致性有N種方案,好比2PC(2階段事務) ,以及三段提交等等,但開銷較大,實現起來複雜,好比2階段事務爲例,須要引入一個協調者(Coordinator)來統一掌控全部參與者(Participant)的操做結果github

以開會爲例:
甲乙丙丁四人要組織一個會議,須要肯定會議時間,不妨設甲是協調者,乙丙丁是參與者。
投票階段:
(1)甲發郵件給乙丙丁,週二十點開會是否有時間;
(2)甲回覆有時間;
(3)乙回覆有時間;
(4)丙遲遲不回覆,此時對於這個活動,甲乙丙均處於阻塞狀態,算法沒法繼續進行;
(5)丙回覆有時間(或者沒有時間);
提交階段:
(1)協調者甲將收集到的結果反饋給乙丙丁(何時反饋,以及反饋結果如何,在此例中取決與丙的時間與決定);
(2)乙收到;
(3)丙收到;
(4)丁收到;
不只要鎖住參與者的全部資源,並且要鎖住協調者資源,開銷大。一句話總結就是:2PC效率很低,分佈式事務很難作。redis

在對事實性要求沒有那麼高的狀況下,能夠用基於最大努力交付 && 消息隊列以及消息存儲來解決最終一致性。算法

二 消息最大努力交付

所謂最大努力交付,就是俺反正用最大努力作,能不能成功,不作徹底保證
會涉及到三個模塊數據庫

  1. 上游應用,發消息到 MQ 隊列。
  2. 下游應用(例如短信服務、郵件服務),接受請求,並返回通知結果。
  3. 最大努力通知服務,監聽消息隊列,將消息存儲到數據庫中,並按照通知規則調用下游應用的發送通知接口。

具體流程以下服務器

image.png

  1. 上游應用發送 MQ 消息到 MQ 組件內,消息內包含通知規則和通知地址
  2. 最大努力通知服務監聽到 MQ 內的消息,解析通知規則並放入延時隊列等待觸發通知
  3. 最大努力通知服務調用下游的通知地址,若是調用成功,則該消息標記爲通知成功,若是失敗則在知足通知規則(例如 5 分鐘發一次,共發送 10 次)的狀況下從新放入延時隊列等待下次觸發。

最大努力通知服務表示在不影響主業務的狀況下,儘量地確保數據的一致性。它須要開發人員根據業務來指定通知規則,在知足通知規則的前提下,儘量的確保數據的一致,以達到最大努力的目的。網絡

實現上也比較簡單,目前主流消息隊列都有ack機制,當沒收到ack的時候用規則作定時重發便可。
優勢:實現簡單
缺點:無補償機制,不保證可以送達
實現要點: 保證消息發送失敗以後可以和業務一塊兒回滾;消息接受方保證冥等性;定時重發機制,採用必定的重發策略,例如說指數增加,聽說阿里採用redis的zset來完成,參考https://zhuanlan.zhihu.com/p/...
消息進到zset後,DelayQ會經過timer觸發(好比秒級),fork相應的消費線程去處理zset裏ExecuteTime大於當前時間的消息。DelayQ拿到一條消息後,解析其中的callbackurl,並組裝參數,push業務消息給Consumer.
Consumer返回處理成功,那麼zrem Codis裏的消息。若是處理失敗,則計算其下次嘗試時間,並更新其ExecuteTime.併發

三 可靠消息最終一致性方案

此方案涉及 3 個模塊:分佈式

  1. 上游應用,執行業務併發送 MQ 消息。
  2. 可靠消息服務和 MQ 消息組件,協調上下游消息的傳遞,並確保上下游數據的一致性。
  3. 下游應用,監聽 MQ 的消息並執行自身業務。

image.png

第一階段:上游應用執行業務併發送 MQ 消息

上游應用將本地業務執行和消息發送綁定在同一個本地事務中,保證要麼本地操做成功併發送 MQ 消息,要麼兩步操做都失敗並回滾。

上游應用和可靠消息之間的業務交互圖以下:

image.png

  1. 上游應用發送待確認消息到可靠消息系統
  2. 可靠消息系統保存待確認消息並返回
  3. 上游應用執行本地業務
  4. 上游應用通知可靠消息系統確認業務已執行併發送消息。
  5. 可靠消息系統修改消息狀態爲發送狀態並將消息投遞到 MQ 中間件。

以上每一步均可能出現失敗狀況,分析一下這 5 步出現異常後上遊業務和消息發送是否一致:

image.png

上游應用執行完成,下游應用還沒有執行或執行失敗時,此事務即處於 BASE 理論的 Soft State 狀態。

第二階段:下游應用監聽 MQ 消息並執行業務

下游應用監聽 MQ 消息並執行業務,而且將消息的消費結果通知可靠消息服務。

可靠消息的狀態須要和下游應用的業務執行保持一致,可靠消息狀態不是已完成時,確保下游應用未執行,可靠消息狀態是已完成時,確保下游應用已執行。

下游應用和可靠消息服務之間的交互圖以下:

image.png

  1. 下游應用監聽 MQ 消息組件並獲取消息
  2. 下游應用根據 MQ 消息體信息處理本地業務
  3. 下游應用向 MQ 組件自動發送 ACK 確認消息被消費
  4. 下游應用通知可靠消息系統消息被成功消費,可靠消息將該消息狀態更改成已完成。

以上每一步均可能出現失敗狀況,分析一下這 4 步出現異常後下遊業務和消息狀態是否一致:

經過分析以上兩個階段可能失敗的狀況,爲了確保上下游數據的最終一致性,在可靠消息系統中,須要開發 消息狀態確認消息重發 兩個功能以實現 BASE 理論的 Eventually Consistent 特性。

異常處理一:消息狀態確認

可靠消息服務定時監聽消息的狀態,若是存在狀態爲待確認而且超時的消息,則表示上游應用和可靠消息交互中的步驟 4 或者 5 出現異常。

可靠消息則攜帶消息體內的信息向上遊應用發起請求查詢該業務是否已執行。上游應用提供一個可查詢接口供可靠消息追溯業務執行狀態,若是業務執行成功則更改消息狀態爲已發送,不然刪除此消息確保數據一致。具體流程以下:

image.png

  1. 可靠消息查詢超時的待確認狀態的消息
  2. 向上遊應用查詢業務執行的狀況
  3. 業務未執行,則刪除該消息,保證業務和可靠消息服務的一致性。業務已執行,則修改消息狀態爲已發送,併發送消息到 MQ 組件。

異常處理二:消息重發

消息已發送則表示上游應用已經執行,接下來則確保下游應用也能正常執行。

可靠消息服務發現可靠消息服務中存在消息狀態爲已發送而且超時的消息,則表示可靠消息服務和下游應用中存在異常的步驟,不管哪一個步驟出現異常,可靠消息服務都將此消息從新投遞到 MQ 組件中供下游應用監聽。

下游應用監聽到此消息後,在保證冪等性的狀況下從新執行業務並通知可靠消息服務此消息已經成功消費,最終確保上游應用、下游應用的數據最終一致性。具體流程以下:

image.png

  1. 可靠消息服務定時查詢狀態爲已發送並超時的消息
  2. 可靠消息將消息從新投遞到 MQ 組件中
  3. 下游應用監聽消息,在知足冪等性的條件下,從新執行業務。
  4. 下游應用通知可靠消息服務該消息已經成功消費。

經過消息狀態確認和消息重發兩個功能,能夠確保上游應用、可靠消息服務和下游應用數據的最終一致性。

四 肉身實戰Rabbitmq

咱們在rabbitmq上肉身實戰了一下可靠消息,rabbitmq的發送過程以下

  1. 發送消息到消息服務
  2. 消息隊列將消息發送給監聽
  3. 消息監聽接受並處理消息

咱們來看看可能發送異常的四種

1 直接沒法到達消息服務

網絡斷了,拋出異常,業務直接回滾便可。若是出現connection closed錯誤,直接增長 connection數便可

connectionFactory.setChannelCacheSize(100);

2 消息已經到達服務器,但返回的時候出現異常

rabbitmq提供了確認ack機制,能夠用來確認消息是否有返回。所以咱們能夠在發送前在db中(內存或關係型數據庫)先存一下消息,若是ack異常則進行重發

/**confirmcallback用來確認消息是否有送達消息隊列*/     
   rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (!ack) {
            //try to resend msg
        } else {
            //delete msg in db
        }
    });
     /**若消息找不到對應的Exchange會先觸發returncallback */
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, tmpExchange, tmpRoutingKey) -> {
        try {
            Thread.sleep(Constants.ONE_SECOND);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        log.info("send message failed: " + replyCode + " " + replyText);
        rabbitTemplate.send(message);
    });

若是消息沒有到exchange,則confirm回調,ack=false
若是消息到達exchange,則confirm回調,ack=true
但若是是找不到exchange,則會先觸發returncallback

3 消息送達後,消息服務本身掛了

若是設置了消息持久化,那麼ack= true是在消息持久化完成後,就是存到硬盤上以後再發送的,確保消息已經存在硬盤上,萬一消息服務掛了,消息服務恢復是可以再重發消息

4 未送達消費者

消息服務收到消息後,消息會處於"UNACK"的狀態,直到客戶端確認消息

channel.basicQos(1); // accept only one unack-ed message at a time (see below)
final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
       //確認收到消息
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

5 確認消息丟失

消息返回時假設確認消息丟失了,那麼消息服務會重發消息。注意,若是你設置了autoAck= false,但又沒應答 channel.baskAck也沒有應答 channel.baskNack,那麼會致使很是嚴重的錯誤:消息隊列會被堵塞住,可參考http://blog.sina.com.cn/s/blo...,因此,不管如何都必須應答

6 消費者業務處理異常

消息監聽接受消息並處理,假設拋異常了,第一階段事物已經完成,若是要配置回滾則過於麻煩,即便作事務補償也可能事務補償失效的狀況,因此這裏能夠作一個重複執行,好比guava的retry,設置一個指數時間來循環執行,若是n次後依然失敗,發郵件、短信,用人肉來兜底。
參考:http://blog.csdn.net/reviveds...

相關文章
相關標籤/搜索