RocketMQ(三)如何解決消息的順序&重複兩大硬傷?

簡述

分佈式消息系統做爲實現分佈式系統可擴展、可伸縮性的關鍵組件,須要具備高吞吐量、高可用等特色。而談到消息系統的設計,就回避不了兩個問題:算法

  • 消息的順序問題
  • 消息的重複問題

RocketMQ做爲阿里開源的一款高性能、高吞吐量的消息中間件,它是怎樣來解決這兩個問題的?RocketMQ 有哪些關鍵特性?其實現原理是怎樣的?數據庫

順序消息

消息有序指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了 3 條消息,分別是訂單建立、訂單付款、訂單完成。消費時,要按照這個順序消費纔有意義。但同時訂單之間又是能夠並行消費的。緩存

假如生產者產生了2條消息:M一、M2,要保證這兩條消息的順序,應該怎樣作?你腦中想到的多是這樣: 輸入圖片說明 M1發送到S1後,M2發送到S2,若是要保證M1先於M2被消費,那麼須要M1到達消費端後,通知S2,而後S2再將M2發送到消費端。安全

這個模型存在的問題是,若是M1和M2分別發送到兩臺Server上,就不能保證M1先達到,也就不能保證M1被先消費,那麼就須要在MQ Server集羣維護消息的順序。那麼如何解決?一種簡單的方式就是將M一、M2發送到同一個Server上: 輸入圖片說明服務器

只要將消息從一臺服務器發往另外一臺服務器,就會存在網絡延遲問題。如上圖所示,若是發送M1耗時大於發送M2的耗時,那麼M2就先被消費,仍然不能保證消息的順序。即便M1和M2同時到達消費端,因爲不清楚消費端1和消費端2的負載狀況,仍然有可能出現M2先於M1被消費。如何解決這個問題?將M1和M2發往同一個消費者便可,且發送M1後,須要消費端響應成功後才能發送M2。網絡

但又會引入另一個問題,若是發送M1後,消費端1沒有響應,那是繼續發送M2呢,仍是從新發送M1?通常爲了保證消息必定被消費,確定會選擇重發M1到另一個消費端2,就以下圖所示。app

這樣的模型就嚴格保證消息的順序,細心的你仍然會發現問題,消費端1沒有響應Server時有兩種狀況,一種是M1確實沒有到達,另一種狀況是消費端1已經響應,可是Server端沒有收到。若是是第二種狀況,重發M1,就會形成M1被重複消費。也就是咱們後面要說的第二個問題,消息重複問題。負載均衡

回過頭來看消息順序問題,嚴格的順序消息很是容易理解,並且處理問題也比較容易,要實現嚴格的順序消息,簡單且可行的辦法就是:異步

  • 保證生產者 - MQServer - 消費者是一對一對一的關係

可是這樣設計,並行度就成爲了消息系統的瓶頸(吞吐量不夠),也會致使更多的異常處理,好比:只要消費端出現問題,就會致使整個處理流程阻塞,咱們不得不花費更多的精力來解決阻塞的問題。分佈式

但咱們的最終目標是要集羣的高容錯性和高吞吐量。這彷佛是一對不可調和的矛盾,那麼阿里是如何解決的?

  • 世界上解決一個計算機問題最簡單的方法:「剛好」不須要解決它!—— 沈詢

有些問題,看起來很重要,但實際上咱們能夠經過合理的設計或者將問題分解來規避。若是硬要把時間花在解決它們身上,其實是浪費的,效率低下的。從這個角度來看消息的順序問題,咱們能夠得出兩個結論:

    1. 不關注亂序的應用實際大量存在
    1. 隊列無序並不意味着消息無序

通常消息是經過輪詢全部隊列來發送的(負載均衡策略),順序消息能夠根據業務,好比說訂單號相同的消息發送到同一個隊列。下面的示例中,OrderId相同的消息,會發送到同一個隊列:

// RocketMQ默認提供了兩種MessageQueueSelector實現:隨機/Hash
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

在獲取到路由信息之後,會根據MessageQueueSelector實現的算法來選擇一個隊列,同一個OrderId獲取到的隊列是同一個隊列。

private SendResult send()  {
    // 獲取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        // 根據咱們的算法,選擇一個發送隊列
        // 這裏的arg = orderId
        mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
        }
    }
}

2、消息重複

