利用本地消息表和MQ解決消息可靠性

本地消息表

ebay方案:https://queue.acm.org/detail.cfm?id=1394128數據庫

爲解決producer端消息發送和本地事務執行原子性問題,將須要分佈式處理的任務經過消息日誌方式存儲到一個地方,在本地事務完成以前,這個消息對於消費者是不可見的,本地事務執行成功以後,消費者纔會看到這個消息並進行消費。服務器

消息日誌能夠存儲到本地文本,數據庫或消息隊列,經過業務規則自動或者人工方式發起重試。異步

人工重試多用於支付場景,經過對帳系統對過後問題進行處理。 分佈式

  • 預發送消息到mq,消費者是看不到此消息的,所以不會進行消費
  • 執行本地事務,好比操做數據庫,依賴本地事務
  • 若是本地事務執行成功,則進行mq消息確認。若是失敗,則回滾mq消息

對於本地消息隊列來講,把大事務轉變成小事務,舉個例子:線程

  • 當扣錢時,須要在扣錢當服務器上增長一個本地消息表,須要把扣錢和減去的庫寫入存放到本地消息表,依靠數據庫本地事務保證一致性
  • 定時任務輪訓本地消息表,把沒有發送的消息發送給商品目標服務器,讓其去減庫存,達到商品服務的請求先寫入本地消息表,進行扣減,扣減成功後,更新消息表狀態
  • 商品服務經過定時任務掃描消息表,扣減庫存成功後修改本地消息表狀態
  • 若是定時任務掃描到沒有執行成功的消息,則進行重發,商品服務接受到消息後判斷消息是否重複,保證冪等性

本地事務實現設計

能夠將消息放到一個地方存起來,好比在數據庫中創建一個表,將消息放入這個表,稱之爲本地事務,這個表中有個state字段表示消息狀態,在預發送消息階段,標記成unkonwn狀態。3d

以後根據本地事務執行結果,修改state,執行成功設置爲local_commit,執行失敗執行local_rollback。代理

同時能夠創建一個異步現場執行兜底,不斷從這個表中查詢狀態爲local_commit的消息,將其發送到mq中。日誌

  • 若是發送mq成功,整個事務能夠任務執行結束,修改狀態爲global_commit,接下來消費者進行消息消費。
  • 若是發送mq失敗,能夠進行重試,直到成功,若是先限制重試次數,能夠在表中增長retry_count字段每次重試就+1,當超太重試閾值後,就再也不發送,能夠指定一個消息超時時間,超過期間閾值後,就再也不發送。對於失敗的消息,將其標記爲message_error,還能夠增長一個cause字段,表示由於什麼緣由致使消息發送失敗。

若是本地消息狀態修改失敗,那麼一個消息可能一直處於unkonwn狀態,而異步現成只會發送那些local_commit的消息到mq中,這樣一些消息會一直被忽略,就產生了消息丟失。code

通常有三種解決方案:

一:擴大事務邊界

將預發送消息,執行本地事務,修改本地事務表狀態三個操做,合併到一個事務裏面。第一步預發送消息以前就開啓事務,在第三步執行結束以後提交或回滾事務,,這樣經過事務保證了本地消息表的消息記錄,和操做產生記錄老是成功或者失敗。

二:合併事務狀態

能夠合併事務狀態,直接和正常的數據庫操做合併到一個事務中,寫入到數據庫直接就是local_commit,以後異步現成發送邏輯不變。

三:對prepared狀態消息進行檢查

簡單的操做能夠直接執行一次DB事務就能夠了,若是複雜的一些場景,好比A業務發起方除了須要操做本地數據庫,還須要進行RPC調用查詢其餘業務B,以獲取一些mq消息須要的信息。這樣可能A須要先將消息保存下來,等到B能夠提供消息以後再發送。

這種狀況的策略是,A和B之間約定一個能夠異步處理的時間閾值,讓異步線程除了發送local_commit狀態的消息,還須要對prepared狀態消息進行檢查。依靠設置的時間閾值,在過濾消息時,prepared消息對時間和當前時間必須知足必定的時間閾值,避免和新事務消息的prepared消息狀態混淆。

依賴MQ事務

消息隊列的事務實現相似於本地消息表,只不過是將實現放到了MQ內部。

流程以下:

  1. 第一階段prepared消息,拿到消息地址
  2. 執行本地事務
  3. 經過第一階段拿到地址去訪問消息,並修改狀態,消息接收者使用這個消息

若是消息確認操做失敗,mq broker會定時掃描沒有更新狀態的消息,若是有消息沒有獲得確認,會向消息發送者發送消息,判斷是否提交了。 若是消息消費超時了,須要一直重試,消息接收端須要保證冪等,若是消息消費失敗,須要人工進行處理,由於機率較低,設計複雜的流程反而得不償失。

