RocketMQ事務性消息及持久化

TransactionProducer(事務消息):

  在分佈式系統中,咱們時常會遇到分佈式事務的問題,除了常規的解決方案以外,咱們還能夠利用RocketMQ的事務性消息來解決分佈式事務的問題。RocketMQ和其餘消息中間件最大的一個區別是支持了事務消息,這也是分佈式事務裏面的基於消息的最終一致性方案。html

RocketMQ消息的事務架構設計:

  1. 生產者執行本地事務,修改訂單支付狀態,而且提交事務
  2. 生產者發送事務消息到broker上,消息發送到broker上在沒有確認以前,消息對於consumer是不可見狀態
  3. 生產者確認事務消息,使得發送到broker上的事務消息對於消費者可見
  4. 消費者獲取到消息進行消費,消費完以後執行ack進行確認

  這裏可能會存在一個問題,生產者本地事務成功後,發送事務確認消息到broker上失敗了怎麼辦?這個時候意味着消費者沒法正常消費到這個消息。因此RocketMQ提供了消息回查機制,若是事務消息一直處於中間狀態,broker會發起重試去查詢broker上這個事務的處理狀態。一旦發現事務處理成功,則把當前這條消息設置爲可見。mysql

RocketMQ事務消息的實踐:

  生產者producer:sql

public class TransactionProducer {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, InterruptedException {
        TransactionMQProducer transactionMQProducer=new
                TransactionMQProducer("tx_producer");
        transactionMQProducer.setNamesrvAddr("192.168.1.101:9876;192.168.1.102:9876");
        ExecutorService executorService= Executors.newFixedThreadPool(10);
        transactionMQProducer.setExecutorService(executorService);
        transactionMQProducer.setTransactionListener(new TransactionListenerLocal()); //本地事務的監聽

        transactionMQProducer.start();

        for(int i=0;i<10;i++){
            String orderId= UUID.randomUUID().toString();
            String body="{'operation':'doOrder','orderId':'"+orderId+"'}";
            Message message=new Message("testTopic2",
                    null,orderId,body.getBytes(RemotingHelper.DEFAULT_CHARSET));
            transactionMQProducer.sendMessageInTransaction(message,orderId);
            Thread.sleep(1000);
        }
    }
}

  TransactionListenerLocal:數據庫

public class TransactionListenerLocal implements TransactionListener {

    private Map<String,Boolean> results=new ConcurrentHashMap<>();

    //執行本地事務
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("開始執行本地事務:"+o.toString()); //o
        String orderId=o.toString();
        //模擬數據庫保存(成功/失敗)
        boolean result=Math.abs(Objects.hash(orderId))%2==0;
        if(!result) {
            results.put(orderId, result); //
        }
        return result? LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW;
    }
    //提供給事務執行狀態檢查的回調方法,給broker用的(異步回調)
    //若是回查失敗,消息就丟棄
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        String orderId=messageExt.getKeys();
        System.out.println("執行事務回調檢查: orderId:"+orderId);
        if(results.size()==0){
           return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

  消費端 consumer:架構

public class TransactionConsumer {

    //rocketMQ 除了在同一個組和不一樣組之間的消費者的特性和kafka相同以外
    //RocketMQ能夠支持廣播消息,就意味着,同一個group的每一個消費者均可以消費同一個消息
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer defaultMQPushConsumer=
                new DefaultMQPushConsumer("tx_consumer");
        defaultMQPushConsumer.setNamesrvAddr("192.168.1.101:9876;192.168.1.102:9876");
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //subExpression 能夠支持sql的表達式. or  and  a=? ,,,
        defaultMQPushConsumer.subscribe("testTopic2","*");
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.stream().forEach(message->{
                    System.out.println("開始業務處理邏輯:消息體:"+new String(message.getBody())+"->key:"+message.getKeys());
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //簽收
            }
        });
        defaultMQPushConsumer.start();
    }

}

RocketMQ事務消息的三種狀態:dom

  1. ROLLBACK_MESSAGE:回滾事務
  2. COMMIT_MESSAGE: 提交事務
  3. UNKNOW: broker會定時的回查Producer消息狀態,直到完全成功或失敗。

  當executeLocalTransaction方法返回ROLLBACK_MESSAGE時,表示直接回滾事務,當返回COMMIT_MESSAGE提交事務當返回UNKNOW時,Broker會在一段時間以後回查checkLocalTransaction,根據checkLocalTransaction返回狀態執行事務的操做(回滾或提交),如示例中,當返回ROLLBACK_MESSAGE時消費者不會收到消息,且不會調用回查函數,當返回COMMIT_MESSAGE時事務提交,消費者收到消息,當返回UNKNOW時,在一段時間以後調用回查函數,並根據status判斷返回提交或回滾狀態,返回提交狀態的消息將會被消費者消費,因此此時消費者能夠消費部分消息異步

消息的存儲和發送:

  因爲分佈式消息隊列對於可靠性的要求比較高,因此須要保證生產者將消息發送到broker以後,保證消息是不出現丟失的,所以消息隊列就少不了對於可靠性存儲的要求分佈式

  從主流的幾種MQ消息隊列採用的存儲方式來看,主要會有三種ide

  1. 分佈式KV存儲,好比ActiveMQ中採用的levelDB、Redis, 這種存儲方式對於消息讀寫能力要求不高的狀況下可使用
  2. 文件系統存儲,常見的好比kafka、RocketMQ、RabbitMQ都是採用消息刷盤到所部署的機器上的文件系統來作持久化,這種方案適合對於有高吞吐量要求的消息中間件,由於消息刷盤是一種高效率,高可靠、高性能的持久化方式,除非磁盤出現故障,不然通常是不會出現沒法持久化的問題
  3. 關係型數據庫,好比ActiveMQ能夠採用mysql做爲消息存儲,關係型數據庫在單表數據量達到千萬級的狀況下IO性能會出現瓶頸,因此ActiveMQ並不適合於高吞吐量的消息隊列場景。總的來講,對於存儲效率,文件系統要優於分佈式KV存儲,分佈式KV存儲要優於關係型數據庫