上面在解決消息順序問題時,引入了一個新的問題,就是消息重複。那麼RocketMQ是怎樣解決消息重複的問題呢?仍是「剛好」不解決。 形成消息的重複的根本緣由是:網絡不可達。只要經過網絡交換數據,就沒法避免這個問題。因此解決這個問題的辦法就是不解決,轉而繞過這個問題。那麼問題就變成了:若是消費端收到兩條同樣的消息,應該怎樣處理?

    1. 消費端處理消息的業務邏輯保持冪等性
    1. 保證每條消息都有惟一編號且保證消息處理成功與去重表的日誌同時出現 第1條很好理解,只要保持冪等性,無論來多少條重複消息,最後處理的結果都同樣。第2條原理就是利用一張日誌表來記錄已經處理成功的消息的ID,若是新到的消息ID已經在日誌表中,那麼就再也不處理這條消息。

咱們能夠看到第1條的解決方式,很明顯應該在消費端實現,不屬於消息系統要實現的功能。第2條能夠消息系統實現,也能夠業務端實現。正常狀況下出現重複消息的機率不必定大,且由消息系統實現的話,確定會對消息系統的吞吐量和高可用有影響,因此最好仍是由業務端本身處理消息重複的問題,這也是RocketMQ不解決消息重複的問題的緣由。

RocketMQ不保證消息不重複,若是你的業務須要保證嚴格的不重複消息,須要你本身在業務端去重。

3、事務消息

RocketMQ除了支持普通消息,順序消息,另外還支持事務消息。首先討論一下什麼是事務消息以及支持事務消息的必要性。咱們以一個轉賬的場景爲例來講明這個問題:Bob向Smith轉帳100塊。

在單機環境下,執行事務的狀況,大概是下面這個樣子: 輸入圖片說明 當用戶增加到必定程度,Bob和Smith的帳戶及餘額信息已經不在同一臺服務器上了,那麼上面的流程就變成了這樣: 這時候你會發現,一樣是一個轉帳的業務,在集羣環境下,耗時竟然成倍的增加,這顯然是不可以接受的。那咱們如何來規避這個問題?

  • 大事務 = 小事務 + 異步

將大事務拆分紅多個小事務異步執行。這樣基本上可以將跨機事務的執行效率優化到與單機一致。轉帳的事務就能夠分解成以下兩個小事務:

輸入圖片說明 圖中執行本地事務(Bob帳戶扣款)和發送異步消息應該保持同時成功或者失敗中,也就是扣款成功了,發送消息必定要成功,若是扣款失敗了,就不能再發送消息。那問題是:咱們是先扣款仍是先發送消息呢? 首先咱們看下,先發送消息,大體的示意圖以下: 輸入圖片說明 存在的問題是:若是消息發送成功,可是扣款失敗,消費端就會消費此消息,進而向Smith帳戶加錢。 先發消息不行,那咱們就先扣款唄,大體的示意圖以下: 輸入圖片說明

存在的問題跟上面相似:若是扣款成功,發送消息失敗,就會出現Bob扣錢了,可是Smith帳戶未加錢。

可能你們會有不少的方法來解決這個問題,好比:直接將發消息放到Bob扣款的事務中去,若是發送失敗,拋出異常,事務回滾。這樣的處理方式也符合「剛好」不須要解決的原則。RocketMQ支持事務消息,下面咱們來看看RocketMQ是怎樣來實現的。

輸入圖片說明

RocketMQ第一階段發送Prepared消息時,會拿到消息的地址,第二階段執行本地事物,第三階段經過第一階段拿到的地址去訪問消息,並修改狀態。細心的你可能又發現問題了,若是確認消息發送失敗了怎麼辦?RocketMQ會按期掃描消息集羣中的事物消息,這時候發現了Prepared消息,它會向消息發送者確認,Bob的錢究竟是減了仍是沒減呢?若是減了是回滾仍是繼續發送確認消息呢?RocketMQ會根據發送端設置的策略來決定是回滾仍是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。

那咱們來看下RocketMQ源碼,是否是這樣來處理事務消息的。客戶端發送事務消息的部分(完整代碼請查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):

// 未決事務,MQ服務器回查客戶端
// 也就是上文所說的,當RocketMQ發現`Prepared消息`時,會根據這個Listener實現的策略來決斷事務
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 構造事務消息的生產者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 設置事務決斷處理類
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事務的處理邏輯,至關於示例中檢查Bob帳戶並扣錢的邏輯
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 構造MSG,省略構造參數
Message msg = new Message(......);
// 發送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();

接着查看sendMessageInTransaction方法的源碼,總共分爲3個階段:發送Prepared消息、執行本地事務、發送確認消息。