消息隊列通常有事務處理方案,能夠解決producer發送消息和本地事務執行的原子性操做。 MQ的方案通常也是將消息找個地方存起來,RocketMQ將消息存放到內部主題中。爲了支持事務,RocketMQ引入了Half Topic及Operation Topic兩個內部隊列來存儲事務消息推動狀態。

  • Half Topic對應隊列中存放着prepare消息,就是預發送消息,消息不直接發送到topic,所以消費者對其不可見,實現暫存。
  • Opreation Topic對應的隊列存放prepare message對應的commit/rollback消息,消息體中是prepare message對應的offset。

RocketMQ中事務消息發送流程以下:

事務生產者預發送消息

經過TransactionMQProducer發送事務消息,這個producer在一條普通的message上加一些數據,表示這個是一條預發送的事務消息。broker在發現這是一條事務消息的時候,將其放到half topic中。

執行本地事務

發送prepare消息以後,須要執行本地事務,須要實現RocketMQ提供的一個TransactionListener 接口方法完成。

  • executeLocalTransaction方法:用於執行本地事務,能夠操做數據庫或者幹一些別的事情
  • checkLocalTransaction方法:用於檢查事務狀態

這兩個方法返回一個表示本地事務消息的執行狀態LocalTransactionState,事務生產者會將其上報給broker,狀態以下:

public enum LocalTransactionState {
    COMMIT_MESSAGE,
    ROLLBACK_MESSAGE,
    UNKNOW,
}

本地事務狀態處理

生產者拿到狀態後上報broker,broker在處理時,會根據狀態進行處理。

若是是commit/rollback狀態: brokder會把收到的事務消息狀態記錄在內部的operation topci中,消息體中是prepare message對應在half topci中的offset。

  • 若是是rollback消息,broker將從half topic中刪除該prepare消息不進行下發
  • 若是是commit消息,broker會把這個消息取出來,發送到原始的目標topci中,此時consumer端能夠消費

上圖能夠觀察到,一些異常狀況下,可能上報事務消息狀態失敗,所以operate topci中沒有記錄,二者之間的差值通常就是unkown爲確認中間狀態的消息,須要進行特殊處理。

unknow狀態消息

若是是unknow狀態消息,說明存在不肯定的事務狀態,broker須要主動詢問客戶端producer。

出現unknow狀態通常因爲如下緣由形成:

  • 若是本地事務支持過程當中,執行端掛掉,或者超時時會出現異常狀態
  • 一些特殊場景,須要等待一段時間知足特定場景,才把消息交給消費者進行消費的,可能須要主動的返回unknow狀態,屬於有意爲之

因爲unknown中間狀態的消息,不會提交到operation topic中,所以half topci和operation topic這兩個內部主題中,服務端經過對比兩個主題的差值來找到未被提交的超時任務,進行回查。 因此業務方須要提供一個方法讓rocketmq來回調,TransactionListener 中的checkLocalTransaction 就是用於回查。rocketmq會把以前發送的消息看成參數入參,業務實現根據消息內容能夠反查業務信息,來肯定狀態。 broker主動輪訓客戶端producer事務狀態,依賴於broker和producer端的雙向通訊能力來完成的,broker會主動給客戶端producer發請求。

Sage事務

Saga事務是將長事務拆分紅多個本地短事務,有Saga協調器協調,若是正常結束則算完成,某個步驟失敗,則根據相反順序調用一次補償操做。

總結

RocketMQ事務消息能夠解決事務的一致性問題,事務發起方須要關注本地事務執行及實現回查接口進行事務狀態判斷。

RocketMQ事務消息處理的限制:

  • 事務消息沒有延遲和批量支持,不能使用延遲消息特性和批量發送消息特性
  • 爲避免屢次檢查單個消息致使half topic消息積累,默認將單個消息的檢查次數限制爲15次
  • 經過transactionTimeout 配置檢查事務消息的固定週期
  • 能夠屢次檢查和消費事務消息
  • 將事務消息交給目標topic可能會失敗,rocketmq自己高可用機制確保高可用性,爲保證事務消息不丟失或事務完整性能夠採用同步雙寫機制
  • 事務消息生產者id不能和其餘類型消息的生產者id共享,和其餘類型消息不一樣,事務消息容許後向查詢,mq server按照其生產者id查詢客戶端

訂閱數據庫binlog

爲避免對於業務的入侵,能夠採用binlog實現可靠性發送:

  • 先把本地事務執行完成,本地事務每一個數據庫更新操做都產生binlog event,event在本地事務成功後,纔會產生
  • 經過binlog訂閱組件,訂閱數據庫變動,訂閱到binlog event,說明執行了本地事務信息,能夠放心根據event解析相應信息,發送到mq便可

kafka的事務消息

kafka producer支持兩種模式:冪等生產者和事務生產者。

  • 冪等生產者:將kafka交付語意從at least once增強到exactly once,生產者重試將不會引入重複
  • 事務生產者:容許應用程序以原子方式同時發送消息到多個主題和分區

kafka相似於數據庫事務的原子性,能夠在kafka以前加個代理,由代理暫存事務消息,條件知足後,再發送到目標topic供消費者消費。

相關文章
相關標籤/搜索