MQ消息最終一致性解決方案

隨着分佈式服務架構的流行與普及,原來在單體應用中執行的多個邏輯操做,如今被拆分紅了多個服務之間的遠程調用。雖然服務化爲咱們的系統帶來了水平伸縮的能力,然而隨之而來挑戰就是分佈式事務問題,多個服務之間使用本身單獨維護的數據庫,它們彼此之間不在同一個事務中,假如A執行成功了,B執行卻失敗了,而A的事務此時已經提交,沒法回滾,那麼最終就會致使兩邊數據不一致性的問題;儘管很早以前就有基於兩階段提交的XA分佈式事務,可是這類方案由於須要資源的全局鎖定,致使性能極差;所以後面就逐漸衍生出了消息最終一致性、TCC等柔性事務的分佈式事務方案,本文主要分析的是基於消息的最終一致性方案數據庫

普通消息的處理流程

image

  1. 消息生成者發送消息
  2. MQ收到消息,將消息進行持久化,在存儲中新增一條記錄
  3. 返回ACK給生產者
  4. MQ push 消息給對應的消費者,而後等待消費者返回ACK
  5. 若是消息消費者在指定時間內成功返回ack,那麼MQ認爲消息消費成功,在存儲中刪除消息,即執行第6步;若是MQ在指定時間內沒有收到ACK,則認爲消息消費失敗,會嘗試從新push消息,重複執行四、五、6步驟
  6. MQ刪除消息

普通消息處理存在的一致性問題

咱們以訂單建立爲例,訂單系統先建立訂單(本地事務),再發送消息給下游處理;若是訂單建立成功,然而消息沒有發送出去,那麼下游全部系統都沒法感知到這個事件,會出現髒數據;bash

public void processOrder() {
    // 訂單處理(業務操做) 
    orderService.process();
    // 發送訂單處理成功消息(發送消息) 
    sendBizMsg ();
}

複製代碼

若是先發送訂單消息,再建立訂單;那麼就有可能消息發送成功,可是在訂單建立的時候卻失敗了,此時下游系統卻認爲這個訂單已經建立,也會出現髒數據。網絡

public void processOrder() {
   // 發送訂單處理成功消息(發送消息) 
    sendBizMsg ();
    // 訂單處理(業務操做) 
    orderService.process();
}

複製代碼

一個錯誤的想法

此時可能有同窗會想,咱們能否將消息發送和業務處理放在同一個本地事務中來進行處理,若是業務消息發送失敗,那麼本地事務就回滾,這樣是否是就能解決消息發送的一致性問題呢?架構

@Transactionnal
public void processOrder() {
    try{
        // 訂單處理(業務操做) 
        orderService.process(); 
        // 發送訂單處理成功消息(發送消息) 
        sendBizMsg ();
    }catch(Exception e){
         事務回滾;   
    }
}

複製代碼

消息發送的異常狀況分析

可能的狀況 一致性
訂單處理成功,而後忽然宕機,事務未提交,消息沒有發送出去 一致
訂單處理成功,因爲網絡緣由或者MQ宕機,消息沒有發送出去,事務回滾 一致
訂單處理成功,消息發送成功,可是MQ因爲其餘緣由,致使消息存儲失敗,事務回滾 一致
訂單處理成功,消息存儲成功,可是MQ處理超時,從而ACK確認失敗,致使發送方本地事務回滾 不一致

從上面的狀況分析,咱們能夠看到,使用普通的處理方式,不管如何,都沒法保證業務處理與消息發送兩邊的一致性,其根本的緣由就在於:遠程調用,結果最終可能爲成功、失敗、超時;而對於超時的狀況,處理方最終的結果多是成功,也多是失敗,調用方是沒法知曉的。 筆者就曾經在項目中出現相似的狀況,調用方先在本地寫數據,而後發起RPC服務調用,可是處理方因爲DB數據量比較大,致使處理超時,調用方在出現超時異常後,直接回滾本地事務,從而致使調用方這邊沒數據,而處理方那邊數據卻已經寫入了,最終致使兩邊業務數據的不一致。爲了保證兩邊數據的一致性,咱們只能從其餘地方尋找新的突破口。分佈式