消息的存儲結構:函數

  RocketMQ就是採用文件系統的方式來存儲消息,消息的存儲是由ConsumeQueue和CommitLog配合完成的。CommitLog是消息真正的物理存儲文件。ConsumeQueue是消息的邏輯隊列,有點相似於數據庫的索引文件,裏面存儲的是指向CommitLog文件中消息存儲的地址。每一個Topic下的每一個Message Queue都會對應一個ConsumeQueue文件,文件的地址是:${store_home}/consumequeue/${topicNmae}/${queueId}/${filename}, 默認路徑: /root/store在rocketMQ的文件存儲目錄下,能夠看到這樣一個結構的的而文件。

CommitLog:

  CommitLog是用來存放消息的物理文件,每一個broker上的commitLog本當前機器上的全部consumerQueue共享,不作任何的區分。CommitLog中的文件默認大小爲1G,能夠動態配置; 當一個文件寫滿之後,會生成一個新的commitlog文件。全部的Topic數據是順序寫入在CommitLog文件中的。文件名的長度爲20位,左邊補0,剩餘未起始偏移量,好比00000000000000000000 表示第一個文件, 文件大小爲102410241024,當第一個文件寫滿以後,生成第二個文件000000000001073741824 表示第二個文件,起始偏移量爲1073741824。

ConsumeQueue:

  consumeQueue表示消息消費的邏輯隊列,這裏麪包含MessageQueue在commitlog中的其實物理位置偏移量offset,消息實體內容的大小和Message Tag的hash值。對於實際物理存儲來講,consumeQueue對應每一個topic和queueid下的文件,每一個consumeQueue類型的文件也是有大小,每一個文件默認大小約爲600W個字節,若是文件滿了後會也會生成一個新的文件。

IndexFile:

  索引文件,若是一個消息包含Key值的話,會使用IndexFile存儲消息索引。Index索引文件提供了對CommitLog進行數據檢索,提供了一種經過key或者時間區間來查找CommitLog中的消息的方法。在物理存儲中,文件名是以建立的時間戳明明,固定的單個IndexFile大小大概爲400M,一個IndexFile能夠保存2000W個索引。

abort:

  broker在啓動的時候會建立一個空的名爲abort的文件,並在shutdown時將其刪除,用於標識進程是否正常退出,若是不正常退出,會在啓動時作故障恢復。

Config:

  能夠看到這個裏面保存了 消費端consumer的偏移量:

  以及topic的一些配置信息:

消息存儲的總體結構:

  RocketMQ的消息存儲採用的是混合型的存儲結構,也就是Broker單個實例下的全部隊列公用一個日誌數據文件CommitLog。這個是和Kafka又一個不一樣之處。爲何不採用kafka的設計,針對不一樣的partition存儲一個獨立的物理文件呢?這是由於在kafka的設計中,一旦kafka中Topic的Partition數量過多,隊列文件會過多,那麼會給磁盤的IO讀寫形成比較大的壓力,也就形成了性能瓶頸。因此RocketMQ進行了優化,消息主題統一存儲在CommitLog中。固然它也有它的優缺點

  • 優勢在於:因爲消息主題都是經過CommitLog來進行讀寫,ConsumerQueue中只存儲不多的數據,因此隊列更加輕量化。對於磁盤的訪問是串行化從而避免了磁盤的競爭
  • 缺點在於:消息寫入磁盤雖然是基於順序寫,可是讀的過程確是隨機的。讀取一條消息會先讀取ConsumeQueue,再讀CommitLog,會下降消息讀的效率。

消息發送到消息接收的總體流程:

1. Producer將消息發送到Broker後,Broker會採用同步或者異步的方式把消息寫入到CommitLog。RocketMQ全部的消息都會存放在CommitLog中,爲了保證消息存儲不發生混亂,對CommitLog寫以前會加鎖,同時也可使得消息可以被順序寫入到CommitLog,只要消息被持久化到磁盤文件CommitLog,那麼就能夠保證Producer發送的消息不會丟失。

2. commitLog持久化後,會把裏面的消息Dispatch到對應的Consume Queue上,Consume Queue至關於kafka中的partition,是一個邏輯隊列,存儲了這個Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。

3. 當消費者進行消息消費時,會先讀取consumerQueue , 邏輯消費隊列ConsumeQueue保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量Offset,消息大小、和消息Tag的HashCode值

4. 直接從consumequeue中讀取消息是沒有數據的,真正的消息主體在commitlog中,因此還須要從commitlog中讀取消息

何時清理物理消息文件?那消息文件到底刪不刪,何時刪?

  消息存儲在CommitLog以後,的確是會被清理的,可是這個清理只會在如下任一條件成立纔會批量刪除消息文件(CommitLog):

  • 消息文件過時(默認48小時),且到達清理時點(默認是凌晨4點),刪除過時文件。
  • 消息文件過時(默認48小時),且磁盤空間達到了水位線(默認75%),刪除過時文件。
  • 磁盤已經達到必須釋放的上限(85%水位線)的時候,則開始批量清理文件(不管是否過時),直到空間充足。

  注:若磁盤空間達到危險水位線(默認90%),出於保護自身的目的,broker會拒絕寫入服務。

相關文章
相關標籤/搜索