有關RocketMQ實現分佈式事務前面寫了一篇博客html
一、RocketMQ實現分佈式事務原理java
下面就這個項目作個總體簡單介紹,並在文字最下方附上項目Github地址。git
項目整體技術選型github
SpringCloud(Finchley.RELEASE) + SpringBoot2.0.4 + Maven3.5.4 + RocketMQ4.3 +MySQL + lombok(插件)
有關SpringCloud主要用到如下四個組建spring
Eureka Server +config-server(配置中心)+ Eureka Client + Feign(服務間調用)
配置中心是用MySQL存儲數據。數據庫
config-service # 配置中心 eureka # 註冊中心 service-order #訂單微服務 service-produce #商品微服務
各服務的啓動順序就安裝上面的順序啓動。json
大體流程
api
啓動後,配置中心、訂單微服務、商品微服務都會將信息註冊到註冊中心。服務器
若是訪問:localhost:7001
(註冊中心地址),以上服務都出現說明啓動成功。網絡
用戶在訂單微服務下單後,會去回調商品微服務去減庫存。這個過程須要事務的一致性。
頁面輸入:
http://localhost:9001/api/v1/order/save?userId=1&productId=1&total=4
訂單微服務執行狀況(訂單服務事務執行成功)
商品微服務執行狀況(商品服務事務執行成功)
固然你也能夠經過修改參數來模擬分佈式事務出現的各類狀況。
這裏展現下,生產者發送消息核心代碼。
@Slf4j @Component public class TransactionProducer { /** * 須要自定義事務監聽器 用於 事務的二次確認 和 事務回查 */ private TransactionListener transactionListener ; /** * 這裏的生產者和以前的不同 */ private TransactionMQProducer producer = null; /** * 官方建議自定義線程 給線程取自定義名稱 發現問題更好排查 */ private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); public TransactionProducer(@Autowired Jms jms, @Autowired ProduceOrderService produceOrderService) { transactionListener = new TransactionListenerImpl(produceOrderService); // 初始化 事務生產者 producer = new TransactionMQProducer(jms.getOrderTopic()); // 添加服務器地址 producer.setNamesrvAddr(jms.getNameServer()); // 添加事務監聽器 producer.setTransactionListener(transactionListener); // 添加自定義線程池 producer.setExecutorService(executorService); start(); } public TransactionMQProducer getProducer() { return this.producer; } /** * 對象在使用以前必需要調用一次,只能初始化一次 */ public void start() { try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 通常在應用上下文,使用上下文監聽器,進行關閉 */ public void shutdown() { this.producer.shutdown(); } } /** * @author xub * @Description: 自定義事務監聽器 * @date 2019/7/15 下午12:20 */ @Slf4j class TransactionListenerImpl implements TransactionListener { @Autowired private ProduceOrderService produceOrderService ; public TransactionListenerImpl( ProduceOrderService produceOrderService) { this.produceOrderService = produceOrderService; } @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { log.info("=========本地事務開始執行============="); String message = new String(msg.getBody()); JSONObject jsonObject = JSONObject.parseObject(message); Integer productId = jsonObject.getInteger("productId"); Integer total = jsonObject.getInteger("total"); int userId = Integer.parseInt(arg.toString()); //模擬執行本地事務begin======= /** * 本地事務執行會有三種可能 * 一、commit 成功 * 二、Rollback 失敗 * 三、網絡等緣由服務宕機收不到返回結果 */ log.info("本地事務執行參數,用戶id={},商品ID={},銷售庫存={}",userId,productId,total); int result = produceOrderService.save(userId, productId, total); //模擬執行本地事務end======== //TODO 實際開發下面不須要咱們手動返回,而是根據本地事務執行結果自動返回 //一、二次確認消息,而後消費者能夠消費 if (result == 0) { return LocalTransactionState.COMMIT_MESSAGE; } //二、回滾消息,Broker端會刪除半消息 if (result == 1) { return LocalTransactionState.ROLLBACK_MESSAGE; } //三、Broker端會進行回查消息 if (result == 2) { return LocalTransactionState.UNKNOW; } return LocalTransactionState.COMMIT_MESSAGE; } /** * 只有上面接口返回 LocalTransactionState.UNKNOW 纔會調用查接口被調用 * * @param msg 消息 * @return */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { log.info("==========回查接口========="); String key = msg.getKeys(); //TODO 一、必須根據key先去檢查本地事務消息是否完成。 /** * 由於有種狀況就是:上面本地事務執行成功了,可是return LocalTransactionState.COMMIT_MESSAG的時候 * 服務掛了,那麼最終 Brock還未收到消息的二次肯定,仍是個半消息 ,因此當從新啓動的時候仍是回調這個回調接口。 * 若是不先查詢上面本地事務的執行狀況 直接在執行本地事務,那麼就至關於成功執行了兩次本地事務了。 */ // TODO 二、這裏返回要麼commit 要麼rollback。沒有必要在返回 UNKNOW return LocalTransactionState.COMMIT_MESSAGE; } }
這裏展現下,消費端消費消息核心代碼。消費端和普通消費同樣。
@Slf4j @Component public class OrderConsumer { private DefaultMQPushConsumer consumer; private String consumerGroup = "produce_consumer_group"; public OrderConsumer(@Autowired Jms jms,@Autowired ProduceService produceService) throws MQClientException { //設置消費組 consumer = new DefaultMQPushConsumer(consumerGroup); // 添加服務器地址 consumer.setNamesrvAddr(jms.getNameServer()); // 添加訂閱號 consumer.subscribe(jms.getOrderTopic(), "*"); // 監聽消息 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { MessageExt msg = msgs.get(0); String message = new String(msgs.get(0).getBody()); JSONObject jsonObject = JSONObject.parseObject(message); Integer productId = jsonObject.getInteger("productId"); Integer total = jsonObject.getInteger("total"); String key = msg.getKeys(); log.info("消費端消費消息,商品ID={},銷售數量={}",productId,total); try { produceService.updateStore(productId, total, key); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.info("消費失敗,進行重試,重試到必定次數 那麼將該條記錄記錄到數據庫中,進行若是處理"); e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); consumer.start(); System.out.println("consumer start ..."); } }
至於完整的項目地址見GitHub。
若是對您能有幫助,就給個星星吧,哈哈!
GitHubd地址
https://github.com/yudiandemingzi/spring-cloud-rocketmq-transaction.git
晚安!
只要本身變優秀了,其餘的事情纔會跟着好起來(上將8)