導讀java
在以前的文章中咱們介紹瞭如何基於RocketMQ搭建生產級消息集羣,以及2PC、3PC和TCC等與分佈式事務相關的基本概念(沒有讀過的讀者詳見👇推薦閱讀)。在這篇文章中咱們將介紹RocketMQ的事務消息相關的內容,並經過一些實踐和你們一塊兒來探索下事務消息如何解決分佈式系統中的分佈式事務問題。git
事務消息原理github
事務消息特性能夠看做是兩階段協議的消息實現方式,用以確保在以消息中間件解耦的分佈式系統中本地事務的執行和消息的發送,能夠以原子的方式進行。spring
舉個例子,以某互聯網公司的用戶餘額充值爲例,由於有充返活動(充值100元贈送20元),優惠比較大,用戶Joe禁不住誘惑用支付寶向本身的餘額帳戶充值了100元,支付成功後Joe的餘額帳戶有了120元錢。數據庫
而該公司的關於用戶餘額充值的系統設計是這樣的:編程
在這個設計流程中,該公司經過自建支付系統完成用戶Joe的支付寶扣款操做,成功後須要更新支付流水的狀態,由於用戶的餘額帳戶系統與支付系統之間經過MQ解耦了,因此支付系統在完成支付流水狀態更新後須要經過發送MQ消息到消息中間件服務,而後用戶餘額系統做爲消費者經過消息消費的方式完成用戶餘額的增長操做。springboot
這裏有個問題:「支付系統如何確保這筆餘額充值消息必定會成功發送到MQ,而且用戶餘額系統必定能處理成功呢」?若是支付系統在完成支付訂單狀態更新後,MQ消息發送失敗或者用戶餘額系統消息處理失敗的話,都會致使Joe支付扣款成功,而本身的餘額帳戶卻沒到帳的狀況發生。服務器
爲了解決這個問題,按照目前的系統設計是須要「支付系統-MQ服務-用戶餘額系統」三者的處理知足數據的一致性要求。例如,若是支付系統感知到消息發送失敗後還能夠進行從新投遞,從而確保支付系統與用戶餘額數據的最終一致性。架構
而上述問題就是事務消息要解決的問題,在具體瞭解RocketMQ提供的事務消息機制以前,咱們先來看下在RocketMQ的早期版本不支持事務消息,或者由於歷史緣由選擇的消息中間件自己就不支持事務消息的狀況下,一些大公司是怎麼解決這個問題的?併發
早期爲了實現基於MQ異步調用的多個服務間,業務邏輯執行要麼一塊兒成功、要麼一塊兒失敗,具有事務特色,一般會採用可靠消息最終一致性方案,來實現分佈式事務。仍是以Joe充值這件事來舉例,可靠消息方案實現過程以下:
在可靠消息最終一致性方案中,爲了實現分佈式事務,須要確保上游服務本地事務的處理與MQ消息的投遞具備原子性,也就是說上游服務本地事務處理成功後要確保消息必定要成功投遞到MQ服務,不然消息就不該該被投遞到MQ服務;一樣,被成功投遞到MQ服務的消息,也必定要被下游服務成功處理,不然就須要從新投遞MQ消息。
爲了實現雙向的原子性,可靠消息服務須要對消息進行狀態標記,與此同時還須要對消息進行狀態檢查,從而實現從新投遞及消息狀態的最終一致性。核心流程說明以下:
一、上游服務(支付系統)如何確保完成自身支付成功狀態更新後消息100%的可以投遞到下游服務(用戶餘額系統)指定的Topic中?
在這個流程中上游服務在進行本地數據庫事務操做前,會先發送一個狀態爲「待確認」的消息至可靠消息服務,而不是直接將消息投遞到MQ服務的指定Topic。可靠消息服務此時會將該消息記錄到自身服務的消息數據庫中(消息狀態爲->待確認),完成後可靠消息服務會回調上游服務表示收到了消息,大家能夠進行本地事務的操做了。
以後上游服務就會開啓本地數據庫事務執行業務邏輯操做,這裏支付系統就會將該筆支付訂單狀態更新爲「已成功」。(注意,這裏只是舉個示例場景,在真正的實踐中通常是不會把支付訂單自己的狀態與業務端回調放在一個事務流程中的,關於這部分的詳細說明咱們在下面的場景說明中再討論)。
若是上游服務本地數據庫事務執行成功,則繼續向可靠消息服務發送消息確認消息,此時可靠消息服務就會正式將消息投遞到MQ服務,而且同時更新消息數據庫中的消息狀態爲「已發送」。(注意,這裏可靠消息服務更新消息狀態與投遞消息至MQ也必須是在一個原子操做中,即消息投遞成功則必定要將消息狀態更新爲「已發送」,因此在編程的細節中,可靠消息服務通常會先更新消息狀態,而後再進行消息投遞,這樣即便消息投遞失敗,也能夠對消息狀態進行回滾->「待確認」,相反若是先進行消息投遞再更新消息狀態,可能就很差控制了)。
相反,若是上游本地數據庫事務執行失敗,則須要向可靠消息服務發送消息刪除消息,可靠消息服務此時就會將消息刪除,這樣就意味着事務在上游消息投遞過程當中就被回滾了,而流程也就此結束了,此時上游服務能夠須要經過業務邏輯的設計進行重發,這個就再也不分佈式事務的討論範疇了。
說到這裏,你們可能會有疑問了!由於在上述描述中,即便上游服務本地數據庫事務執行成功了,可是在發送確認消息至可靠消息服務的過程當中,以及可靠消息服務在投遞消息至MQ服務的過程當中,仍是會存在失敗的風險,這樣的話仍是會致使支付服務更新了狀態,可是用戶餘額系統連消息都沒有收到的狀況發生?
實際上,實現數據一致性是一個複雜的活。在這個方案中可靠消息服務做爲基礎性的服務除了執行正常的邏輯外,還得處理複雜的異常場景。在實現過程當中可靠消息服務須要啓動相應的後臺線程,不斷輪訓消息的狀態,這裏會輪訓消息狀態爲「待確認」的消息,並判斷該消息的狀態的持續時間是否超過了規定的時間,若是超過規定時間的消息還處於「待確認」的狀態,就會觸發上游服務狀態詢問機制。
可靠消息服務就會調用上游服務提供的相關藉口,詢問這筆消息的處理狀況,若是這筆消息在上游服務處理成功,則後臺線程就會繼續觸發上圖中的步驟5,更新消息狀態爲「已發送」並投遞消息至MQ服務;反之若是這筆消息上游服務處理失敗,可靠消息服務則會進行消息刪除。經過這樣以上機制就確保了「上游服務本地事務成功處理+消息成功投遞」處於一個原子操做了。
二、下游服務(用戶餘額系統)如何確保對MQ服務Topic消息的消費100%都能處理成功?
在1的過程當中,確保了上游服務邏輯處理與MQ消息的投遞具有原子性,那麼當消息被成功投遞到了MQ服務的指定Topic後,下游服務如何才能確保消息的消費必定能被成功處理呢?
在正常的流程中,下游服務等待消費Topic的消息並進行自身本地數據庫事務的處理,若是處理成功則會主動通知可靠消息服務,可靠消息服務此時就會將消息的狀態更新爲「已完成」;反之,處理失敗下游服務就沒法再主動向可靠消息服務發送通知消息了。
此時,與消息投遞過程當中的異常邏輯同樣,可靠消息服務也會啓動相應的後臺線程,輪詢一直處於「已發送」狀態的消息,判斷狀態持續時間是否超過了規定時間,若是超時,可靠消息服務就會再次向MQ服務投遞此消息,從而確保消息能被再次消費處理。(注意,也可能出現下游服務處理成功,可是通知消息發送失敗的狀況,因此爲了確保冪等,下游服務也須要在業務邏輯上作好相應的防重處理)。
RocketMQ事務消息機制
在👆面第2小節的內容中,咱們演示了一個自編寫的中間服務+MQ來實現事務消息的示例。可是在現實的工做場景中,開發和維護一套可靠消息服務是一件很耗費資源和成本的事情,實際上,RocketMQ的最新版本(4.3.0+)中已經實現了可靠消息服務的全部功能,而且在保證高併發、高可用、高性能方面作了更爲優秀的架構實現。
從設計邏輯上看RocketMQ所支持的分佈式事務特性與上節中闡述的可靠消息服務基本上是一致的。只是RocketMQ在實現上相比較於可靠消息服務而言作了更爲複雜的設計,而且由於自然與MQ服務自己緊密結合,因此在高可用、可靠性、性能等方面直接繼承了MQ服務自己的架構優點。
下面咱們就結合流程並經過示例代碼的分析來和你們一塊兒理解下利用RocketMQ是如何實現分佈式事務操做的?
在應用場景中分佈式服務經過MQ通訊的過程當中,發送消息的一方咱們稱之爲Producer,接收消費消息的一方咱們稱之爲Consumer。若是Producer自身業務邏輯本地事務執行成功與否但願和消息的發送保持一個原子性(也就是說若是Producer本地事務執行成功,那麼這筆消息就必定要被成功的發送到RocketMQ服務的指定Topic,而且Consumer必定要被消費成功;反之,若是Producer本地事務執行失敗,那麼這筆消息就應該被RocketMQ服務器丟棄)的話,RocketMQ是怎麼作的呢?
一、Producer選擇使用RockerMQ提供的事務消息方法向RocketMQ服務發送事務消息(設置消息屬性TRAN_MSG=TRUE);
二、RocketMQ服務端在收到消息後會判斷消息的屬性是否爲事務消息,若是是普通消息就直接Push給Consumer;若是是事務消息就會對該消息進行特殊處理設置事務ID,並暫時設置該消息對Consumer不可見,以後向Producer返回Pre消息發送狀態(SEND_OK)。
三、以後Producer就會開始執行本地事務邏輯,並設置本地事務處理狀態後向RocketMQ服務器發送該事務消息的確認/回滾消息(COMMIT_MESSAGE/ROLLBACK_MESSAGE)。
四、RocketMQ服務器根據該筆事務消息的本地事務執行狀態決定是否將消息Push給Consumer仍是刪除該消息。
五、以後Consumer就會消費該消息,執行Consumer的本地事務邏輯,若是執行成功則向RocketMQ返回「CONSUME_SUCCESS」;反之出現異常則須要返回「RECONSUME_LATER」,以便RocketMQ再次Push該消息,這一點在實際編程中須要控制好。
正常狀況下以上就是RocketMQ事務消息的基本運行流程了,可是從異常狀況考慮,理論上也是存在Producer遲遲不發送確認或回滾消息的狀況。與可靠消息服務同樣,RocketMQ服務端也會設置後臺線程去掃描消息狀態,以後會調用Producer的本地checkLocalTransaction函數獲取本地事務狀態後繼續進行第3步操做。
相信看到這裏,你們對於RocketMQ的分佈式事務消息的理解應該有了一個相對清晰的概念了,那麼在代碼中如何編寫呢?
在開發中使用RocketMQ的分佈式事務消息Consumer的代碼不須要有什麼特別的變化與普通消息Consumer代碼一致就能夠。
Consumer示例代碼:
public static void main(String[] args) throws InterruptedException, MQClientException { // Instantiate with specified consumer group name. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_PAY_ACCOUNT"); // Specify name server addresses. consumer.setNamesrvAddr("10.211.55.4:9876;10.211.55.5:9876;10.211.55.6:9876"); // Subscribe one more more topics to consume. consumer.subscribe("PAY_ACCOUNT", "*"); // Register callback to execute on arrival of messages fetched from brokers. consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { System.out.println(new String(messageExt.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //Launch the consumer instance. consumer.start(); System.out.printf("Consumer Started.%n"); }
主要的改變是在Producer代碼,咱們須要額外編寫一個實現執行本地事務邏輯,以及檢查本地事務狀態的類。示例代碼以下:
public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }
Producer示例代碼:
public class TransactionProducerTest { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("CID_PAY_ACCOUNT"); producer.setNamesrvAddr("10.211.55.4:9876;10.211.55.5:9876;10.211.55.6:9876"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; try { Map<String, String> paramMap = new HashMap<>(); paramMap.put("type", "6"); paramMap.put("bizOrderId", "15414012438257823"); paramMap.put("payOrderId", "15414012438257823"); paramMap.put("amount", "10"); paramMap.put("userId", "200001"); paramMap.put("tradeType", "charge"); paramMap.put("financeStatus", "0");//財務狀態,應收 paramMap.put("channel", "a");//餘額 paramMap.put("tradeTime", "20190101202022"); paramMap.put("nonce_str", "xkdkskskdksk"); //拼湊消息體 Message msg = new Message("PAY_ACCOUNT", "pre",paramMap.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } Thread.sleep(10*1000); producer.shutdown(); } }
與非事務消息直接調用RocketMQ Client的send方法不一樣,事務消息發送須要設置事務監聽器類,並調用sendMessageInTransaction方法,而這個方法的具體邏輯也就是上述流程中描述的那樣,具體你們能夠看下。
以上代碼只是示例代碼,在實際的項目中咱們是須要進行一些封裝設計的,以便與項目上下文環境集成。例如對於Springboot項目,咱們通常會編寫一個stater工程進行集成。你們感興趣能夠關注下個人github項目,後面我會以真實的項目場景作一些集成示範。
https://github.com/qiaojiang2/springboot-starter
場景說明
目前RocketMQ消息中間件的使用場景比較普遍,對於須要經過MQ進行異步解耦的分佈式應用系統來講,RocketMQ無疑是一個不錯的技術選擇。接下來,咱們就以對數據一致性要求很是高的分佈式支付系統爲例,來看看基於RocketMQ的事務消息適用於哪些特定場景,從而實現支付系統數據的高度一致性。
事實上,支付系統的數據一致性是一個複雜的問題,緣由在於支付流程的各個環節都存在異步的不肯定性,例如支付系統須要跟第三方渠道進行交互,不一樣的支付渠道交互流程存在差別,而且有異步支付結果回調的狀況。
除此之外,支付系統內部自己又是由多個不一樣子系統組成,除核心支付系統外,還有帳務系統、商戶通知系統等等,而核心支付系統自己也會被拆分爲多個不一樣的服務模塊,如風控、路由等用以實現不一樣的功能邏輯。某些場景咱們沒法經過分佈式事務來實現數據一致性,只能經過額外的業務補償手段,如二次輪訓、支付對帳等來實現數據最終一致性。
綜上所述,支付系統是一個複雜的系統,要徹底實現數據的一致性單靠某一種手段是沒法實現的,大部分狀況下咱們能夠經過額外的業務補償邏輯來實現數據最終一致性,只是這樣補償邏輯須要以更多的業務開發邏輯爲代價,而且在時效性上會存在延遲的問題。
舉個例子,支付核心系統支付成功後會更新本身的訂單狀態爲支付成功,整個核心交易流程是一個比較實時同步的場景,若是出現數據不一致,會有額外的補償邏輯如二次支付訂單狀態輪詢、T+1日對帳等用以確保支付狀態數據的最終一致性。可是除了核心支付外,支付成功的結果是須要通知到支付帳務系統、以及業務端系統,而爲了確保性能,通常後續的通知就不會與主流程同樣設計成實時同步,而是經過MQ異步解耦發送消息給獨立的「通知響應模塊」,而「通知響應模塊」此時就能夠經過分佈式事務消息來與支付帳戶系統、業務端等系統實現數據一致性,從而減小須要補償手段處理的範圍,提升系統的數據一致性等級和靈敏度。