可靠消息最終一致性方案是指當事務發起方執行完成本地事務後併發出一條消息,事務參與方(消息消費者)必定可以接收消息並處理事務成功,此方案強調的是隻要消息發給事務參與方最終事務要達到一致。spring
此方案是利用消息中間件完成,以下圖:sql
事務發起方(消息生產方)將消息發給消息中間件,事務參與方從消息中間件接收消息,事務發起方和消息中間件之間,事務參與方(消息消費方)和消息中間件之間都是經過網絡通訊,因爲網絡通訊的不肯定性會致使分佈式事務問題。數據庫
所以可靠消息最終一致性方案要解決如下幾個問題:apache
本地事務與消息發送的原子性問題即:事務發起方在本地事務執行成功後消息必須發出去,不然就丟棄消息。即實現本地事務和消息發送的原子性,要麼都成功,要麼都失敗。本地事務與消息發送的原子性問題是實現可靠消息最終一致性方案的關鍵問題。json
先來嘗試下這種操做,先發送消息,再操做數據庫:服務器
begin transaction; //1.發送MQ //2.數據庫操做 commit transation;
這種狀況下沒法保證數據庫操做與發送消息的一致性,由於可能發送消息成功,數據庫操做失敗。
你立馬想到第二種方案,先進行數據庫操做,再發送消息:網絡
begin transaction; //1.數據庫操做 //2.發送MQ commit transation;
這種狀況下貌似沒有問題,若是發送MQ消息失敗,就會拋出異常,致使數據庫事務回滾。但若是是超時異常,數據庫回滾,但MQ其實已經正常發送了,一樣會致使不一致。架構
事務參與方必須可以從消息隊列接收到消息,若是接收消息失敗能夠重複接收消息。併發
因爲網絡2的存在,若某一個消費節點超時可是消費成功,此時消息中間件會重複投遞此消息,就致使了消息的重複消費。app
要解決消息重複消費的問題就要實現事務參與方的方法冪等性。
上節討論了可靠消息最終一致性事務方案須要解決的問題,本節討論具體的解決方案。
本地消息表這個方案最初是eBay提出的,此方案的核心是經過本地事務保證數據業務操做和消息的一致性,而後經過定時任務將消息發送至消息中間件,待確認消息發送給消費方成功再將消息刪除。
下面以註冊送積分爲例來講明:
下例共有兩個微服務交互,用戶服務和積分服務,用戶服務負責添加用戶,積分服務負責增長積分。
交互流程以下:
一、用戶註冊
用戶服務在本地事務新增用戶和增長 」積分消息日誌「。(用戶表和消息表經過本地事務保證一致)
下邊是僞代碼
begin transaction; //1.新增用戶 //2.存儲積分消息日誌 commit transation;
這種狀況下,本地數據庫操做與存儲積分消息日誌處於同一個事務中,本地數據庫操做與記錄消息日誌操做具有原子性。
二、定時任務掃描日誌
如何保證將消息發送給消息隊列呢?
通過第一步消息已經寫到消息日誌表中,能夠啓動獨立的線程,定時對消息日誌表中的消息進行掃描併發送至消息中間件,在消息中間件反饋發送成功後刪除該消息日誌,不然等待定時任務下一週期重試。
三、消費消息
如何保證消費者必定能消費到消息呢?
這裏可使用MQ的ack(即消息確認)機制,消費者監聽MQ,若是消費者接收到消息而且業務處理完成後向MQ發送ack(即消息確認),此時說明消費者正常消費消息完成,MQ將再也不向消費者推送消息,不然消費者會不斷重試向消費者來發送消息。
積分服務接收到」增長積分「消息,開始增長積分,積分增長成功後向消息中間件迴應ack,不然消息中間件將重複投遞此消息。
因爲消息會重複投遞,積分服務的」增長積分「功能須要實現冪等性。
RocketMQ 是一個來自阿里巴巴的分佈式消息中間件,於 2012 年開源,並在 2017 年正式成爲 Apache 頂級項目。據瞭解,包括阿里雲上的消息產品以及收購的子公司在內,阿里集團的消息產品全線都運行在 RocketMQ 之上,而且最近幾年的雙十一大促中,RocketMQ 都有搶眼表現。Apache RocketMQ 4.3以後的版本正式支持事務消息,爲分佈式事務實現提供了便利性支持。
RocketMQ 事務消息設計則主要是爲了解決 Producer 端的消息發送與本地事務執行的原子性問題,RocketMQ 的設計中 broker 與 producer 端的雙向通訊能力,使得 broker 天生能夠做爲一個事務協調者存在;而 RocketMQ 自己提供的存儲機制爲事務消息提供了持久化能力;RocketMQ 的高可用機制以及可靠消息設計則爲事務消息在系統發生異常時依然可以保證達成事務的最終一致性。
在RocketMQ 4.3後實現了完整的事務消息,實際上實際上是對本地消息表的一個封裝,將本地消息表移動到了MQ內部,解決 Producer 端的消息發送與本地事務執行的原子性問題。
執行流程以下:
爲方便理解咱們還以註冊送積分的例子來描述 整個流程。
Producer 即MQ發送方,本例中是用戶服務,負責新增用戶。MQ訂閱方即消息消費方,本例中是積分服務,負責新增積分。
一、Producer 發送事務消息
Producer (MQ發送方)發送事務消息至MQ Server,MQ Server將消息狀態標記爲Prepared(預備狀態),注意此時這條消息消費者(MQ訂閱方)是沒法消費到的。本例中,Producer 發送 」增長積分消息「 到MQ Server。
二、MQ Server迴應消息發送成功
MQ Server接收到Producer 發送給的消息則迴應發送成功表示MQ已接收到消息。
三、Producer 執行本地事務
Producer 端執行業務代碼邏輯,經過本地數據庫事務控制。本例中,Producer 執行添加用戶操做。
四、消息投遞
若Producer 本地事務執行成功則自動向MQServer發送commit消息,MQ Server接收到commit消息後將」增長積分消息「 狀態標記爲可消費,此時MQ訂閱方(積分服務)即正常消費消息;
若Producer 本地事務執行失敗則自動向MQServer發送rollback消息,MQ Server接收到rollback消息後 將刪除」增長積分消息「 。
MQ訂閱方(積分服務)消費消息,消費成功則向MQ迴應ack,不然將重複接收消息。這裏ack默認自動迴應,即程序執行正常則自動迴應ack。
五、事務回查
若是執行Producer端本地事務過程當中,執行端掛掉,或者超時,MQ Server將會不停的詢問同組的其餘 Producer 來獲取事務執行狀態,這個過程叫事務回查。MQ Server會根據事務回查結果來決定是否投遞消息。
以上主幹流程已由RocketMQ實現,對用戶側來講,用戶須要分別實現本地事務執行以及本地事務回查方法,所以只需關注本地事務的執行狀態便可。
RoacketMQ提供RocketMQLocalTransactionListener接口:
public interface RocketMQLocalTransactionListener { /** ‐ 發送prepare消息成功此方法被回調,該方法用於執行本地事務 ‐ @param msg 回傳的消息,利用transactionId便可獲取到該消息的惟一Id ‐ @param arg 調用send方法時傳遞的參數,當send時候如有額外的參數能夠傳遞到send方法中,這裏能獲取到 ‐ @return 返回事務狀態,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調 */ RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg); /** ‐ @param msg 經過獲取transactionId來判斷這條消息的本地事務執行狀態 ‐ @return 返回事務狀態,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調 */ RocketMQLocalTransactionState checkLocalTransaction(Message msg); }
發送事務消息:
如下是RocketMQ提供用於發送事務消息的API:
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); //設置TransactionListener實現 producer.setTransactionListener(transactionListener); //發送事務消息 SendResult sendResult = producer.sendMessageInTransaction(msg, null);
本實例經過RocketMQ中間件實現可靠消息最終一致性分佈式事務,模擬兩個帳戶的轉帳交易過程。
兩個帳戶在分別在不一樣的銀行(張三在bank一、李四在bank2),bank一、bank2是兩個微服務。交易過程是,張三給李四轉帳指定金額。
上述交易步驟,張三扣減金額與給bank2發轉帳消息,兩個操做必須是一個總體性的事務。
本示例程序組成部分以下:
數據庫:MySQL-5.7.25,包括bank1和bank2兩個數據庫。
JDK:64位 jdk1.8.0_201
rocketmq 服務端:RocketMQ-4.5.0
rocketmq 客戶端:RocketMQ-Spring-Boot-starter.2.0.2-RELEASE
微服務框架:spring-boot-2.1.三、spring-cloud-Greenwich.RELEASE
微服務及數據庫的關係 :
dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank1 銀行1,操做張三帳戶, 鏈接數據庫bank1
dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank2 銀行2,操做李四帳戶,鏈接數據庫bank2
本示例程序技術架構以下:
交互流程以下:
一、Bank1向MQ Server發送轉帳消息
二、Bank1執行本地事務,扣減金額
三、Bank2接收消息,執行本地事務,添加金額
導入數據庫腳本:資料\sql\bank1.sql、資料\sql\bank2.sql,已經導過不用重複導入。
建立bank1庫,並導入如下表結構和數據(包含張三帳戶)
CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'; DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶主姓名', `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行卡號', `account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '賬戶密碼', `account_balance` double NULL DEFAULT NULL COMMENT '賬戶餘額', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic; INSERT INTO `account_info` VALUES (2, '張三的帳戶', '1', '', 10000);
建立bank2庫,並導入如下表結構和數據(包含李四帳戶)
CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'; CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶主姓名', `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行卡號', `account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '賬戶密碼', `account_balance` double NULL DEFAULT NULL COMMENT '賬戶餘額', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic; INSERT INTO `account_info` VALUES (3, '李四的帳戶', '2', NULL, 0);
在bank一、bank2數據庫中新增de_duplication,交易記錄表(去重表),用於交易冪等控制。
DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` ( `tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
(1)下載RocketMQ服務器
下載地址:http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.5.0/rocketmq-all-4.5.0-bin-release.zip
(2)解壓並啓動
啓動nameserver:
set ROCKETMQ_HOME=[rocketmq服務端解壓路徑] start [rocketmq服務端解壓路徑]/bin/mqnamesrv.cmd
啓動broker:
set ROCKETMQ_HOME=[rocketmq服務端解壓路徑] start [rocketmq服務端解壓路徑]/bin/mqbroker.cmd ‐n 127.0.0.1:9876 autoCreateTopicEnable=true
dtx-txmsg-demo是本方案的測試工程,根據業務需求須要建立兩個dtx-txmsg-demo工程。
(1)導入dtx-txmsg-demo
導入:資料\基礎代碼\dtx-txmsg-demo到父工程dtx下。
兩個測試工程以下:
dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank1 ,操做張三帳戶,鏈接數據庫bank1
dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank2 ,操做李四帳戶,鏈接數據庫bank2
(2)父工程maven依賴說明
在dtx父工程中指定了SpringBoot和SpringCloud版本
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐dependencies</artifactId> <version>2.1.3.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring‐cloud‐dependencies</artifactId> <version>Greenwich.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency>
在dtx-txmsg-demo父工程中指定了rocketmq-spring-boot-starter的版本。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq‐spring‐boot‐starter</artifactId> <version>2.0.2</version> </dependency>
(3)配置rocketMQ
在application-local.propertis中配置rocketMQ nameServer地址及生產組:
rocketmq.producer.group = producer_bank2 rocketmq.name‐server = 127.0.0.1:9876
其它詳細配置見導入的基礎工程。
dtx-txmsg-demo-bank1實現以下功能:
一、張三扣減金額,提交本地事務。
二、向MQ發送轉帳消息。
2)Dao
@Mapper @Component public interface AccountInfoDao { @Update("update account_info set account_balance=account_balance+#{amount} where account_no=# {accountNo}") int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); @Select("select count(1) from de_duplication where tx_no = #{txNo}") int isExistTx(String txNo); @Insert("insert into de_duplication values(#{txNo},now());") int addTx(String txNo); }
3)AccountInfoService
@Service @Slf4j public class AccountInfoServiceImpl implements AccountInfoService { @Resource private RocketMQTemplate rocketMQTemplate; @Autowired private AccountInfoDao accountInfoDao; /** * 更新賬號餘額‐發送消息 * producer向MQ Server發送消息 * * @param accountChangeEvent */ @Override public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) { //構建消息體 JSONObject jsonObject = new JSONObject(); jsonObject.put("accountChange",accountChangeEvent); Message<String> message =MessageBuilder.withPayload(jsonObject.toJSONString()).build(); TransactionSendResult sendResult =rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1", "topic_txmsg", message, null); log.info("send transcation message body={},result= {}",message.getPayload(),sendResult.getSendStatus()); } /** * 更新賬號餘額‐本地事務 * producer發送消息完成後接收到MQ Server的迴應即開始執行本地事務 * * @param accountChangeEvent */ @Transactional @Override public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) { log.info("開始更新本地事務,事務號:{}",accountChangeEvent.getTxNo()); accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmoun t() * ‐1); //爲冪等做準備 accountInfoDao.addTx(accountChangeEvent.getTxNo()); if(accountChangeEvent.getAmount() == 2){ throw new RuntimeException("bank1更新本地事務時拋出異常"); } log.info("結束更新本地事務,事務號:{}",accountChangeEvent.getTxNo()); } }
4)RocketMQLocalTransactionListener
編寫RocketMQLocalTransactionListener接口實現類,實現執行本地事務和事務回查兩個方法。
@Component @Slf4j @RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg_bank1") public class ProducerTxmsgListener implements RocketMQLocalTransactionListener { @Autowired AccountInfoService accountInfoService; @Autowired AccountInfoDao accountInfoDao; //消息發送成功回調此方法,此方法執行本地事務 @Override @Transactional public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { //解析消息內容 try { String jsonString = new String((byte[]) message.getPayload()); JSONObject jsonObject = JSONObject.parseObject(jsonString); AccountChangeEvent accountChangeEvent =JSONObject.parseObject(jsonObject.getString("accountChange"), AccountChangeEvent.class); //扣除金額 accountInfoService.doUpdateAccountBalance(accountChangeEvent); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error("executeLocalTransaction 事務執行失敗",e); e.printStackTrace(); return RocketMQLocalTransactionState.ROLLBACK; } } //此方法檢查事務執行狀態 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { RocketMQLocalTransactionState state; final JSONObject jsonObject = JSON.parseObject(new String((byte[]) message.getPayload())); AccountChangeEvent accountChangeEvent =JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.class); //事務id String txNo = accountChangeEvent.getTxNo(); int isexistTx = accountInfoDao.isExistTx(txNo); log.info("回查事務,事務號: {} 結果: {}", accountChangeEvent.getTxNo(),isexistTx); if(isexistTx>0){ state= RocketMQLocalTransactionState.COMMIT; }else{ state= RocketMQLocalTransactionState.UNKNOWN; } return state; } }
5)Controller
@RestController @Slf4j public class AccountInfoController { @Autowired private AccountInfoService accountInfoService; @GetMapping(value = "/transfer") public String transfer(@RequestParam("accountNo")String accountNo,@RequestParam("amount") Double amount){ String tx_no = UUID.randomUUID().toString(); AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,amount,tx_no); accountInfoService.sendUpdateAccountBalance(accountChangeEvent); return "轉帳成功"; } }
dtx-txmsg-demo-bank2須要實現以下功能:
一、監聽MQ,接收消息。
二、接收到消息增長帳戶金額。
1) Service
注意爲避免消息重複發送,這裏須要實現冪等。
@Service @Slf4j public class AccountInfoServiceImpl implements AccountInfoService { @Autowired AccountInfoDao accountInfoDao; /** * 消費消息,更新本地事務,添加金額 * @param accountChangeEvent */ @Override @Transactional public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) { log.info("bank2更新本地帳號,帳號:{},金額:{}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount()); //冪等校驗 int existTx = accountInfoDao.isExistTx(accountChangeEvent.getTxNo()); if(existTx<=0){ //執行更新 accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmoun t()); //添加事務記錄 accountInfoDao.addTx(accountChangeEvent.getTxNo()); log.info("更新本地事務執行成功,本次事務號: {}", accountChangeEvent.getTxNo()); }else{ log.info("更新本地事務執行失敗,本次事務號: {}", accountChangeEvent.getTxNo()); } } }
2)MQ監聽類
@Component @RocketMQMessageListener(topic = "topic_txmsg",consumerGroup = "consumer_txmsg_group_bank2") @Slf4j public class TxmsgConsumer implements RocketMQListener<String> { @Autowired AccountInfoService accountInfoService; @Override public void onMessage(String s) { log.info("開始消費消息:{}",s); //解析消息爲對象 final JSONObject jsonObject = JSON.parseObject(s); AccountChangeEvent accountChangeEvent =JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.class); //調用service增長帳號金額 accountChangeEvent.setAccountNo("2"); accountInfoService.addAccountInfoBalance(accountChangeEvent); } }
bank1本地事務失敗,則bank1不發送轉帳消息。
bank2接收轉帳消息失敗,會進行重試發送消息。
bank2屢次消費同一個消息,實現冪等。
可靠消息最終一致性就是保證消息從生產方通過消息中間件傳遞到消費方的一致性,本案例使用了RocketMQ做爲消息中間件,RocketMQ主要解決了兩個功能:
一、本地事務與消息發送的原子性問題。
二、事務參與方接收消息的可靠性。
可靠消息最終一致性事務適合執行週期長且實時性要求不高的場景。引入消息機制後,同步的事務操做變爲基於消息執行的異步操做, 避免了分佈式事務中的同步阻塞操做的影響,並實現了兩個服務的解耦。