在分佈式系統中,咱們時常會遇到分佈式事務的問題,除了常規的解決方案以外,咱們還能夠利用RocketMQ的事務性消息來解決分佈式事務的問題。RocketMQ和其餘消息中間件最大的一個區別是支持了事務消息,這也是分佈式事務裏面的基於消息的最終一致性方案。html
這裏可能會存在一個問題,生產者本地事務成功後,發送事務確認消息到broker上失敗了怎麼辦?這個時候意味着消費者沒法正常消費到這個消息。因此RocketMQ提供了消息回查機制,若是事務消息一直處於中間狀態,broker會發起重試去查詢broker上這個事務的處理狀態。一旦發現事務處理成功,則把當前這條消息設置爲可見。mysql
生產者producer:sql
public class TransactionProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, InterruptedException { TransactionMQProducer transactionMQProducer=new TransactionMQProducer("tx_producer"); transactionMQProducer.setNamesrvAddr("192.168.1.101:9876;192.168.1.102:9876"); ExecutorService executorService= Executors.newFixedThreadPool(10); transactionMQProducer.setExecutorService(executorService); transactionMQProducer.setTransactionListener(new TransactionListenerLocal()); //本地事務的監聽 transactionMQProducer.start(); for(int i=0;i<10;i++){ String orderId= UUID.randomUUID().toString(); String body="{'operation':'doOrder','orderId':'"+orderId+"'}"; Message message=new Message("testTopic2", null,orderId,body.getBytes(RemotingHelper.DEFAULT_CHARSET)); transactionMQProducer.sendMessageInTransaction(message,orderId); Thread.sleep(1000); } } }
TransactionListenerLocal:數據庫
public class TransactionListenerLocal implements TransactionListener { private Map<String,Boolean> results=new ConcurrentHashMap<>(); //執行本地事務 @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("開始執行本地事務:"+o.toString()); //o String orderId=o.toString(); //模擬數據庫保存(成功/失敗) boolean result=Math.abs(Objects.hash(orderId))%2==0; if(!result) { results.put(orderId, result); // } return result? LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW; } //提供給事務執行狀態檢查的回調方法,給broker用的(異步回調) //若是回查失敗,消息就丟棄 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { String orderId=messageExt.getKeys(); System.out.println("執行事務回調檢查: orderId:"+orderId); if(results.size()==0){ return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; } }
消費端 consumer:架構
public class TransactionConsumer { //rocketMQ 除了在同一個組和不一樣組之間的消費者的特性和kafka相同以外 //RocketMQ能夠支持廣播消息,就意味着,同一個group的每一個消費者均可以消費同一個消息 public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer defaultMQPushConsumer= new DefaultMQPushConsumer("tx_consumer"); defaultMQPushConsumer.setNamesrvAddr("192.168.1.101:9876;192.168.1.102:9876"); defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //subExpression 能夠支持sql的表達式. or and a=? ,,, defaultMQPushConsumer.subscribe("testTopic2","*"); defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { list.stream().forEach(message->{ System.out.println("開始業務處理邏輯:消息體:"+new String(message.getBody())+"->key:"+message.getKeys()); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //簽收 } }); defaultMQPushConsumer.start(); } }
RocketMQ事務消息的三種狀態:dom
當executeLocalTransaction方法返回ROLLBACK_MESSAGE時,表示直接回滾事務,當返回COMMIT_MESSAGE提交事務當返回UNKNOW時,Broker會在一段時間以後回查checkLocalTransaction,根據checkLocalTransaction返回狀態執行事務的操做(回滾或提交),如示例中,當返回ROLLBACK_MESSAGE時消費者不會收到消息,且不會調用回查函數,當返回COMMIT_MESSAGE時事務提交,消費者收到消息,當返回UNKNOW時,在一段時間以後調用回查函數,並根據status判斷返回提交或回滾狀態,返回提交狀態的消息將會被消費者消費,因此此時消費者能夠消費部分消息異步
因爲分佈式消息隊列對於可靠性的要求比較高,因此須要保證生產者將消息發送到broker以後,保證消息是不出現丟失的,所以消息隊列就少不了對於可靠性存儲的要求分佈式
從主流的幾種MQ消息隊列採用的存儲方式來看,主要會有三種ide
消息的存儲結構:函數
RocketMQ就是採用文件系統的方式來存儲消息,消息的存儲是由ConsumeQueue和CommitLog配合完成的。CommitLog是消息真正的物理存儲文件。ConsumeQueue是消息的邏輯隊列,有點相似於數據庫的索引文件,裏面存儲的是指向CommitLog文件中消息存儲的地址。每一個Topic下的每一個Message Queue都會對應一個ConsumeQueue文件,文件的地址是:${store_home}/consumequeue/${topicNmae}/${queueId}/${filename}, 默認路徑: /root/store在rocketMQ的文件存儲目錄下,能夠看到這樣一個結構的的而文件。
CommitLog:
CommitLog是用來存放消息的物理文件,每一個broker上的commitLog本當前機器上的全部consumerQueue共享,不作任何的區分。CommitLog中的文件默認大小爲1G,能夠動態配置; 當一個文件寫滿之後,會生成一個新的commitlog文件。全部的Topic數據是順序寫入在CommitLog文件中的。文件名的長度爲20位,左邊補0,剩餘未起始偏移量,好比00000000000000000000 表示第一個文件, 文件大小爲102410241024,當第一個文件寫滿以後,生成第二個文件000000000001073741824 表示第二個文件,起始偏移量爲1073741824。
ConsumeQueue:
consumeQueue表示消息消費的邏輯隊列,這裏麪包含MessageQueue在commitlog中的其實物理位置偏移量offset,消息實體內容的大小和Message Tag的hash值。對於實際物理存儲來講,consumeQueue對應每一個topic和queueid下的文件,每一個consumeQueue類型的文件也是有大小,每一個文件默認大小約爲600W個字節,若是文件滿了後會也會生成一個新的文件。
IndexFile:
索引文件,若是一個消息包含Key值的話,會使用IndexFile存儲消息索引。Index索引文件提供了對CommitLog進行數據檢索,提供了一種經過key或者時間區間來查找CommitLog中的消息的方法。在物理存儲中,文件名是以建立的時間戳明明,固定的單個IndexFile大小大概爲400M,一個IndexFile能夠保存2000W個索引。
abort:
broker在啓動的時候會建立一個空的名爲abort的文件,並在shutdown時將其刪除,用於標識進程是否正常退出,若是不正常退出,會在啓動時作故障恢復。
Config:
能夠看到這個裏面保存了 消費端consumer的偏移量:
以及topic的一些配置信息:
RocketMQ的消息存儲採用的是混合型的存儲結構,也就是Broker單個實例下的全部隊列公用一個日誌數據文件CommitLog。這個是和Kafka又一個不一樣之處。爲何不採用kafka的設計,針對不一樣的partition存儲一個獨立的物理文件呢?這是由於在kafka的設計中,一旦kafka中Topic的Partition數量過多,隊列文件會過多,那麼會給磁盤的IO讀寫形成比較大的壓力,也就形成了性能瓶頸。因此RocketMQ進行了優化,消息主題統一存儲在CommitLog中。固然它也有它的優缺點
1. Producer將消息發送到Broker後,Broker會採用同步或者異步的方式把消息寫入到CommitLog。RocketMQ全部的消息都會存放在CommitLog中,爲了保證消息存儲不發生混亂,對CommitLog寫以前會加鎖,同時也可使得消息可以被順序寫入到CommitLog,只要消息被持久化到磁盤文件CommitLog,那麼就能夠保證Producer發送的消息不會丟失。
2. commitLog持久化後,會把裏面的消息Dispatch到對應的Consume Queue上,Consume Queue至關於kafka中的partition,是一個邏輯隊列,存儲了這個Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。
3. 當消費者進行消息消費時,會先讀取consumerQueue , 邏輯消費隊列ConsumeQueue保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量Offset,消息大小、和消息Tag的HashCode值
4. 直接從consumequeue中讀取消息是沒有數據的,真正的消息主體在commitlog中,因此還須要從commitlog中讀取消息
何時清理物理消息文件?那消息文件到底刪不刪,何時刪?
消息存儲在CommitLog以後,的確是會被清理的,可是這個清理只會在如下任一條件成立纔會批量刪除消息文件(CommitLog):
注:若磁盤空間達到危險水位線(默認90%),出於保護自身的目的,broker會拒絕寫入服務。