ebay方案:https://queue.acm.org/detail.cfm?id=1394128數據庫
爲解決producer端消息發送和本地事務執行原子性問題,將須要分佈式處理的任務經過消息日誌方式存儲到一個地方,在本地事務完成以前,這個消息對於消費者是不可見的,本地事務執行成功以後,消費者纔會看到這個消息並進行消費。服務器
消息日誌能夠存儲到本地文本,數據庫或消息隊列,經過業務規則自動或者人工方式發起重試。異步
人工重試多用於支付場景,經過對帳系統對過後問題進行處理。
分佈式
對於本地消息隊列來講,把大事務轉變成小事務,舉個例子:線程
本地事務實現設計
能夠將消息放到一個地方存起來,好比在數據庫中創建一個表,將消息放入這個表,稱之爲本地事務,這個表中有個state字段表示消息狀態,在預發送消息階段,標記成unkonwn狀態。3d
以後根據本地事務執行結果,修改state,執行成功設置爲local_commit,執行失敗執行local_rollback。代理
同時能夠創建一個異步現場執行兜底,不斷從這個表中查詢狀態爲local_commit的消息,將其發送到mq中。日誌
若是本地消息狀態修改失敗,那麼一個消息可能一直處於unkonwn狀態,而異步現成只會發送那些local_commit的消息到mq中,這樣一些消息會一直被忽略,就產生了消息丟失。code
通常有三種解決方案:
一:擴大事務邊界
將預發送消息,執行本地事務,修改本地事務表狀態三個操做,合併到一個事務裏面。第一步預發送消息以前就開啓事務,在第三步執行結束以後提交或回滾事務,,這樣經過事務保證了本地消息表的消息記錄,和操做產生記錄老是成功或者失敗。
二:合併事務狀態
能夠合併事務狀態,直接和正常的數據庫操做合併到一個事務中,寫入到數據庫直接就是local_commit,以後異步現成發送邏輯不變。
三:對prepared狀態消息進行檢查
簡單的操做能夠直接執行一次DB事務就能夠了,若是複雜的一些場景,好比A業務發起方除了須要操做本地數據庫,還須要進行RPC調用查詢其餘業務B,以獲取一些mq消息須要的信息。這樣可能A須要先將消息保存下來,等到B能夠提供消息以後再發送。
這種狀況的策略是,A和B之間約定一個能夠異步處理的時間閾值,讓異步線程除了發送local_commit狀態的消息,還須要對prepared狀態消息進行檢查。依靠設置的時間閾值,在過濾消息時,prepared消息對時間和當前時間必須知足必定的時間閾值,避免和新事務消息的prepared消息狀態混淆。
消息隊列的事務實現相似於本地消息表,只不過是將實現放到了MQ內部。
流程以下:
若是消息確認操做失敗,mq broker會定時掃描沒有更新狀態的消息,若是有消息沒有獲得確認,會向消息發送者發送消息,判斷是否提交了。 若是消息消費超時了,須要一直重試,消息接收端須要保證冪等,若是消息消費失敗,須要人工進行處理,由於機率較低,設計複雜的流程反而得不償失。
消息隊列通常有事務處理方案,能夠解決producer發送消息和本地事務執行的原子性操做。 MQ的方案通常也是將消息找個地方存起來,RocketMQ將消息存放到內部主題中。爲了支持事務,RocketMQ引入了Half Topic及Operation Topic兩個內部隊列來存儲事務消息推動狀態。
RocketMQ中事務消息發送流程以下:
事務生產者預發送消息
經過TransactionMQProducer發送事務消息,這個producer在一條普通的message上加一些數據,表示這個是一條預發送的事務消息。broker在發現這是一條事務消息的時候,將其放到half topic中。
執行本地事務
發送prepare消息以後,須要執行本地事務,須要實現RocketMQ提供的一個TransactionListener 接口方法完成。
這兩個方法返回一個表示本地事務消息的執行狀態LocalTransactionState,事務生產者會將其上報給broker,狀態以下:
public enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW, }
本地事務狀態處理
生產者拿到狀態後上報broker,broker在處理時,會根據狀態進行處理。
若是是commit/rollback狀態: brokder會把收到的事務消息狀態記錄在內部的operation topci中,消息體中是prepare message對應在half topci中的offset。
上圖能夠觀察到,一些異常狀況下,可能上報事務消息狀態失敗,所以operate topci中沒有記錄,二者之間的差值通常就是unkown爲確認中間狀態的消息,須要進行特殊處理。
unknow狀態消息
若是是unknow狀態消息,說明存在不肯定的事務狀態,broker須要主動詢問客戶端producer。
出現unknow狀態通常因爲如下緣由形成:
因爲unknown中間狀態的消息,不會提交到operation topic中,所以half topci和operation topic這兩個內部主題中,服務端經過對比兩個主題的差值來找到未被提交的超時任務,進行回查。 因此業務方須要提供一個方法讓rocketmq來回調,TransactionListener 中的checkLocalTransaction 就是用於回查。rocketmq會把以前發送的消息看成參數入參,業務實現根據消息內容能夠反查業務信息,來肯定狀態。 broker主動輪訓客戶端producer事務狀態,依賴於broker和producer端的雙向通訊能力來完成的,broker會主動給客戶端producer發請求。
Saga事務是將長事務拆分紅多個本地短事務,有Saga協調器協調,若是正常結束則算完成,某個步驟失敗,則根據相反順序調用一次補償操做。
RocketMQ事務消息能夠解決事務的一致性問題,事務發起方須要關注本地事務執行及實現回查接口進行事務狀態判斷。
RocketMQ事務消息處理的限制:
訂閱數據庫binlog
爲避免對於業務的入侵,能夠採用binlog實現可靠性發送:
kafka的事務消息
kafka producer支持兩種模式:冪等生產者和事務生產者。
kafka相似於數據庫事務的原子性,能夠在kafka以前加個代理,由代理暫存事務消息,條件知足後,再發送到目標topic供消費者消費。