public TransactionSendResult sendMessageInTransaction(.....)  {
    // 邏輯代碼,非實際代碼
    // 1.發送消息
    sendResult = this.send(msg);
    // sendResult.getSendStatus() == SEND_OK
    // 2.若是消息發送成功,處理與消息關聯的本地事務單元
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
    // 3.結束事務
    this.endTransaction(sendResult, localTransactionState, localException);
}

endTransaction方法會將請求發往broker(mq server)去更新事物消息的最終狀態:

    1. 根據sendResult找到Prepared消息
    1. 根據localTransaction更新消息的最終狀態

若是endTransaction方法執行失敗,致使數據沒有發送到broker,broker會有回查線程定時(默認1分鐘)掃描每一個存儲事務狀態的表格文件,若是是已經提交或者回滾的消息直接跳過,若是是prepared狀態則會向Producer發起CheckTransaction請求,Producer會調用DefaultMQProducerImpl.checkTransactionState()方法來處理broker的定時回調請求,而checkTransactionState會調用咱們的事務設置的決斷方法,最後調用endTransactionOneway讓broker來更新消息的最終狀態。

再回到轉帳的例子,若是Bob的帳戶的餘額已經減小,且消息已經發送成功,Smith端開始消費這條消息,這個時候就會出現消費失敗和消費超時兩個問題?解決超時問題的思路就是一直重試,直到消費端消費消息成功,整個過程當中有可能會出現消息重複的問題,按照前面的思路解決便可。

輸入圖片說明

這樣基本上能夠解決超時問題,可是若是消費失敗怎麼辦?阿里提供給咱們的解決方法是:人工解決。你們能夠考慮一下,按照事務的流程,由於某種緣由Smith加款失敗,須要回滾整個流程。若是消息系統要實現這個回滾流程的話,系統複雜度將大大提高,且很容易出現Bug,估計出現Bug的機率會比消費失敗的機率大不少。咱們須要衡量是否值得花這麼大的代價來解決這樣一個出現機率很是小的問題,這也是你們在解決疑難問題時須要多多思考的地方。

4、Producer如何發送消息

Producer輪詢某topic下的全部隊列的方式來實現發送方的負載均衡,以下圖所示: 輸入圖片說明

首先分析一下RocketMQ的客戶端發送消息的源碼:

// 構造Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 初始化Producer,整個應用生命週期內,只須要初始化1次
producer.start();
// 構造Message
Message msg = new Message("TopicTest1",// topic
                        "TagA",// tag:給消息打標籤,用於區分一類消息,可爲null
                        "OrderID188",// key:自定義Key,能夠用於去重,可爲null
                        ("Hello MetaQ").getBytes());// body:消息內容
// 發送消息並返回結果
SendResult sendResult = producer.send(msg);
// 清理資源,關閉網絡鏈接,註銷本身
producer.shutdown();

在整個應用生命週期內,生產者須要調用一次start方法來初始化,初始化主要完成的任務有:

    1. 若是沒有指定namesrv地址,將會自動尋址
    1. 啓動定時任務:更新namesrv地址、從namsrv更新topic路由信息、清理已經掛掉的broker、向全部broker發送心跳…
    1. 啓動負載均衡的服務

初始化完成後,開始發送消息,發送消息的主要代碼以下:

private SendResult sendDefaultImpl(Message msg,......) {
    // 檢查Producer的狀態是不是RUNNING
    this.makeSureStateOK();
    // 檢查msg是否合法:是否爲null、topic,body是否爲空、body是否超長
    Validators.checkMessage(msg, this.defaultMQProducer);
    // 獲取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    // 從路由信息中選擇一個消息隊列
    MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
    // 將消息發送到該隊列上去
    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}

代碼中須要關注的兩個方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面說過在producer初始化時,會啓動定時任務獲取路由信息並更新到本地緩存,因此tryToFindTopicPublishInfo會首先從緩存中獲取topic路由信息,若是沒有獲取到,則會本身去namesrv獲取路由信息。selectOneMessageQueue方法經過輪詢的方式,返回一個隊列,以達到負載均衡的目的。

若是Producer發送消息失敗,會自動重試,重試的策略:

  • 重試次數 < retryTimesWhenSendFailed(可配置)
  • 總的耗時(包含重試n次的耗時) < sendMsgTimeout(發送消息時傳入的參數)
  • 同時知足上面兩個條件後,Producer會選擇另一個隊列發送消息

5、消息存儲

RocketMQ的消息存儲是由consume queue和commit log配合完成的。

一、Consume Queue

consume queue是消息的邏輯隊列,至關於字典的目錄,用來指定消息在物理文件commit log上的位置。

