深度剖析如何實現事務消息

這是一篇從去年寫到今年的文章,但願你們會喜歡spring

1.背景

分佈式事務一直是一個老生常談的一個話題,在個人公衆號下面下面已經寫過不少篇分佈式事務相關的文章了,可是依舊沒有將其徹底剖析。在以前的文章中我也屢次提到咱們可使用消息隊列來實現咱們的分佈式事務,可是大多都是一筆帶過,不少讀者都對這一塊產生了不少疑問,但願讀完這篇文章能讓你理解如何用消息隊列實現分佈式事務。數據庫

固然首先要回顧一下咱們的一些基本概念:bash

CAP

CAP定理,又被叫做布魯爾定理。對於設計分佈式系統來講(不只僅是分佈式事務)的架構師來講,CAP就是你的入門理論。網絡

  • C (一致性):對某個指定的客戶端來講,讀操做能返回最新的寫操做。對於數據分佈在不一樣節點上的數據上來講,若是在某個節點更新了數據,那麼在其餘節點若是都能讀取到這個最新的數據,那麼就稱爲強一致,若是有某個節點沒有讀取到,那就是分佈式不一致。
  • A (可用性):非故障的節點在合理的時間內返回合理的響應(不是錯誤和超時的響應)。可用性的兩個關鍵一個是合理的時間,一個是合理的響應。合理的時間指的是請求不能無限被阻塞,應該在合理的時間給出返回。合理的響應指的是系統應該明確返回結果而且結果是正確的,這裏的正確指的是好比應該返回50,而不是返回40。
  • P (分區容錯性):當出現網絡分區後,系統可以繼續工做。打個比方,這裏個集羣有多臺機器,有臺機器網絡出現了問題,可是這個集羣仍然能夠正常工做。

熟悉CAP的人都知道,三者不能共有,若是感興趣能夠搜索CAP的證實,在分佈式系統中,網絡沒法100%可靠,分區實際上是一個必然現象,若是咱們選擇了CA而放棄了P,那麼當發生分區現象時,爲了保證一致性,這個時候必須拒絕請求,可是A又不容許,因此分佈式系統理論上不可能選擇CA架構,只能選擇CP或者AP架構。架構

對於CP來講,放棄可用性,追求一致性和分區容錯性,咱們的zookeeper其實就是追求的強一致。分佈式

對於AP來講,放棄一致性(這裏說的一致性是強一致性),追求分區容錯性和可用性,這是不少分佈式系統設計時的選擇,後面的BASE也是根據AP來擴展。spa

順便一提,CAP理論中是忽略網絡延遲,也就是當事務提交時,從節點A複製到節點B,可是在現實中這個是明顯不可能的,因此總會有必定的時間是不一致。同時CAP中選擇兩個,好比你選擇了CP,並非叫你放棄A。由於P出現的機率實在是過小了,大部分的時間你仍然須要保證CA。就算分區出現了你也要爲後來的A作準備,好比經過一些日誌的手段,是其餘機器回覆至可用。線程

BASE

BASE 是 Basically Available(基本可用)、Soft state(軟狀態)和 Eventually consistent (最終一致性)三個短語的縮寫。是對CAP中AP的一個擴展設計

基本可用:分佈式系統在出現故障時,容許損失部分可用功能,保證核心功能可用。 軟狀態:容許系統中存在中間狀態,這個狀態不影響系統可用性,這裏指的是CAP中的不一致。 最終一致:最終一致是指通過一段時間後,全部節點數據都將會達到一致。日誌

BASE解決了CAP中理論沒有網絡延遲,在BASE中用軟狀態和最終一致,保證了延遲後的一致性。BASE和 ACID 是相反的,它徹底不一樣於ACID的強一致性模型,而是經過犧牲強一致性來得到可用性,並容許數據在一段時間內是不一致的,但最終達到一致狀態。

事務消息

咱們的全部事務消息均可以看做是BASE模型的實現。在業界中有事務消息功能比較有表明性的就是阿里開源的RocketMQ和去哪兒開源的QMQ,他們兩個消息隊列都實現了事務消息功能,可是實現的方式卻各有不一樣,接下來也會分別剖析這兩個消息隊列是如何實現事務消息。

2. RocketMQ-事務消息

RocketMQ事務消息究竟是怎麼一回事呢?

基本流程以下: 第一階段Prepared消息,會拿到消息的地址。 第二階段執行本地事務。 第三階段經過第一階段拿到的地址去訪問消息,並修改狀態。消息接受者就能使用這個消息。 若是確認消息失敗,在RocketMq Broker中提供了定時掃描沒有更新狀態的消息,若是有消息沒有獲得確認,會向消息發送者發送消息,來判斷是否提交,在rocketmq中是以listener的形式給發送者,用來處理。

