分佈式事務之解決方案(可靠消息最終一致性)

 

5. 分佈式事務解決方案之可靠消息最終一致性

5.1. 什麼是可靠消息最終一致性事務

可靠消息最終一致性方案是指當事務發起執行徹底本地事務後併發出一條消息,事務參與方(消息消費者)必定可以接收消息並處理事務成功,此方案強調的是隻要消息發給事務參與方最終事務要達到一致。
此方案是利用消息中間件完成,以下圖:
事務發起方(消息生產方)將消息發給消息中間件,事務參與方從消息中間件接收消息,事務發起方和消息中間件之間,事務參與方(消息消費方)和消息中間件之間都是經過網絡通訊,因爲網絡通訊的不肯定性致使分佈式事務問題。
在這裏插入圖片描述
所以可靠消息最終一致性方案要解決如下幾個問題 :
一、本地事務與消息發送的原子性問題
本地事務與消息發送的原子性問題即 :事務發起方在本地事務執行成功後消息必須發出去,不然就丟棄消息。即實現本地事務和消息發送的原子性,要麼都成功,要麼都失敗。本地事務與消息發送的原子性問題是實現可靠消息最終一致性方案的關鍵問題。
先來嘗試下這種操做,先發送消息,再操做數據庫 :java

begin transaction;
		// 1.發送MQ
		// 2.數據庫操做
commit transation;

這種狀況下沒法保證數據庫操做與發送消息的一致性,由於可能發送消息成功,數據庫操做失敗。
你立馬想到第二種方案,先進行數據庫操做,再發送消息 :spring

begin transaction;
		// 1.數據庫操做
		// 2.發送MQ
commit transation;

這種狀況下貌似沒有問題,若是發送MQ消息失敗,就會拋出異常,致使數據庫事務回滾。但若是是超時異常,數據庫回滾,但MQ其實已經正常發送來,一樣會致使不一致。
二、事務參與方接收消息的可靠性
事務參與方必須可以從消息隊列接收到消息,若是接收消息失敗能夠重複接收消息。
三、消息重複消費的問題
因爲網絡2的存在,若某一個消費節點超時可是消費成功,此時消息中間件會重複投遞此消息,就致使來消息的重複消費。要解決消息重複消費的問題就要實現事務參與方的方法冪等性。數據庫

5.2. 解決方案

5.2.1. 本地消息表方案

本地消息表這個方案最初是eBay提出的,此方案的核心是經過本地事務保證數據業務操做和消息的一致性,而後經過定時任務將消息發送至消息中間件,待確認消息發送給消費方成功再將消息刪除。
下面以註冊送積分爲例來講明 :
下例共有兩個微服務交互,用戶服務和積分服務,用戶服務負責添加用戶,積分服務負責增長積分。
在這裏插入圖片描述
交互流程以下 :
一、用戶註冊
用戶服務在本地事務新增用戶和增長「積分消息日誌」。(用戶表和消息表經過本地事務保證一致)
下表是僞代碼apache

begin transaction;
		// 1.新增用戶
		// 2.存儲積分消息日誌
commit transation;

這種狀況下,本地數據庫操做與存儲積分消息日誌處於同一事務中,本地數據庫操做與記錄消息日誌操做具有原子性。
二、定時任務掃描日誌
如何保證將消息發送給消息隊列呢?
通過第一步消息已經寫到消息日誌表中,能夠啓動獨立的線程,定時對消息日誌表中的消息進行掃描併發送至消息中間件,在消息中間件反饋發送成功後刪除該消息日誌,不然等待定時任務下一週期重試。
三、消費消息
如何保證消費者必定能消費到消息呢?
這裏可使用MQ的ack(即消息確認)機制,消費者監聽MQ,若是消費者接收到消息而且業務處理完成後向MQ發送ack(即消息確認),此時說明消費者正常消費消息完成,MQ將再也不向消費者推送消息,不然消費者會不斷重試向消費者來發送消息。
積分服務接收到「增長積分」消息,開始增長積分,積分增長成功後消息中間件迴應ack,不然消息中間件將重複投遞此消息。
因爲消息會重複投遞,積分服務的「增長積分」功能須要實現冪等性。json

5.2.2. RocketMQ事務消息方案