咱們能夠在配置中指定consumequeue與commitlog存儲的目錄 每一個topic下的每一個queue都有一個對應的consumequeue文件,好比:

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

Consume Queue文件組織,如圖所示:

輸入圖片說明

-1. 根據topic和queueId來組織文件,圖中TopicA有兩個隊列0,1,那麼TopicA和QueueId=0組成一個ConsumeQueue,TopicA和QueueId=1組成另外一個ConsumeQueue。

  • 2.按照消費端的GroupName來分組重試隊列,若是消費端消費失敗,消息將被髮往重試隊列中,好比圖中的%RETRY%ConsumerGroupA。
  • 3.按照消費端的GroupName來分組死信隊列,若是消費端消費失敗,並重試指定次數後,仍然失敗,則發往死信隊列,好比圖中的%DLQ%ConsumerGroupA。

死信隊列(Dead Letter Queue)通常用於存放因爲某種緣由沒法傳遞的消息,好比處理失敗或者已通過期的消息。

Consume Queue中存儲單元是一個20字節定長的二進制數據,順序寫順序讀,以下圖所示:

輸入圖片說明

CommitLog Offset是指這條消息在Commit Log文件中的實際偏移量

Size存儲中消息的大小

Message Tag HashCode存儲消息的Tag的哈希值:主要用於訂閱時消息過濾(訂閱時若是指定了Tag,會根據HashCode來快速查找到訂閱的消息)

二、Commit Log

CommitLog:消息存放的物理文件,每臺broker上的commitlog被本機全部的queue共享,不作任何區分。 文件的默認位置以下,仍然可經過配置文件修改:

${user.home} \store${commitlog}${fileName}

CommitLog的消息存儲單元長度不固定,文件順序寫,隨機讀。消息的存儲結構以下表所示,按照編號順序以及編號對應的內容依次存儲。 輸入圖片說明

三、消息存儲實現

消息存儲實現,比較複雜,也值得你們深刻了解,後面會單獨成文來分析,這小節只以代碼說明一下具體的流程。

// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
synchronized (this) {
    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    // Here settings are stored timestamp, in order to ensure an orderly global
    msg.setStoreTimestamp(beginLockTimestamp);
    // MapedFile:操做物理文件在內存中的映射以及將內存數據持久化到物理文件中
    MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
    // 將Message追加到文件commitlog
    result = mapedFile.appendMessage(msg, this.appendMessageCallback);
    switch (result.getStatus()) {
    case PUT_OK:break;
    case END_OF_FILE:
         // Create a new file, re-write the message
         mapedFile = this.mapedFileQueue.getLastMapedFile();
         result = mapedFile.appendMessage(msg, this.appendMessageCallback);
     break;
     DispatchRequest dispatchRequest = new DispatchRequest(
                topic,// 1
                queueId,// 2
                result.getWroteOffset(),// 3
                result.getWroteBytes(),// 4
                tagsCode,// 5
                msg.getStoreTimestamp(),// 6
                result.getLogicsOffset(),// 7
                msg.getKeys(),// 8
                /**
                 * Transaction
                 */
                msg.getSysFlag(),// 9
                msg.getPreparedTransactionOffset());// 10
    // 1.分發消息位置到ConsumeQueue
    // 2.分發到IndexService創建索引
    this.defaultMessageStore.putDispatchRequest(dispatchRequest);
}

四、消息的索引文件

若是一個消息包含key值的話,會使用IndexFile存儲消息索引,文件的內容結構如圖:

輸入圖片說明

輸入圖片說明

索引文件主要用於根據key來查詢消息的,流程主要是:

    1. 根據查詢的 key 的 hashcode%slotNum 獲得具體的槽的位置(slotNum 是一個索引文件裏面包含的最大槽的數目,例如圖中所示 slotNum=5000000)
    1. 根據 slotValue(slot 位置對應的值)查找到索引項列表的最後一項(倒序排列,slotValue 老是指向最新的一個索引項)
    1. 遍歷索引項列表返回查詢時間範圍內的結果集(默認一次最大返回的 32 條記錄)。

6、消息訂閱

RocketMQ消息訂閱有兩種模式,一種是Push模式,即MQServer主動向消費端推送;另一種是Pull模式,即消費端在須要時,主動到MQServer拉取。但在具體實現時,Push和Pull模式都是採用消費端主動拉取的方式。

首先看下消費端的負載均衡: 輸入圖片說明

消費端會經過RebalanceService線程,10秒鐘作一次基於topic下的全部隊列負載:

    1. 遍歷Consumer下的全部topic,而後根據topic訂閱全部的消息
    1. 獲取同一topic和Consumer Group下的全部Consumer
    1. 而後根據具體的分配策略來分配消費隊列,分配的策略包含:平均分配、消費端配置等

如同上圖所示:若是有 5 個隊列,2 個 consumer,那麼第一個 Consumer 消費 3 個隊列,第二 consumer 消費 2 個隊列。這裏採用的就是平均分配策略,它相似於咱們的分頁,TOPIC下面的全部queue就是記錄,Consumer的個數就至關於總的頁數,那麼每頁有多少條記錄,就相似於某個Consumer會消費哪些隊列。

經過這樣的策略來達到大致上的平均消費,這樣的設計也能夠很方面的水平擴展Consumer來提升消費能力。

消費端的Push模式是經過長輪詢的模式來實現的,就如同下圖:

輸入圖片說明

Consumer端每隔一段時間主動向broker發送拉消息請求,broker在收到Pull請求後,若是有消息就當即返回數據,Consumer端收到返回的消息後,再回調消費者設置的Listener方法。若是broker在收到Pull請求時,消息隊列裏沒有數據,broker端會阻塞請求直到有數據傳遞或超時才返回。

固然,Consumer端是經過一個線程將阻塞隊列LinkedBlockingQueue中的PullRequest發送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest時,若是發現沒有消息,就會把PullRequest扔到ConcurrentHashMap中緩存起來。broker在啓動時,會啓動一個線程不停的從ConcurrentHashMap取出PullRequest檢查,直到有數據返回。

7、RocketMQ的其餘特性

前面的6個特性都是基本上都是點到爲止,想要深刻了解,還須要你們多多查看源碼,多多在實際中運用。固然除了已經提到的特性外,RocketMQ還支持:

  1. 定時消息
  1. 消息的刷盤策略
  2. 主動同步策略:同步雙寫、異步複製
  3. 海量消息堆積能力
  4. 高效通訊 .
  5. ……

其中涉及到的不少設計思路和解決方法都值得咱們深刻研究:

  1. 消息的存儲設計:既要知足海量消息的堆積能力,又要知足極快的查詢效率,還要保證寫入的效率。
  1. 高效的通訊組件設計:高吞吐量,毫秒級的消息投遞能力都離不開高效的通訊。
  2. …….

RocketMQ最佳實踐

1、Producer最佳實踐

一、一個應用盡量用一個 Topic,消息子類型用 tags 來標識,tags 能夠由應用自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才能夠利用 tags 在 broker 作消息過濾。

二、每一個消息在業務層面的惟一標識碼,要設置到 keys 字段,方便未來定位消息丟失問題。因爲是哈希索引,請務必保證 key 儘量惟一,這樣能夠避免潛在的哈希衝突。

三、消息發送成功或者失敗,要打印消息日誌,務必要打印 sendresult 和 key 字段。

四、對於消息不可丟失應用,務必要有消息重發機制。例如:消息發送失敗,存儲到數據庫,能有定時程序嘗試重發或者人工觸發重發。

五、某些應用若是不關注消息是否發送成功,請直接使用sendOneWay方法發送消息。

2、Consumer最佳實踐

消費過程要作到冪等(即消費端去重)

儘可能使用批量方式消費方式,能夠很大程度上提升消費吞吐量

優化每條消息消費過程

3、其餘配置

線上應該關閉autoCreateTopicEnable,即在配置文件中將其設置爲false。

RocketMQ在發送消息時,會首先獲取路由信息。若是是新的消息,因爲MQServer上面尚未建立對應的Topic,這個時候,若是上面的配置打開的話,會返回默認TOPIC的(RocketMQ會在每臺broker上面建立名爲TBW102的TOPIC)路由信息,而後Producer會選擇一臺Broker發送消息,選中的broker在存儲消息時,發現消息的topic尚未建立,就會自動建立topic。後果就是:之後全部該TOPIC的消息,都將發送到這臺broker上,達不到負載均衡的目的。

因此基於目前RocketMQ的設計,建議關閉自動建立TOPIC的功能,而後根據消息量的大小,手動建立TOPIC。

RocketMQ設計相關 RocketMQ的設計假定:

每臺PC機器均可能宕機不可服務

任意集羣都有可能處理能力不足

最壞的狀況必定會發生

內網環境須要低延遲來提供最佳用戶體驗

RocketMQ的關鍵設計:

分佈式集羣化

強數據安全

海量數據堆積

毫秒級投遞延遲(推拉模式)
相關文章
相關標籤/搜索