若是確認消息失敗,在RocketMq Broker中提供了定時掃描沒有更新狀態的消息,若是有消息沒有獲得確認,會向消息發送者發送消息,來判斷是否提交,在rocketmq中是以listener的形式給發送者,用來處理。

若是消費超時,則須要一直重試,消息接收端須要保證冪等。若是消息消費失敗,這個就須要人工進行處理,由於這個機率較低,若是爲了這種小几率時間而設計這個複雜的流程反而得不償失

這個圖你們想必再其餘地方已經看見過不少次了,不少時候從看這個圖只能只知其一;不知其二,那接下來看看代碼是如何實現的吧。

2.1 使用事務消息

在RocketMQ的事務消息中有個很重要的監聽器叫TransactionListener,咱們須要實現他

其中有兩個方法:

  • executeLocalTransaction:顧名思義執行咱們的本地事務方法,通常來講咱們的本地事務方法是由上層的業務順序推動調用,可是在rocketMQ的事務消息中是須要由Listener來進行驅動,若是要使用RocketMQ的事務消息須要對咱們的業務進行必定的改造。而且這裏還須要注意的是,咱們在事務中還須要保存消息的事務ID和當前事務的對應關係。

  • checkLocalTransaction:根據咱們以前的事務ID來檢查咱們的本地事務狀態,這裏的狀態有三種: 事務消息共有三種狀態,提交狀態、回滾狀態、中間狀態:

    • TransactionStatus.CommitTransaction: 提交事務,它容許消費者消費此消息。
    • TransactionStatus.RollbackTransaction: 回滾事務,它表明該消息將被刪除,不容許被消費。
    • TransactionStatus.Unknown: 中間狀態,它表明須要檢查消息隊列來肯定狀態。返回這個狀態的時候RocketMQ會進行重試檢查,爲了防止頻繁檢查,默認將單個消息的檢查次數限制爲15 次。

對於咱們的消息發送有以下代碼:

咱們發如今代碼中咱們將咱們以前的listener以及一個線程池來和咱們的producer進行綁定,這裏線程池的做用是咱們checkLocalTransaction所使用的線程池。

2.2 實現原理

2.2.1 客戶端

這裏的代碼比較簡單,主要分下面幾個步驟

  • Step 1: 先發送消息至Broker.
  • Step 2: 根據發送的結果,判斷是否執行本地事務,若是發送成功,則執行本地事務。
  • Step 3: 記錄本地事務狀態,這裏的狀態也就是上面咱們所講的提交事務,回滾事務,中間狀態三個狀態。
  • Step 4: 結束事務,根據本地事務狀態決定是提交或者回滾。

對於checkLocalTransaction:

在RocketMQ中會接收RocketMQ-Broker發送的 CHECK_TRANSACTION_STATE請求,來執行檢查本地事務狀態。

2.3.1 服務端

在Broker上會對事務消息進行特殊判斷:

若是是事務消息那麼就須要走prepareMessage這個邏輯,prepareMessage這個邏輯以下:

主要是將當前消息的topic替換成RMQ_SYS_TRANS_HALF_TOPIC。咱們的一階段發送半消息到這裏就完成了,接下來就是Broker處理咱們事務的commit或者rollback:

圖中紅色方框表示咱們的核心步驟,對於commit的一共有三步:

  • 獲取須要commit的半消息
  • 將消息發送到原來的topic
  • 刪除半消息

對於rollback一共有兩步:

  • 獲取須要rollback的半消息
  • 刪除半消息

對於獲取消息這個比較簡單,經過記錄的offset直接查詢就好,對於將消息發送到原來的topic邏輯基本上能夠複用,這裏要重點討論的是如何刪除半消息,咱們都知道RocketMQ是順序寫入,咱們不可能去真正的刪除消息,那麼就只能依靠一些其餘的途徑,咱們能夠想到消息消費了以後,只要offset不重置,這個消息就不會再被消費,那麼其實就實現了刪除的功能。RocketMQ也是經過這樣的思路,本身實現了一個消費者,去消費RMQ_SYS_TRANS_HALF_TOPIC這個Topic,若是消息須要刪除的話消費了以後就不須要作其餘操做,若是不須要刪除的話,消費了以後又會從新投遞。

那其實核心就在於怎麼去記錄半消息是否應該刪除呢?對於這個問題RocketMQ採用了新的TopicRMQ_SYS_TRANS_OP_HALF_TOPIC來保存半消息是否刪除,其實在上面的刪除半消息的流程中其實也是對RMQ_SYS_TRANS_OP_HALF_TOPIC投遞了一個op_message,而後由後臺任務去進行操做。