RocketMQ是一個來自阿里巴巴的分佈式消息中間件,於2012年開源,並在2017年正式成爲Apache頂級項目。據瞭解,包括阿里雲上的消息產品以及收購的子公司在內,阿里集團的消息產品全線都運行在RocketMQ之上,而且最近幾年的雙十一大促中,RocketMQ都有搶眼表現。Apache RocketMQ 4.3以後的版本正式支持事務消息,爲分佈式事務實現提供來便利性支持。
RocketMQ事務消息設計則主要是爲了解決Producer端的消息發送與本地事務執行的原子性問題,RocketMQ的設計中broker與producer端的雙向通訊能力,使得broker天生能夠做爲一個事務協調者存在;而RocketMQ自己提供的存儲機制爲事務消息提供了持久化能力;RocketMQ的高可用機制以及可靠消息設計則爲事務消息在系統發生異常時依然可以保證達成事務的最終一致性。
在RocketMQ 4.3後實現了完整的事務消息,實際上實際上是對本地消息表的一個封裝,將本地消息表移動到了MQ內部,解決Producer端的消息發送與本地事務執行的原子性問題。
在這裏插入圖片描述
執行流程以下 :
爲方便理解咱們還以註冊送積分的例子來描述整個流程。
Producer即MQ發送方,本例中是用戶服務,負責新增用戶。MQ訂閱方即消息消費方,本例中是積分服務,負責新增積分。
一、Producer發送事務消息
Producer(MQ發送方)發送事務消息至MQ Server,MQ Server將消息狀態標記爲Prepared(預覽狀態),注意此時這條消息消費者(MQ訂閱方)是沒法消費到的。
二、MQ Server迴應消息發送成功
MQ Server接收到Producer發送給的消息則迴應發送成功表示MQ已接收到消息。
三、Producer執行本地事務
Producer端執行業務代碼邏輯,經過本地數據庫事務控制。
本例中,Producer執行添加用戶操做。
四、消息投遞
若Producer本地事務執行成功則自動向MQ Server發送commit消息,MQ Server接收到commit消息後將「增長積分消息」狀態標記爲可消費,此時MQ訂閱方(積分服務)即正常消費消息;
若Producer 本地事務執行失敗則自動向MQ Server發送rollback消息,MQ Server接收到rollback消息後將刪除「增長積分消息」。
MQ訂閱方(積分服務)消費消息,消費成功則向MQ迴應ack,不然將重複接收消息。這裏ack默認自動迴應,即程序執行正常則自動迴應ack。
五、事務回查
若是執行Producer端本地事務過程當中,執行端掛掉,或者超時,MQ Server將會不停的詢問同組的其餘Producer來獲取事務執行狀態,這個過程叫事務回查。MQ Server會根據事務回查結果來決定是否投遞消息。
以上主幹流程已由RocketMQ實現,對用戶則來講,用戶須要分別實現本地事務執行以及本地事務回查方法,所以只需關注本地事務的執行狀態便可。
RocketMQ提供RocketMQLocalTransactionListener接口 :服務器

public interface RocketMQLocalTransactionListener {
	/**
	發送prepare消息成功此方法被回調,該方法用於執行本地事務
	@param msg 回傳的消息,利用transactionId便可獲取到該消息的惟一Id
	@param arg 調用send方法時傳遞的參數,當send時候如有額外的參數能夠傳遞到send方法中,這裏能獲取到
	@return 返回事務狀態,COMMIT :提交 ROLLBACK :回滾 UNKNOW :回調
	*/
	RocketMQLocalTransactionState executeLocalTransaction(Message msg,Object arg);
	/**
	@param msg 經過獲取transactionId來判斷這條消息的本地事務執行狀態
	@return 返回事務狀態,COMMIT :提交 ROLLBACK :回滾 UNKNOW :回調
	*/
	RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}
  • 發送事務消息 :
    如下是RocketMQ提供用於發送事務消息的API :
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
// 設置TransactionListener實現
producer.setTransactionListener(transactionListener);
// 發送事務消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);

5.3. RocketMQ實現可靠消息最終一致性事務

5.3.1. 業務說明

本實例經過RocketMQ中間件實現可靠消息最終一致性分佈式事務,模擬兩個帳戶的轉帳交易過程。
兩個帳戶在分別在不一樣的銀行(張三在bank一、李四在bank2),bank一、bank2是兩個微服務。交易過程是,張三給李四轉帳指定金額。
上述交易步驟,張三扣減金額與給bank2發轉帳消息,兩個操做必須是一個總體性的事務。
在這裏插入圖片描述網絡

5.3.2.程序組成部分

本示例程序組成部分以下: 數據庫:MySQL-5.7.25
包括bank1和bank2兩個數據庫。
JDK:64位 jdk1.8.0_201
rocketmq 服務端:RocketMQ-4.5.0
rocketmq 客戶端:RocketMQ-Spring-Boot-starter.2.0.2-RELEASE 微服務框架:spring-boot-2.1.三、spring-cloud-Greenwich.RELEASE 微服務及數據庫的關係 :
dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank1 銀行1,操做張三帳戶, 鏈接數據庫bank1 dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank2 銀行2,操做李四帳戶,鏈接數據庫bank2
本示例程序技術架構以下 :
在這裏插入圖片描述
交互流程以下 :
一、Bank1向MQ Server發送轉帳消息;
二、Bank1執行本地事務,扣減金額;
三、Bank2接收消息,執行本地事務,添加金額。架構

