今天的博客有點多,由於前幾天一直用筆記錄,今天都補上了。後面的博客先停一段時間,後面還有dubbo、storm、kafka、solor、nginx、keepalived、fastdfs等內容,只是由於最近準備跳槽,停更一段時間,等到新公司後再繼續更新。java
場景1:支付寶轉1w到餘額寶,支付寶扣了1w,服務掛了怎麼辦?餘額尚未加上nginx
場景2:訂單系統和庫存系統如何保持一致服務器
若是是本地的話很好解決網絡
那若是是跨系統的呢?該如何解決?併發
有一種思路是這樣的:分佈式
可是這個有個問題,那就是性能很受影響,主要卡在事務協調器這裏。ide
RocketMQ的實現方式以下(圖片來自網絡):性能
支付寶先生成 扣款信息 --> 消息隊列 --> 餘額寶消費消息atom
發送消息:spa
1 import com.alibaba.rocketmq.client.exception.MQClientException; 2 import com.alibaba.rocketmq.client.producer.SendResult; 3 import com.alibaba.rocketmq.client.producer.TransactionCheckListener; 4 import com.alibaba.rocketmq.client.producer.TransactionMQProducer; 5 import com.alibaba.rocketmq.common.message.Message; 6 7 8 /** 9 * 發送事務消息例子 10 * 11 */ 12 public class TransactionProducer { 13 public static void main(String[] args) throws MQClientException, InterruptedException { 14 15 TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); 16 TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); 17 // 事務回查最小併發數 18 producer.setCheckThreadPoolMinSize(2); 19 // 事務回查最大併發數 20 producer.setCheckThreadPoolMaxSize(2); 21 // 隊列數 22 producer.setCheckRequestHoldMax(2000); 23 producer.setTransactionCheckListener(transactionCheckListener); 24 producer.start(); 25 26 String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" }; 27 TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); 28 for (int i = 0; i < 100; i++) { 29 try { 30 Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, 31 ("Hello RocketMQ " + i).getBytes()); 32 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); 33 System.out.println(sendResult); 34 35 Thread.sleep(10); 36 } 37 catch (MQClientException e) { 38 e.printStackTrace(); 39 } 40 } 41 42 for (int i = 0; i < 100000; i++) { 43 Thread.sleep(1000); 44 } 45 46 producer.shutdown(); 47 48 } 49 }
執行本地事務
1 import java.util.concurrent.atomic.AtomicInteger; 2 3 import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; 4 import com.alibaba.rocketmq.client.producer.LocalTransactionState; 5 import com.alibaba.rocketmq.common.message.Message; 6 7 8 /** 9 * 執行本地事務 10 */ 11 public class TransactionExecuterImpl implements LocalTransactionExecuter { 12 private AtomicInteger transactionIndex = new AtomicInteger(1); 13 14 15 @Override 16 public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { 17 int value = transactionIndex.getAndIncrement(); 18 19 if (value == 0) { 20 throw new RuntimeException("Could not find db"); 21 } 22 else if ((value % 5) == 0) { 23 return LocalTransactionState.ROLLBACK_MESSAGE; 24 } 25 else if ((value % 4) == 0) { 26 return LocalTransactionState.COMMIT_MESSAGE; 27 } 28 29 return LocalTransactionState.UNKNOW; 30 } 31 }
服務器回查客戶端(這個功能在開源版本中已經被咔掉了,可是咱們仍是要寫,否則報錯)
1 import java.util.concurrent.atomic.AtomicInteger; 2 3 import com.alibaba.rocketmq.client.producer.LocalTransactionState; 4 import com.alibaba.rocketmq.client.producer.TransactionCheckListener; 5 import com.alibaba.rocketmq.common.message.MessageExt; 6 7 8 /** 9 * 未決事務,服務器回查客戶端 10 */ 11 public class TransactionCheckListenerImpl implements TransactionCheckListener { 12 private AtomicInteger transactionIndex = new AtomicInteger(0); 13 14 15 @Override 16 public LocalTransactionState checkLocalTransactionState(MessageExt msg) { 17 System.out.println("server checking TrMsg " + msg.toString()); 18 19 int value = transactionIndex.getAndIncrement(); 20 if ((value % 6) == 0) { 21 throw new RuntimeException("Could not find db"); 22 } 23 else if ((value % 5) == 0) { 24 return LocalTransactionState.ROLLBACK_MESSAGE; 25 } 26 else if ((value % 4) == 0) { 27 return LocalTransactionState.COMMIT_MESSAGE; 28 } 29 30 return LocalTransactionState.UNKNOW; 31 } 32 }
到這就完了,爲何只介紹發送不介紹接收呢?由於一旦消息提交到MQ就不用管了, 要相信MQ會把消息送達consumer,若是消息未能被成功消費的話,那麼Producer也會回滾
如何保證分佈式系統的全局性事務?
由於阿里在3.2.6版本後,砍掉了消息回查的功能,也就是consumer端是否成功消費,Producer端並不知道,因此若是要保證全局性事務,咱們要有本身的實現機制: