每一個時代,都不會虧待會學習的人。數據庫
你們好,我是 yes。併發
今天咱們來談一談消息隊列的事務消息,一提及事務相信你們都不陌生,腦海裏蹦出來的就是 ACID。異步
一般咱們理解的事務就是爲了一些更新操做要麼都成功,要麼都失敗,不會有中間狀態的產生,而 ACID 是一個嚴格的事務實現的定義,不過在單體系統時候通常都不會嚴格的遵循 ACID 的約束來實現事務,更別說分佈式系統了。分佈式
分佈式系統每每只能妥協到最終一致性,保證數據最終的完整性和一致性,主要緣由就是實力不容許...由於可用性爲王。ide
並且要保證徹底版的事務實現代價很大,你想一想要維護這麼多系統的數據,不容許有中間狀態數據能夠被讀取,全部的操做必須不可分割,這意味着一個事務的執行是阻塞的,資源是被長時間鎖定的。高併發
在高併發狀況下資源被長時間的佔用,就是致命的傷害,舉一個有味道的例子,如廁高峯期,好了懂得都懂。源碼分析
對了, ACID 是什麼還不太清楚的同窗,趕忙去查一查,這裏我就不展開說了。學習
那說到分佈式事務,常見的有 2PC、TCC 和事務消息,這篇文章重點就是事務消息,不過 2PC 和 TCC 我稍微提一下。spa
2PC 就是二階段提交,分別有協調者和參與者兩個角色,二階段分別是準備階段和提交階段。線程
準備階段就是協調者向各參與者發送準備命令,這個階段參與者除了事務的提交啥都作了,而提交階段就是協調者看看各個參與者準備階段都 o 不 ok,若是有 ok 那麼就向各個參與者發送提交命令,若是有一個不 ok 那麼就發送回滾命令。
這裏的重點就是 2PC 只適用於數據庫層面的事務,什麼意思呢?就是你想在數據庫裏面寫一條數據同時又要上傳一張圖片,這兩個操做 2PC 沒法保證兩個操做知足事務的約束。
並且 2PC 是一種強一致性的分佈式事務,它是同步阻塞的,即在接收到提交或回滾命令以前,全部參與者都是互相等待,特別是執行完準備階段的時候,此時的資源都是鎖定的狀態,假若有一個參與者卡了好久,其餘參與者都得等它,產生長時間資源鎖定狀態下的阻塞。
整體而言效率低,而且存在單點故障問題,協調者是就是那個單點,而且在極端條件下存在數據不一致的風險,例如某個參與者未收到提交命令,此時宕機了,恢復以後數據是回滾的,而其餘參與者其實都已經執行了提交事務的命令了。
TCC 能保證業務層面的事務,也就是說它不只僅是數據庫層面,上面的上傳圖片這種操做它也能作。
TCC 分爲三個階段 try - confirm - cancel,簡單的說就是每一個業務都須要有這三個方法,先都執行 try 方法,這一階段不會作真正的業務操做,只是先佔個坑,什麼意思呢?好比打算加 10 個積分,那先在預添加字段加上這 10 積分,這個時候用戶帳上的積分實際上是沒有增長的。
而後若是都 try 成功了那麼就執行 confirm 方法,你們都來作真正的業務操做,若是有一個 try 失敗了那麼你們都執行 cancel 操做,來撤回剛纔的修改。
能夠看到 TCC 其實對業務的耦合性很大,由於業務上須要作必定的改造才能完成這三個方法,這其實就是 TCC 的缺點,而且 confirm 和 cancel 操做要注意冪等,由於到執行這兩步的時候沒有退路,是務必要完成的,所以須要有重試機制,因此須要保證方法冪等。
事務消息就是今天文章的主角了,它主要是適用於異步更新的場景,而且對數據實時性要求不高的地方。
它的目的是爲了解決消息生產者與消息消費者的數據一致性問題。
好比你點外賣,咱們先選了炸雞加入購物車,又選了瓶可樂,而後下單,付完款這個流程就結束了。
而購物車裏面的數據就很適合用消息通知異步刪除,由於通常而言咱們下完單不會再去點開這個店家的菜單,並且就算點開了購物車裏還有這些菜品也沒有關係,影響不大。
咱們但願的就是下單成功以後購物車的菜品最終會被刪除,因此要點就是下單和發消息這兩個步驟要麼都成功要麼都失敗。
咱們先來看一下 RocketMQ 是如何實現事務消息的。
RocketMQ 的事務消息也能夠被認爲是一個兩階段提交,簡單的說就是在事務開始的時候會先發送一個半消息給 Broker。
半消息的意思就是這個消息此時對 Consumer 是不可見的,並且也不是存在真正要發送的隊列中,而是一個特殊隊列。
發送完半消息以後再執行本地事務,再根據本地事務的執行結果來決定是向 Broker 發送提交消息,仍是發送回滾消息。
此時有人說這一步發送提交或者回滾消息失敗了怎麼辦?
影響不大,Broker 會定時的向 Producer 來反查這個事務是否成功,具體的就是 Producer 須要暴露一個接口,經過這個接口 Broker 能夠得知事務到底有沒有執行成功,沒成功就返回未知,由於有可能事務還在執行,會進行屢次查詢。
若是成功那麼就將半消息恢復到正常要發送的隊列中,這樣消費者就能夠消費這條消息了。
咱們再來簡單的看下如何使用,我根據官網示例代碼簡化了下。
能夠看到使用起來仍是很簡便直觀的,無非就是多加個反查事務結果的方法,而後把本地事務執行的過程寫在 TransationListener 裏面。
至此 RocketMQ 事務消息大體的流程已經清晰了,咱們畫一張總體的流程圖來過一遍,其實到第四步這個消息要麼就是正常的消息,要麼就是拋棄什麼都不存在,此時這個事務消息已經結束它的生命週期了。
而後咱們再從源碼的角度來看看究竟是怎麼作的,首先咱們看下sendMessageInTransaction
方法,方法有點長,不過沒有關係結構仍是很清晰的。
流程也就是咱們上面分析的,將消息塞入一些屬性,標明此時這個消息仍是半消息,而後發送至 Broker,而後執行本地事務,而後將本地事務的執行狀態發送給 Broker ,咱們如今再來看下 Broker 究竟是怎麼處理這個消息的。
在 Broker 的 SendMessageProcessor#sendMessage 中會處理這個半消息請求,由於今天主要分析的是事務消息,因此其餘流程不作分析,我大體的說一下原理。
簡單的說就是 sendMessage 中查到接受來的消息的屬性裏面MessageConst.PROPERTY_TRANSACTION_PREPARED
是 true ,那麼能夠得知這個消息是事務消息,而後再判斷一下這條消息是否超過最大消費次數,是否要延遲,Broker 是否接受事務消息等操做後,將這條消息真正的 topic 和隊列存入屬性中,而後重置消息的 topic 爲RMQ_SYS_TRANS_HALF_TOPIC
,而且隊列是 0 的隊列中,使得消費者沒法讀取這個消息。
以上就是總體處理半消息的流程,咱們來看一下源碼。
就是來了波狸貓換太子,其實延時消息也是這麼實現的,最終將換了皮的消息入盤。
Broker 處理提交或者回滾消息的處理方法是 EndTransactionProcessor#proce***equest
,咱們來看一看它作了什麼操做。
能夠看到,若是是提交事務就是把皮再換回來寫入真正的topic所屬的隊列中,供消費者消費,若是是回滾則是將半消息記錄到一個 half_op 主題下,到時候後臺服務掃描半消息的時候就依據其來判斷這個消息已經處理過了。
那個後臺服務就是 TransactionalMessageCheckService
服務,它會定時的掃描半消息隊列,去請求反查接口看看事務成功了沒,具體執行的就是TransactionalMessageServiceImpl#check
方法。
我大體說一下流程,這一步驟其實涉及到的代碼不少,我就不貼代碼了,有興趣的同窗自行了解。不過我相信用語言也是能說清楚的。
首先取半消息 topic 即RMQ_SYS_TRANS_HALF_TOPIC
下的全部隊列,若是還記得上面內容的話,就知道半消息寫入的隊列是 id 是 0 的這個隊列,而後取出這個隊列對應的 half_op 主題下的隊列,即 RMQ_SYS_TRANS_OP_HALF_TOPIC
主題下的隊列。
這個 half_op 主要是爲了記錄這個事務消息已經被處理過,也就是說已經得知此事務消息是提交的仍是回滾的消息會被記錄在 half_op 中。
而後調用 fillOpRemoveMap
方法,從 half_op 取一批已經處理過的消息來去重,將那些沒有記錄在 half_op 裏面的半消息調用 putBackHalfMsgQueue
又寫入了 commitlog 中,而後發送事務反查請求,這個反查請求也是 oneWay,即不會等待響應。固然此時的半消息隊列的消費 offset 也會推動。
而後producer中的 ClientRemotingProcessor#proce***equest 會處理這個請求,會把任務扔到 TransactionMQProducer 的線程池中進行,最終會調用上面咱們發消息時候定義的 checkLocalTransactionState
方法,而後將事務狀態發送給 Broker,也是用 oneWay 的方式。
看到這裏相信你們會有一些疑問,好比爲何要有個 half_op ,爲何半消息處理了還要再寫入 commitlog 中別急聽我一一道來。
首先 RocketMQ 的設計就是順序追加寫入,因此說不會更改已經入盤的消息,那事務消息又須要更新反查的次數,超過必定反查失敗就斷定事務回滾。
所以每一次要反查的時候就將之前的半消息再入盤一次,而且往前推動消費進度。而 half_op 又會記錄每一次反查的結果,不管是提交仍是回滾都會記錄,所以下一次還循環處處理此半消息的時候,能夠從 half_op 得知此事務已經結束了,所以就被過濾掉不須要處理了。
若是獲得的反查的結果是 UNKNOW,那 half_op 中也不會記錄此結果,所以還能再次反查,而且更新反查次數。
到如今整個流程已經清晰了,我再畫個圖總結一下 Broker 的事務處理流程。
Kafka 的事務消息和 RocketMQ 的事務消息又不同了,RocketMQ 解決的是本地事務的執行和發消息這兩個動做知足事務的約束。
而 Kafka 事務消息則是用在一次事務中須要發送多個消息的狀況,保證多個消息之間的事務約束,即多條消息要麼都發送成功,要麼都發送失敗,就像下面代碼所演示的。
Kafka 的事務基本上是配合其冪等機制來實現 Exactly Once 語義的,因此說 Kafka 的事務消息不是咱們想的那種事務消息,RocketMQ 的纔是。
講到這我就想扯一下了,說到這個 Exactly Once 其實不太清楚的同窗很容易會誤解。
咱們知道消息可靠性有三種,分別是最多一次、剛好一次、最少一次,以前在消息隊列連環問的文章我已經提到了基本上咱們都是用最少一次而後配合消費者端的冪等來實現剛好一次。
消息剛好被消費一次固然咱們全部人追求的,可是以前文章我已經從各方面已經分析過了,基本上難以達到。
而 Kafka 竟說它能實現 Exactly Once?這麼牛啤嗎?這實際上是 Kafka 的一個噱頭,你要說他錯,他還真沒錯,你要說他對可是他實現的 Exactly Once 不是你心中想的那個 Exactly Once。
它的剛好一次只能存在一種場景,就是從 Kafka 做爲消息源,而後作了一番操做以後,再寫入 Kafka 中。
那他是如何實現剛好一次的?就是經過冪等,和咱們在業務上實現的同樣經過一個惟一 Id, 而後記錄下來,若是已經記錄過了就不寫入,這樣來保證剛好一次。
因此說 Kafka 實現的是在特定場景下的剛好一次,不是咱們所想的利用 Kafka 來發送消息,那麼這條消息只會恰巧被消費一次。
這其實和 Redis 說他實現事務了同樣,也不是咱們心想的事務。
因此開源軟件說啥啥特性開發出來了,咱們一味的相信,所以其每每都是殘血的或者在特殊的場景下才能知足,不要被誤導了,不能相信表面上的描述,還得詳細的看看文檔或者源碼。
不過從另外一個角度看也無可厚非,做爲一個開源軟件確定是想更多的人用,我也沒說謊呀,我文檔上寫的很清楚的,這標題也沒騙人吧?
確實,好比你點進震驚xxxx標題的文章,人家也沒騙你啥,他本身確實震驚的呢。
再回來談 Kafka 的事務消息,因此說這個事務消息不是咱們想要的那個事務消息,其實不是今天的主題了,不過我仍是簡單的說一下。
Kafka 的事務有事務協調者角色,事務協調者其實就是 Broker 的一部分。
在開始事務的時候,生產者會向事務協調者發起請求表示事務開啓,事務協調者會將這個消息記錄到特殊的日誌-事務日誌中,而後生產者再發送真正想要發送的消息,這裏 Kafka 和 RocketMQ 處理不同,Kafka 會像對待正常消息同樣處理這些事務消息,由消費端來過濾這個消息。
而後發送完畢以後生產者會向事務協調者發送提交或者回滾請求,由事務協調者來進行兩階段提交,若是是提交那麼會先執行預提交,即把事務的狀態置爲預提交而後寫入事務日誌,而後再向全部事務有關的分區寫入一條相似事務結束的消息,這樣消費端消費到這個消息的時候就知道事務好了,能夠把消息放出來了。
最後協調者會向事務日誌中再記一條事務結束信息,至此 Kafka 事務就完成了,我拿 confluent.io 上的圖來總結一下這個流程。
至此咱們已經知道了 RocketMQ 和 Kakfa 的事務消息全流程,能夠看到 RocketMQ 的事務消息纔是咱們想要的,固然你要是用的流式計算那麼 Kakfa 的事務消息也是你想要的。
須要貼代碼的文章其實很難受,這貼的多很差,貼的少又怕不清晰,真的難,若是以爲文章不錯記得點個在看喲。
掃碼可關注個人公衆號哦~
我是 yes,從一點點到億點點,咱們下篇見。