5.3.3. 建立數據庫

建立bank1庫,並導入如下表結構和數據(包含張三帳戶)併發

CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶 主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行 卡號',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '賬戶密碼',
`account_balance` double NULL DEFAULT NULL COMMENT '賬戶餘額',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (2, '張三的帳戶', '1', '', 10000);

建立bank2庫,並導入如下表結構和數據(包含李四帳戶)app

CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶
主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行
卡號',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'賬戶密碼',
`account_balance` double NULL DEFAULT NULL COMMENT '賬戶餘額', PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (3, '李四的帳戶', '2', NULL, 0);

在bank一、bank2數據庫中新增de_duplication,交易記錄表(去重表),用於交易冪等控制。

DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` (
`tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

5.3.4. 啓動RocketMQ

(1)下載RocketMQ服務器
下載地址 :http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.5.0/rocketmq-all-4.5.0-bin- release.zip
(2)解壓並啓動
啓動nameserver :

set ROCKETMQ_HOME=[rocketmq服務端解壓路徑] 
 start [rocketmq服務端解壓路徑]/bin/mqnamesrv.cmd

啓動broker:

set ROCKETMQ_HOME=[rocketmq服務端解壓路徑]
start [rocketmq服務端解壓路徑]/bin/mqbroker.cmd ‐n 127.0.0.1:9876 autoCreateTopicEnable=true

3.3.5 工程概述

(1)父工程maven依賴說明
在父工程中指定來SpringBoot和SpringCloud版本

<dependency> 
	<groupId>org.springframework.boot</groupId> 
	<artifactId>spring‐boot‐dependencies</artifactId>
	<version>2.1.3.RELEASE</version>
	<type>pom</type>
	<scope>import</scope>
</dependency>
<dependency> 
	<groupId>org.springframework.cloud</groupId> 
	<artifactId>spring‐cloud‐dependencies</artifactId>
	<version>Greenwich.RELEASE</version>
	<type>pom</type>
<scope>import</scope> </dependency>

在dtx-txmsg-demo父工程中指定來rocketmq-spring-boot-starter的版本

<dependency>
	<groupId>org.apache.rocketmq</groupId> 
	<artifactId>rocketmq‐spring‐boot‐starter</artifactId> 
	<version>2.0.2</version>
</dependency>

(2)配置rocketMQ
application-local.properties中配置rocketMQ nameServer地址及生產組 :

rocketmq.producer.group = producer_bank2 
 rocketmq.name‐server = 127.0.0.1:9876

3.3.6 dtx-txmsg-demo-bank1

dtx-txmsg-demo-bank1實現以下功能:
一、張三扣減金額,提交本地事務。
二、向MQ發送轉帳消息。
2)Dao

@Mapper
@Component
public interface AccountInfoDao {
	@Update("update account_info set account_balance=account_balance+#{amount} where account_no=# {accountNo}")
	int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
	@Select("select count(1) from de_duplication where tx_no = #{txNo}") int isExistTx(String txNo);
	@Insert("insert into de_duplication values(#{txNo},now());") int addTx(String txNo);
}

3)AccountInfoService

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
	@Resource
	private RocketMQTemplate rocketMQTemplate;
	@Autowired
	private AccountInfoDao accountInfoDao;
	/** * 更新賬號餘額‐發送消息 * producer向MQ Server發送消息 * * @param accountChangeEvent */
	@Override
	public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
	//構建消息體
	JSONObject jsonObject = new JSONObject(); jsonObject.put("accountChange",accountChangeEvent); Message<String> message =
	MessageBuilder.withPayload(jsonObject.toJSONString()).build(); TransactionSendResult sendResult =
	rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1", "topic_txmsg", message, null);
	log.info("send transcation message body={},result= {}",message.getPayload(),sendResult.getSendStatus());
	}	
	/** * 更新賬號餘額‐本地事務 * producer發送消息完成後接收到MQ Server的迴應即開始執行本地事務 * * @param accountChangeEvent */
	@Transactional
	@Override
	public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
	log.info("開始更新本地事務,事務號:{}",accountChangeEvent.getTxNo()); accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() *1);
		} 
	}
	//爲冪等做準備 accountInfoDao.addTx(accountChangeEvent.getTxNo()); if(accountChangeEvent.getAmount() == 2){
	throw new RuntimeException("bank1更新本地事務時拋出異常"); log.info("結束更新本地事務,事務號:{}",accountChangeEvent.getTxNo());
		}
}

4)RocketMQLocalTransactionListener
編寫RocketMQLocalTransactionListener接口實現類,實現執行本地事務和事務回查兩個方法。