事務消息

因爲傳統的處理方式沒法解決消息生成者本地事務處理成功消息發送成功二者的一致性問題,所以事務消息就誕生了,它實現了消息生成者本地事務與消息發送的原子性,保證了消息生成者本地事務處理成功與消息發送成功的最終一致性問題。性能

事務消息處理的流程

image

  1. 事務消息與普通消息的區別就在於消息生產環節,生產者首先預發送一條消息到MQ(這也被稱爲發送half消息)spa

  2. MQ接受到消息後,先進行持久化,則存儲中會新增一條狀態爲待發送的消息日誌

  3. 而後返回ACK給消息生產者,此時MQ不會觸發消息推送事件code

  4. 生產者預發送消息成功後,執行本地事務cdn

  5. 執行本地事務,執行完成後,發送執行結果給MQ

  6. MQ會根據結果刪除或者更新消息狀態爲可發送

  7. 若是消息狀態更新爲可發送,則MQ會push消息給消費者,後面消息的消費和普通消息是同樣的

注意點:因爲MQ一般都會保證消息可以投遞成功,所以,若是業務沒有及時返回ACK結果,那麼就有可能形成MQ的重複消息投遞問題。所以,對於消息最終一致性的方案,消息的消費者必需要對消息的消費支持冪等,不能形成同一條消息的重複消費的狀況。

事務消息異常狀況分析

異常狀況 一致性 處理異常方法
消息未存儲,業務操做未執行 一致
存儲待發送消息成功,可是ACK失敗,致使業務未執行(多是MQ處理超時、網絡抖動等緣由) 不一致 MQ確認業務操做結果,處理消息(刪除消息)
存儲待發送消息成功,ACK成功,業務執行(可能成功也可能失敗),可是MQ沒有收到生產者業務處理的最終結果 不一致 MQ確認業務操做結果,處理消息(根據就業務處理結果,更新消息狀態,若是業務執行成功,則投遞消息,失敗則刪除消息)
業務處理成功,而且發送結果給MQ,可是MQ更新消息失敗,致使消息狀態依舊爲待發送 不一致 同上

支持事務消息的MQ

如今目前較爲主流的MQ,好比ActiveMQ、RabbitMQ、Kafka、RocketMQ等,只有RocketMQ支持事務消息。據筆者瞭解,早年阿里對MQ增長事務消息也是由於支付寶那邊由於業務上的需求而產生的。所以,若是咱們但願強依賴一個MQ的事務消息來作到消息最終一致性的話,在目前的狀況下,技術選型上只能去選擇RocketMQ來解決。上面咱們也分析了事務消息所存在的異常狀況,即MQ存儲了待發送的消息,可是MQ沒法感知到上游處理的最終結果。對於RocketMQ而言,它的解決方案很是的簡單,就是其內部實現會有一個定時任務,去輪訓狀態爲待發送的消息,而後給producer發送check請求,而producer必須實現一個check監聽器,監聽器的內容一般就是去檢查與之對應的本地事務是否成功(通常就是查詢DB),若是成功了,則MQ會將消息設置爲可發送,不然就刪除消息。

