Dubbo學習系列之八(分佈式事務之MQ方案)

自從小王玩起了微服務,發現微服務果真很強大,好處真是太多,心中暗喜,然而,卻也遇到了分佈式中最棘手的問題:分佈式事務。小王遍訪各路神仙,也無個完美開源解決方案,固然,也有些實際可行的手法,雖不算完美,但也可拿來研究一番,那今天咱們也來講說分佈式事務。前端

    分佈式事務的起源,即因各服務是獨立的,各自使用獨立的DB,那本地事務能夠保證事務式執行,但其餘服務上關聯的事務呢?以前Dubbo學習系列之六(微服務架構實戰)項目中鋪墊的最大bug在於:若是訂單付款中異常,本地訂單數據將會自動回滾,然而庫存服務和物流服務,收到通訊消息後,就執行了,並無同步回滾!這就是典型的分佈式事務。linux

    分佈式事務的解決思路有二:其一,強一致性方案,2PC(兩階段提交)和TCC(try-confirm-cancel)都是對全部相關的事務作同步協調處理,先嚐試準備資源,鎖住數據,若是均可執行,才都開始執行事務,執行後再確認,若是有一個事務失敗,將所有回滾,這個方案缺點太多,執行效率低,回滾代價高,鎖住資源多,對各業務侵入嚴重,必須人工進行各類事件編碼,除了如金融類強調一致性的場景,並不推薦使用。其二,弱一致性(最終一致性)方案,基於可靠性MQ的異步,根據BASE理論以及CAP理論,即造成最終一致性。本地事務先執行,而後發送消息至其餘服務,並確保消費者服務執行相關事務。舉例:只要訂單付款成功,那麼就必定會減掉用戶的帳戶金額。下面就動手實現這個例子吧!git

準備github

Idea201902/JDK11/ZK3.5.5/Gradle5.4.1/RabbitMQ3.7.13/Mysql8.0.11/Lombok0.26/Erlang21.2/postman7.5.0/Redis3.2/Rocket4.5.2sql

難度:新手--戰士--老兵--大師數據庫

目標:1.模擬商城系統,訂單付款後,經過RocketMQ消息機制實現分佈式事務方法  2.使用RabbitMQ延遲隊列實現訂單過時取消apache

步驟編程

1.項目架構及代碼基礎設施,見往期文章,總體不變,只作增量,若有重大修改,會加以說明。網絡

2.Rocket的安裝見文章最後部分後記第2點。架構

3.RocketMQ事務實現邏輯:「兩段提交,回查確認」,首先msg發給Rocket,但msg並沒馬上發送給下游消費者,而是等待發起者的事務執行結果,若是執行無誤,則Rocket纔將消息發送到下游,若是事務執行失敗,則消息返回,下游無感知。

4.先實現第二個目標--使用RabbitMQ延遲隊列實現訂單過時取消,先看一個總體邏輯圖:


 

exchangeA-->queueC是「訂單到物流線」,exchangeA-->queueA-->exchangeB-->queueB是「過時訂單取消線」。

5.訂單到物流線參考往期文章--Dubbo學習系列之六(微服務架構實戰),有微調,過時訂單取消線,看幾個核心點:

隊列屬性設置見前面文章)com.biao.mall.business.impl.DubboOrderServiceImpl中saveOrder(orderBO)方法中的最後添加消息的邏輯,即將訂單放入過時訂單取消線,注意全部訂單都會放入,而後在取消處理邏輯中判斷是否取消,由於不能選擇性從queue取出特定消息:


 

再定義消息消費者,處理訂單消息:com.biao.mall.business.service.DLXMsgConsumer


 

再定義一個訂單取消邏輯:

com.biao.mall.business.impl.DubboOrderServiceImpl中的cancelOrder(String orderId)方法:

在這裏判斷訂單是否須要取消,已經付款的直接返回。須要取消的而後更新訂單狀態,更新庫存:


 

6.測試流程:啓動Redis-->ZK-->Stock-->Business-->Logistics

模擬一個訂單:


 

數據庫會生成一個訂單:


 

隊列中會生成一個消息:


 

一段時間過時後,自動進入DLX處理,DB中能夠看到最後訂單被取消,置爲expired,這個過程當中同時也可觀察到dubbo_stock表,鎖定庫存數變化又回退的過程。


 

7.再來看第一個邏輯實現過程,這個稍微複雜一點:現改變下分析順序,咱們按實際邏輯處理順序來:


 

controller前端傳入後,進入com.biao.mall.business.impl.DubboOrderServiceImpl這個核心邏輯來處理:


 

這裏使用了線程池,充分利用硬件,保證效率。注意這裏必須使用事務型消息生產者:TransactionMQProducer,其綁定了一個事務監聽器:TransactionListenerImpl,這個是我認爲是事務MQ的核心,監聽器能夠根據處理事務結果決定下一步處理,能夠看到msg/orderId/this做爲實參傳入,爲啥這樣設計?有點怪異的感受。看看傳入後的地方或許有答案:com.biao.mall.business.impl.DubboOrderServiceImpl


 

這裏我特地構造了一個用於傳參的構造函數,對應上一圖的實參,答案就是第一個方法executeLocalTransaction是用於處理本地事務的,即業務邏輯要放這裏!並且特別注意,我使用的是orderService中構造一個獨立的payOrderTrans方法,實際上是上一版本中payOrder方法的切割,而後在這裏引入使用,且這裏的arg不是對應形參Object arg!!執行正常即返回對應的狀態常量。


 