@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg_bank1") public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {
	@Autowired
	AccountInfoService accountInfoService;
	@Autowired
	AccountInfoDao accountInfoDao;
	//消息發送成功回調此方法,此方法執行本地事務
	@Override
	@Transactional
	public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
	//解析消息內容 
	try {
		String jsonString = new String((byte[]) message.getPayload()); 
		JSONObject jsonObject = JSONObject.parseObject(jsonString); AccountChangeEvent accountChangeEvent =
		JSONObject.parseObject(jsonObject.getString("accountChange"), AccountChangeEvent.class); //扣除金額
		accountInfoService.doUpdateAccountBalance(accountChangeEvent);
		return RocketMQLocalTransactionState.COMMIT; 
	} catch (Exception e) {
	log.error("executeLocalTransaction 事務執行失敗",e); e.printStackTrace();
	return RocketMQLocalTransactionState.ROLLBACK;
		} 
	}
	//此方法檢查事務執行狀態
	@Override
	public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
		RocketMQLocalTransactionState state;
		final JSONObject jsonObject = JSON.parseObject(new String((byte[]) message.getPayload()));
		AccountChangeEvent accountChangeEvent = JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.class);
		//事務id
		String txNo = accountChangeEvent.getTxNo();
		int isexistTx = accountInfoDao.isExistTx(txNo);
		log.info("回查事務,事務號: {} 結果: {}", accountChangeEvent.getTxNo(),isexistTx); if(isexistTx>0){
		state= RocketMQLocalTransactionState.COMMIT; 
		}else{
		state= RocketMQLocalTransactionState.UNKNOWN; 
		}
	 	return state;
		} 
}

4)Controller

@RestController
@Slf4j
public class AccountInfoController {
@Autowired
private AccountInfoService accountInfoService;
@GetMapping(value = "/transfer")
public String transfer(@RequestParam("accountNo")String accountNo,@RequestParam("amount") Double amount){
	String tx_no = UUID.randomUUID().toString();
	AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,amount,tx_no);
	accountInfoService.sendUpdateAccountBalance(accountChangeEvent);
	return "轉帳成功"; 
	}
}

3.3.7 dtx-txmsg-demo-bank2

dtx-txmsg-demo-bank2須要實現以下功能 :
一、監聽MQ,接收消息。
二、接收到消息增長帳戶金額。
1)Service
注意爲避免消息重複發送,這裏須要實現冪等。

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
	@Autowired
	AccountInfoDao accountInfoDao;
	/** * 消費消息,更新本地事務,添加金額 * @param accountChangeEvent */
	@Override
	@Transactional
	public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
		log.info("bank2更新本地帳號,帳號:{},金額: {}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
		//冪等校驗
		int existTx = accountInfoDao.isExistTx(accountChangeEvent.getTxNo());			if(existTx<=0){
		//執行更新 accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
			}
		} 
	}
	//添加事務記錄
	accountInfoDao.addTx(accountChangeEvent.getTxNo()); 
	log.info("更新本地事務執行成功,本次事務號: {}", accountChangeEvent.getTxNo());
	}else{
	log.info("更新本地事務執行失敗,本次事務號: {}", accountChangeEvent.getTxNo());
			}
		} 
}

2)MQ監聽類

@Component
@RocketMQMessageListener(topic = "topic_txmsg",consumerGroup = "consumer_txmsg_group_bank2") 
@Slf4j
public class TxmsgConsumer implements RocketMQListener<String> {
	@Autowired
	AccountInfoService accountInfoService;
	@Override
	public void onMessage(String s) {
		log.info("開始消費消息:{}",s);
		//解析消息爲對象
		final JSONObject jsonObject = JSON.parseObject(s); AccountChangeEvent accountChangeEvent =
		JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.class);
		//調用service增長帳號金額 accountChangeEvent.setAccountNo("2"); accountInfoService.addAccountInfoBalance(accountChangeEvent);
		} 
}

5.3.8 測試場景

  • bank1本地事務失敗,則bank1不發送轉帳消息。
  • bank2接收轉帳消息失敗,會進行重試發送消息。
  • bank2屢次消費同一個消息,實現冪等。

5.4 小結

可靠消息最終一致性就是保證消息從生產方通過消息中間件傳遞到消費方的一致性,本案例使用了RocketMQ做爲消息中間件,RocketMQ主要解決了兩個功能 :一、本地事務與消息發送的原子性問題。二、事務參與方接收消息的可靠性。可靠消息最終一致性事務適合執行週期長且實時性要求不高的場景。引入消息機制後,同步的事務操做變爲基於消息執行的異步操做,避免了分佈式事務中的同步阻塞操做的影響,並實現了兩個服務的解耦。

相關文章
相關標籤/搜索