常見的問題

  1. 問:若是預發送消息失敗,是否是業務就不執行了?

    答:是的,對於基於消息最終一致性的方案,通常都會強依賴這步,若是這個步驟沒法獲得保證,那麼最終也 就不可能作到最終一致性了。

  2. 問:爲何要增長一個消息預發送機制,增長兩次發佈出去消息的重試機制,爲何不在業務成功以後,發送失敗的話使用一次重試機制?

    答:若是業務執行成功,再去發消息,此時若是還沒來得及發消息,業務系統就已經宕機了,系統重啓後,根本沒有記錄以前是否發送過消息,這樣就會致使業務執行成功,消息最終沒發出去的狀況。

  3. 若是consumer消費失敗,是否須要producer作回滾呢?

    答:這裏的事務消息,producer不會由於consumer消費失敗而作回滾,採用事務消息的應用,其所追求的是高可用最終一致性,消息消費失敗的話,MQ本身會負責重推消息,直到消費成功。所以,事務消息是針對生產端而言的,而消費端,消費端的一致性是經過MQ的重試機制來完成的。

  4. 若是consumer端由於業務異常而致使回滾,那麼豈不是兩邊最終沒法保證一致性?

    答:基於消息的最終一致性方案必須保證消費端在業務上的操做沒障礙,它只容許系統異常的失敗,不容許業務上的失敗,好比在你業務上拋出個NPE之類的問題,致使你消費端執行事務失敗,那就很難作到一致了。

因爲並不是全部的MQ都支持事務消息,假如咱們不選擇RocketMQ來做爲系統的MQ,是否可以作到消息的最終一致性呢?答案是能夠的。

基於本地消息的最終一致性

image

基於本地消息的最終一致性方案的最核心作法就是在執行業務操做的時候,記錄一條消息數據到DB,而且消息數據的記錄與業務數據的記錄必須在同一個事務內完成,這是該方案的前提核心保障。在記錄完成後消息數據後,後面咱們就能夠經過一個定時任務到DB中去輪訓狀態爲待發送的消息,而後將消息投遞給MQ。這個過程當中可能存在消息投遞失敗的可能,此時就依靠重試機制來保證,直到成功收到MQ的ACK確認以後,再將消息狀態更新或者消息清除;然後面消息的消費失敗的話,則依賴MQ自己的重試來完成,其最後作到兩邊系統數據的最終一致性。基於本地消息服務的方案雖然能夠作到消息的最終一致性,可是它有一個比較嚴重的弊端,每一個業務系統在使用該方案時,都須要在對應的業務庫建立一張消息表來存儲消息。針對這個問題,咱們能夠將該功能單獨提取出來,作成一個消息服務來統一處理,於是就衍生出了咱們下面將要討論的方案。

獨立消息服務的最終一致性

image

獨立消息服務最終一致性本地消息服務最終一致性最大的差別就在於將消息的存儲單獨地作成了一個RPC的服務,這個過程其實就是模擬了事務消息的消息預發送過程,若是預發送消息失敗,那麼生產者業務就不會去執行,所以對於生產者的業務而言,它是強依賴於該消息服務的。不過好在獨立消息服務支持水平擴容,所以只要部署多臺,作成HA的集羣模式,就可以保證其可靠性。在消息服務中,還有一個單獨地定時任務,它會按期輪訓長時間處於待發送狀態的消息,經過一個check補償機制來確認該消息對應的業務是否成功,若是對應的業務處理成功,則將消息修改成可發送,而後將其投遞給MQ;若是業務處理失敗,則將對應的消息更新或者刪除便可。所以在使用該方案時,消息生產者必須同時實現一個check服務,來供消息服務作消息的確認。對於消息的消費,該方案與上面的處理是同樣,都是經過MQ自身的重發機制來保證消息被消費。

總結:上游事務提交以後,在基於MQ的場景下就不考慮回滾了。失敗的多是因爲網絡、服務宕機所致使,文章中提到說業務上執行是無障礙的。若是下游服務長時間沒有恢復,那麼就應該設置告警,在這裏有幾種機制來解決一些牛皮癬類型的問題,假如上游消息始終發送失敗(這種可能性基本不存在除非代碼是假的)這種狀況咱們能夠設置報警機制好比發生異常時能夠打印日誌,發送短信,發送郵件,將異常訂單保存到數據庫,這些措施能夠同時用於下游一些異常訂單,同時也能夠在發生異常的時候新建一個異常Topic的消息提示,讓人工來介入數據訂正。

這篇文章若是對你有幫助 點個關注吧

相關文章
相關標籤/搜索