最大努力通知也是一種解決分佈式事務的方案,下邊是一個是充值的例子:
交互流程 :
一、帳戶系統調用充值系統接口
二、充值系統完成支付處理向帳戶系統發起充值結果通知
若通知失敗,則充值系統按策略進行重複通知
三、帳戶系統接收到充值結果通知修改充值狀態
四、帳戶系統未接收到通知會主動調用充值系統的接口查詢充值結果
經過上邊的例子咱們總結最大努力通知方案的目標 :
目標 :發起通知方經過必定的機制最大努力將業務處理結果通知到接收方。
具體包括 :
一、有必定的消息重複通知機制。
由於接收通知方可能沒有接收到通知,此時要有必定的機制對消息重複通知。
二、消息校對機制。
若是盡最大努力也沒有通知到接收方,或者接收方消費消息後要再次消費,此時可由接收方主動向通知方查詢消息信息來知足需求。
最大努力通知與可靠消息一致性有什麼不一樣?
一、解決方案思想不一樣
可靠消息一致性,發起通知方須要保證將消息發出去,而且將消息發到接收通知方,消息的可靠性關鍵由發起通知方來保證。
最大努力通知,發起通知方盡最大的努力將業務處理結果通知爲接收通知方,可是可能消息接收不到,此時須要接收通知方主動調用發起通知方的接口查詢業務處理結果,通知的可靠性關鍵在接收通知方。
二、二者的業務應用場景不一樣
可靠消息一致性關注的是交易過程的事務一致,以異步的方式完成交易。
最大努力通知關注的是交易後的通知事務,即將交易結果可靠的通知出去。
三、技術解決方向不一樣
可靠消息一致性要解決消息從發出到接收的一致性,即消息發出而且被接收到。
最大努力通知沒法保證消息從發出到接收的一致性,只提供消息接收的可靠性機制。可靠機制是,最大努力的將消息通知給接收方,當消息沒法被接收方接收時,由接收方主動查詢消費(業務處理結果)。java
經過對最大努力通知的理解,採用MQ的ack機制就能夠實現最大努力通知。
方案1 :
本方案是利用MQ的ack機制由MQ向接收通知方發送通知,流程以下 :
一、發起通知方將通知發給MQ。
使用普通消息機制將通知發給MQ。
注意 :若是消息沒有發出去可由接收通知方主動請求發起通知方查詢業務執行結果。
二、接收通知方監聽MQ。
三、接收通知方接收消息,業務處理完成迴應ack。
四、接收通知方若沒有迴應ack則MQ會重複通知。
MQ會按照間隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知間隔(若是MQ採用rocketMq,在broker中可進行配置),直到達到通知要求的時間窗口上限。
五、接收通知方可經過消息校對接口來校對消息的一致性。
方案2 :
本方案也是利用MQ的ack機制,與方案1不一樣的是應用程序向接收通知方發送通知,以下圖 :
交互流程以下 :
一、發起通知方將通知發給MQ。
使用可靠消息一致方案中的事務消息保證本地事務與消息的原子性,最終將通知先發給MQ。
二、通知程序監聽MQ,接收MQ的消息。
方案1中接收通知方直接監聽MQ,方案2中由通知程序監聽MQ。
通知程序若沒有迴應ack則MQ會重複通知。
三、通知程序經過互聯網接口協議(如http、webservice)調用接收通知方案接口,完成通知。
通知程序調用接收通知方案接口成功就表示通知成功,即消費MQ消息成功,MQ將再也不向通知程序投遞通知消息。
四、接收通知方可經過消息校對接口來校對消息的一致性。
方案1和方案2的不一樣點 :
一、方案1中接收通知方與MQ接口,即接收通知方案監聽MQ,此方案主要應用與內部應用之間的通知。
二、方案2中由通知程序與MQ接口,通知程序監聽MQ,收到MQ的消息後由通知程序經過互聯網接口協議調用接收通知方。此方案主要應用於外部應用之間的通知,例如支付寶、微信的支付結果通知。web
本實例經過RocketMq中間件實現最大努力通知型分佈式事務,模擬充值過程。
本案例有帳戶系統和充值系統兩個微服務,其中帳戶系統的數據庫是bank1數據庫,其中有張三帳戶。充值系統的數據庫使用bank1_pay數據庫,記錄了帳戶的充值記錄。
業務流程以下圖 :
交互流程以下 :
一、用戶請求充值系統進行充值。
二、充值系統完成充值將充值結果發給MQ。
三、帳戶系統監聽MQ,接收充值結果通知,若是接收不到消息,MQ會重複發送通知。接收到充值結果通知帳戶系統增長充值金額。
四、帳戶系統也能夠主動查詢充值系統的充值結果查詢接口,增長金額。spring
本示例程序組成部分以下 :
數據庫:MySQL-5.7.25
包括bank1和bank1_pay兩個數據庫。
JDK:64位 jdk1.8.0_201
rocketmq 服務端:RocketMQ-4.5.0
rocketmq 客戶端:RocketMQ-Spring-Boot-starter.2.0.2-RELEASE 微服務框架:spring-boot-2.1.三、spring-cloud-Greenwich.RELEASE
微服務及數據庫的關係 :
dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-bank1 銀行1,操做張三帳戶, 鏈接數據庫bank1 dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-pay 銀行2,操做充值記錄,鏈接數據庫bank1_pay
交互流程以下 :
一、用戶請求充值系統進行充值。
二、充值系統完成充值將充值結果發給MQ。
三、帳戶系統監聽MQ,接收充值結果通知,若是接收不到消息,MQ會重複發送通知。接收到充值結果通知帳戶系統增長充值金額。
四、帳戶系統也能夠主動查詢充值系統的充值結果查詢接口,增長金額。數據庫
建立bank1庫,並導入如下表結構和數據(包含張三帳戶)apache
CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'; DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶 主姓名', `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行卡號', `account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '賬戶密碼', `account_balance` double NULL DEFAULT NULL COMMENT '賬戶餘額', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic; INSERT INTO `account_info` VALUES (2, '張三的帳戶', '1', '', 10000); DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` ( `tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
建立bank1_pay庫,並導入如下表結構:微信
CREATE DATABASE `bank1_pay` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'; CREATE TABLE `account_pay` ( `id` varchar(64) COLLATE utf8_bin NOT NULL, `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帳號', `pay_amount` double NULL DEFAULT NULL COMMENT '充值餘額', `result` varchar(20) COLLATE utf8_bin DEFAULT NULL COMMENT '充值結果:success,fail', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
rocketMQ啓動方式與RocketMQ實現可靠消息最終一致性事務中徹底一致併發
discover-server是服務註冊中心,測試工程將本身註冊至discover-server。app
(1)父工程maven依賴說明
在dtx父工程中指定了SpringBoot和SpringCloud版本框架
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐dependencies</artifactId> <version>2.1.3.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring‐cloud‐dependencies</artifactId> <version>Greenwich.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency>
在dtx-notifymsg-demo父工程中指定了rocketmq-spring-boot-starter的版本。dom
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq‐spring‐boot‐starter</artifactId> <version>2.0.2</version> </dependency>
(2)配置rocketMQ
在application-local.properties中配置rocketMQ nameServer地址及生產組 :
rocketmq.producer.group = producer_bank2 rocketmq.name-server = 127.0.0.1:9876
dtx-notifydemo-pay實現以下功能 :
一、充值接口;
二、充值完成要通知;
三、充值結果查詢接口。
(2)Dao
@Mapper @Component public interface AccountPayDao { @Insert("insert into account_pay(id,account_no,pay_amount,result) values(#{id},# {accountNo},#{payAmount},#{result})") int insertAccountPay(@Param("id") String id,@Param("accountNo") String accountNo, @Param("payAmount") Double pay_amount,@Param("result") String result); @Select("select id,account_no accountNo,pay_amount payAmount,result from account_pay where id=#{txNo}") AccountPay findByIdTxNo(@Param("txNo") String txNo); }
(3)Service
@Service @Slf4j public class AccountPayServiceImpl implements AccountPayService{ @Autowired RocketMQTemplate rocketMQTemplate; @Autowired AccountPayDao accountPayDao; @Transactional @Override public AccountPay insertAccountPay(AccountPay accountPay) { int result = accountPayDao.insertAccountPay(accountPay.getId(), accountPay.getAccountNo(), accountPay.getPayAmount(), "success"); if(result>0){ //發送通知 rocketMQTemplate.convertAndSend("topic_notifymsg",accountPay); return accountPay; } return null; } @Override public AccountPay getAccountPay(String txNo) { AccountPay accountPay = accountPayDao.findByIdTxNo(txNo); return accountPay; } }
(4)Controller
@RestController public class AccountPayController { @Autowired AccountPayService accountPayService; //充值 @GetMapping(value = "/paydo") public AccountPay pay(AccountPay accountPay){ //事務號 String txNo = UUID.randomUUID().toString(); accountPay.setId(txNo); return accountPayService.insertAccountPay(accountPay); } //查詢充值結果 @GetMapping(value = "/payresult/{txNo}") public AccountPay payresult(@PathVariable("txNo") String txNo){ return accountPayService.getAccountPay(txNo); } }
dtx-notifydemo-bank1實現以下功能 :
一、監聽MQ,接收充值結果,根據充值結果完成帳戶金額修改。
二、主動查詢充值系統,根據充值結果完成帳戶金額修改。
1)Dao
@Mapper @Component public interface AccountInfoDao { //修改帳戶金額 @Update("update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo}") int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); //查詢冪等記錄,用於冪等控制 @Select("select count(1) from de_duplication where tx_no = #{txNo}") int isExistTx(String txNo); //添加事務記錄,用於冪等控制 @Insert("insert into de_duplication values(#{txNo},now());") int addTx(String txNo); }
2)AccountInfoService
@Service @Slf4j public class AccountInfoServiceImpl implements AccountInfoService { @Autowired AccountInfoDao accountInfoDao; @Autowired PayClient payClient; /** * 更新賬號餘額,併發送消息 * * @param accountChange */ @Transactional @Override public void updateAccountBalance(AccountChangeEvent accountChange) { //冪等校驗 int existTx = accountInfoDao.isExistTx(accountChange.getTxNo()); if(existTx >0){ log.info("已處理消息:{}", JSONObject.toJSONString(accountChange)); return ; } //添加事務記錄 accountInfoDao.addTx(accountChange.getTxNo()); //更新帳戶金額 accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount()); } /** * 主動查詢充值結果 * * @param tx_no */ @Override public AccountPay queryPayResult(String tx_no) { //主動請求充值系統查詢充值結果 AccountPay accountPay = payClient.queryPayResult(tx_no); //充值結果 String result = accountPay.getResult(); log.info("主動查詢充值結果:{}", JSON.toJSONString(accountPay)); if("success".equals(result)){ AccountChangeEvent accountChangeEvent = new AccountChangeEvent(); accountChangeEvent.setAccountNo(accountPay.getAccountNo()); accountChangeEvent.setAmount(accountPay.getPayAmount()); accountChangeEvent.setTxNo(accountPay.getId()); updateAccountBalance(accountChangeEvent); } return accountPay; } }
@FeignClient(value = "dtx‐notifymsg‐demo‐pay", fallback = PayFallback.class) public interface PayClient { @GetMapping("/pay/payresult/{txNo}") AccountPay queryPayResult(@PathVariable("txNo") String txNo); } @Component public class PayFallback implements PayClient { @Override public AccountPay queryPayResult(String txNo) { AccountPay accountPay = new AccountPay(); accountPay.setResult("fail"); return accountPay; } }
3)監聽MQ
@Component @Slf4j @RocketMQMessageListener(topic="topic_notifymsg",consumerGroup="consumer_group_notifymsg_bank1") public class NotifyMsgListener implements RocketMQListener<AccountPay> { @Autowired AccountInfoService accountInfoService; @Override public void onMessage(AccountPay accountPay) { log.info("接收到消息:{}", JSON.toJSONString(accountPay)); AccountChangeEvent accountChangeEvent = new AccountChangeEvent(); accountChangeEvent.setAmount(accountPay.getPayAmount()); accountChangeEvent.setAccountNo(accountPay.getAccountNo()); accountChangeEvent.setTxNo(accountPay.getId()); accountInfoService.updateAccountBalance(accountChangeEvent); log.info("處理消息完成:{}", JSON.toJSONString(accountChangeEvent)); } }
4)Controller
@RestController @Slf4j public class AccountInfoController { @Autowired private AccountInfoService accountInfoService; //主動查詢充值結果 @GetMapping(value = "/payresult/{txNo}") public AccountPay result(@PathVariable("txNo") String txNo){ AccountPay accountPay = accountInfoService.queryPayResult(txNo); return accountPay; } }
最大努力通知方案是分佈式事務中對一致性要求最低的一種,適用於一些最終一致性時間敏感度低的業務;最大努力通知方案須要實現以下功能 :一、消息重複通知機制。二、消息校對機制。