整個流程原理圖以下面所示:

  • Step1: 發送事務消息,這裏也叫作halfMessage,會將Topic替換爲HalfMessage的Topic。
  • Step2: 發送commit或者rollback,若是是commit這裏會查詢出以前的消息,而後將消息復原成原Topic,而且發送一個OpMessage用於記錄當前消息能夠刪除。若是是rollback這裏會直接發送一個OpMessage刪除。
  • Step3: 在Broker有個處理事務消息的定時任務,定時對比halfMessage和OpMessage,若是有OpMessage且狀態爲刪除,那麼該條消息一定commit或者rollback,因此就能夠刪除這條消息。
  • Step4: 若是事務超時(默認是6s),尚未opMessage,那麼頗有可能commit信息丟了,這裏會去反查咱們的Producer本地事務狀態。
  • Step5: 根據查詢出來的信息作Step2。

2.3 小結

上面已經講了如何使用RocketMQ的事務消息和實現原理,想必你們已經對RocketMQ事務消息有本身的認識了。可是RocketMQ的事務消息目前在個人一些業務實戰中是歷來沒有使用過的,主要緣由有幾個方面:

  • 改形成本大,好比一個下單的操做,建立訂單的本地事務通常來講是同步進行的,建立以後會獲取到訂單ID,可是在RocketMQ中這個本地事務變成了在Listener裏面的操做了,那麼就不能經過返回參數來進行,只能經過一些其餘方法來完成這個業務邏輯,好比ThreadLocal等等。
  • 須要記錄TransactionId和本地事務狀態的關係
  • 只支持單個事務消息,若是我建立訂單須要發送10種消息,若是都想保持事務一致,那麼RocketMQ是不支持的。

綜上所述,RocketMQ的事務消息在我看來的確屬於比較雞肋,很難去適應於老業務。那麼怎麼去接下來說一下QMQ的事務消息的解決方案,看看這種方案可否解決咱們所說的這種問題呢?

3. QMQ事務消息

QMQ的事務消息沒有RocketMQ那麼的複雜,對於消息中間件的自己改造是很小的,其依賴了數據庫自身的本地事務,好比一個建立訂單,須要發送兩種消息,分別是A和B,那麼有以下的僞代碼:

begin transaction;
createOrder();
commit transaction;

sendMessageA();
snedMessageB();
複製代碼

這個時候咱們發現消息A和消息B都在事務以外,其一致性得不到保證,那麼其實咱們發送消息的時候不必定要真正的和消息中間件打交道,咱們能夠作一個本地的存儲,保存咱們的消息:

begin transaction;
createOrder();
saveMessageA();
saveMessageB();
commit transaction;
// 發送消息
sendMessageA();
snedMessageB();

複製代碼

能夠看見其實咱們只是增長兩個保存消息的操做,那麼咱們是如何保證一致性呢,若是發送MessageA的時候掛了,那麼咱們就能夠經過定時任務去拉去咱們數據庫中保存的並無發送的消息,而後再次進行發送。

其實這種方法一樣的能夠擴展至其餘的消息隊列,由於對於消息中間件自己是沒有入侵的,若是RocketMQ或者Kafka也想使用這種方法來保證事務消息,也是能夠的。

咱們來看看這種方法可否解決RocketMQ事務消息帶來的問題呢?

  • 改形成本,只須要改造一次Client,在QMQ中重寫了spring的TransactionSynchronization,能夠直接把代碼簡化成以下面所示:
begin transaction;
createOrder();
sendMessageA();
snedMessageB();
commit transaction;

複製代碼

這裏的send其實內部邏輯是saveMessage,在commit以後會自動進行發送,而且後臺有定時任務會補償發送。

  • 不須要額外作transactionId和message的綁定
  • 支持發送多個事務消息

RocketMQ事務消息帶來的問題基本能夠解決,可是其一樣也有缺點,由於其引入了額外的數據庫寫,若是事務消息較多,那麼就會多出不少寫數據庫的操做,對於響應時間比較敏感的服務須要仔細考慮

4.總結

介紹了兩種事務消息,對於我我的而言,QMQ實現的方案能更加適應於大多數業務。可是這裏要注意事務消息並非全部的分佈式一致性都能使用,事務消息使用的場景只能是發出這個消息就能表明這個操做成功的場景,什麼意思呢?舉個例子,好比咱們支付的時候會扣積分,扣券等等,若是我發一個扣積分的消息能表明必定成功嗎?這個確定是不行的,由於用戶的積分可能不夠,就會致使扣除失敗。若是是發送一個贈送積分的消息那麼就能夠表明成功,由於贈送積分是屬於加法,並無太多的限制。

若是發現事務消息不能很好的知足的知足業務場景,那麼你就能夠考慮其餘的一些事務策略,好比TCC,saga等,這些在我以前的文章都有講述。

若是你們以爲這篇文章對你有幫助,你的關注和轉發是對我最大的支持,O(∩_∩)O:

相關文章
相關標籤/搜索