第二個方法checkLocalTransaction是用於檢查本地事務執行的結果,即對第一個方法的跟蹤,並提供回調,最終結果返回到DubboOrderServiceImpl中。補充說明下三個狀態,事務執行結果就是這三個之一:


 

UNKNOW 時,程序會自動不斷check狀態,默認15次(可指定)以後報異常,COMMIT表示執行完成,向下遊發送消息,ROLLBACK即回滾,下游無感知;我註釋了三種狀態的check次數。


 

7.再回到DubboOrderServiceImpl,其中的transactionMQProducer經過com.biao.mall.common.conf.RocketTransMqProducerConf注入,

注意這裏必須使用事務型Producer:


 

8.同理,消息consumer注入在com.biao.mall.common.conf.RocketMqConsumerConf,注意選擇的是Push型消費者,而後加入了一個監聽器(裏面包含真正的消費業務處理邏輯),最後是設置訂閱的主題和標籤。


 

9.再進一步,看這個的消費者監聽器實現com.biao.mall.common.component.RocketConsumeMsgListener,這裏有個核心方法consumeMessage,處理消息,業務邏輯我暫時作個消息展現,後續再實現,要特別注意這個的冪等處理,因消息可能會重複消費。我親測,若是開另一個客戶端作消費,就會收到重複消息,分佈式多實例下,非廣播消息是競爭消費,廣播型消息全消費,因此冪等必需要處理!


 

10.回到DubboOrderServiceImpl中,rocketMQService對象實現,兩個方法,一個發消息,一個收消息:


 

11.爲了簡化並完備邏輯,我直接在DubboOrderServiceImpl中作消息消費處理。即//扣款 註釋下。

12. 測試:啓動ZK-->Redis-->Rocket-->stock-->business-->logistics

postman先生成一個未付款訂單:


 

模擬付款:


 

控制檯的輸出:rocketMQ接收,並模擬扣款處理:


 

最後訂單數據,paid爲1,完成!其餘表數據變化就不展現了。


 

13.代碼結構集體照:


 

14.項目代碼地址:其中的day11

https://github.com/xiexiaobiao/dubbo-project.git

                                                     項目覆盤記

1.MQ在微服務中至關之重要,因此出場比較多,這裏我簡要的作了個幾款主流MQ的比較,之因此我這裏同時使用了Rabbit和Rocket,折騰一把,是爲了實際的體會與學習使用之目的,後續文章確定會加入kafka。


 

2.Rocket的安裝要點:A.下載bin-release.zip版本,B.要配置ROCKETMQ_HOME環境變量  C.若是Windows+JDK11環境,要修改runserver.cmd/runbroker.cmd文件,因JDK兼容性問題,以下命令


 

會提示xxx命令沒法識別,修改時直接去掉-XX xxx便可,詳細可參考網絡資源 D.Rocket的網頁管理工具,需本身打包生成jar,項目是https://github.com/apache/rocketmq-externals.git中的console子項目,具體打包步驟參考官網介紹,很詳細,但JDK11時要額外添加依賴包,


 

3.RocketMQ4.5.2+JDK11在window下目前兼容性不好,官方提供的啓動腳本不能正常啓動,因啓動腳本是根據JDK1.8開發,修改啓動腳本,能啓動RocketMQ後,console管理工具啓動也沒法鏈接RocketMQ,從新安裝JDK1.8,一切正常!整了一天未果,但我比較鑽尖,就是要使用JDK11,最後只能曲線救國,在linux上安裝RocketMQ和console,成功!生成console的jar時,如下警告,能夠忽略,我嘗試添加netty最新的依賴,能夠消除該警告。


 

開啓producer,遇到這個錯誤:


 

關閉linux防火牆,或者開相應的端口!linux下安裝RocketMQ方法在此不表,稍微複雜,但值得操做。

4.Windows下啓動Rocket遇到的錯誤示例(JDK11):


 

Windows下啓動Rocket正常的狀態(JDK1.8):


 

linux下正常啓動(JDK11),也要修改啓動腳本,參考個人另外一篇:

linux下JDK11和RocketMQ安裝,啓動命令及成功結果以下:


 

 

5.linux下安裝RocketMQ,詳細見個人另外一篇:linux下JDK11和RocketMQ安裝

6.編程就是「思考十分鐘,編寫一分鐘」,不少設計思惟很重要,好比此項目中的「過時訂單取消線」,若是看官說,沒付款且過時的消息才進入DLX,這個回答就是錯誤的!由於queue沒法跳躍式pop消息。再好比,MQ事務中各業務邏輯的位置是有約定的,就要作轉換設計,業務邏輯如何切割整合,各方的調用等。

往期文章導航:

Dubbo學習系列之七(分佈式訂單ID方案)

Dubbo學習系列之六(微服務架構實戰)

Dubbo學習系列之五(MQ實現微服務間通訊)

Dubbo學習系列之四(MybaitsPlus+Druid使用)

Dubbo學習系列之三(Gradle打包可運行jar)

Dubbo學習系列之二(Nacos作配置中心)

Dubbo學習系列之一(消費者生產者模式搭建)

歡迎關注個人 公衆號,各類技術分析使用實戰:

 

相關文章
相關標籤/搜索