RabbitMQ都寫了,RocketMQ怎麼能落下?

總體架構

最近看到了我在Github上寫的rabbitmq-examples陸續被人star了,就想着寫個rocketmq-examples。對rabbitmq感興趣的小夥伴能夠看我以前的文章。下面把RocketMQ的各個特性簡單介紹一下,這樣在用的時候內心也更有把握git

全網最全RabbitMQ總結,別再說你不會RabbitMQRocketMQ是阿里自研的消息中間件,RocketMQ的總體架構以下主要有4個角色github

Producer:消息生產者。相似,發信者 Consumer:消息消費者。相似,收信者 BrokerServer:消息的存儲,投遞,查詢。相似,郵局 NameServer:註冊中心,支持Broker的動態註冊與發現。相似,郵局的管理結構web

再介紹幾個基本概念算法

Topic(主題):一類消息的集合,Topic和消息是一對多的關係。每一個Broker能夠存儲多個Topic的消息,每一個Topic也能夠分片存儲於不一樣的Broker數據庫

Tag(標籤):在Topic類別下的二級子類別。如財務系統的全部消息的Topic爲Finance_Topic,建立訂單消息的Tag爲Create_Tag,關閉訂單消息的Tag爲Close_Tag。這樣就能根據Tag消費不一樣的消息,固然你也能夠爲建立訂單和關閉訂單的消息各自建立一個Topicapache

Message Queue(消息隊列):至關於Topic的分區,用於並行發送和消費消息。Message Queue在Broker上,一個Topic默認的Message Queue的數量爲4微信

Producer Group(生產者組):同一類Producer的集合。若是發送的是事務消息且原始生產者在發送以後崩潰,Broker會聯繫統一輩子產者組內的其餘生產者實例以提交或回溯消費網絡

Consumer Group(消費者組):同一類Consumer的集合。消費者組內的實例必須訂閱徹底相同的Topic架構

Clustering(集羣消費):相同Consumer Group下的每一個Consumer實例平均分攤消息負載均衡

Broadcasting(廣播消費):相同Consumer Group的每一個Consumer實例都接收全量的消息

用圖演示一下Clustering和Broadcasting的區別若是我有一條訂單程成交的消息,財務系統和物流系統都要同時訂閱消費這條消息,該怎麼辦呢?定義2個Consumer Group便可

Consumer1和Consumer2屬於一個Consumer Group,Consumer3和Consumer4屬於一個Consumer Group,消息會全量發送到這2個Consuemr Group,至於這2個Consumer Group是集羣消費仍是廣播消費,本身定義便可

工做流程在官方文檔寫的很詳細,再也不深刻了

https://github.com/apache/rocketmq/tree/master/docs/cn

Message

消息的各類處理方式涉及到的內容較多,因此我就不在文章中放代碼了,直接放GitHub了,目前還在不斷完善中

地址爲:https://github.com/erlieStar/rocketmq-examples,

和以前的RabbitMQ一個風格,基本上全部知識點都涉及到了

地址爲:https://github.com/erlieStar/rabbitmq-example

每一個消息必須屬於一個Topic。RocketMQ中每一個消息具備惟一的Message Id,且能夠攜帶具備業務標識的Key,咱們能夠經過Topic,Message Id或Key來查詢消息

消息消費的方式

  1. Pull(拉取式消費),Consumer主動從Broker拉取消息
  2. Push(推送式消費),Broker收到數據後會主動推送給Consumer,實時性較高

消息的過濾方式

  1. 指定Tag
  2. SQL92語法過濾

消息的發送方式

  1. 同步,收到響應後纔會發送下一條消息
  2. 異步,一直髮,用異步的回調函數來獲取結果
  3. 單向(只管發,無論結果)

消息的種類

  1. 順序消息
  2. 延遲消息
  3. 批量消息
  4. 事務消息

順序消息

順序消息分爲局部有序和全局有序

官方介紹爲普通順序消息和嚴格順序消息

局部有序:同一個業務相關的消息是有序的,如針對同一個訂單的建立和付款消息是有序的,只須要在發送的時候指定message queue便可,以下所示,將同一個orderId對應的消息發送到同一個隊列

SendResult sendResult = producer.send(message, new MessageQueueSelector() {
 /**
  * @param mqs topic對應的message queue
  * @param msg send方法傳入的message
  * @param arg send方法傳入的orderId
  */

 @Override
 public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  // 根據業務對象選擇對應的隊列
  Integer orderId = (Integer) arg;
  int index = orderId % mqs.size();
  return mqs.get(index);
 }
}, orderId);

消費者所使用的Listener必須是MessageListenerOrderly(對於一個隊列的消息採用一個線程去處理),而日常的話咱們使用的是MessageListenerConcurrently

全局有序:要想實現全局有序,則Topic只能有一個message queue。

延遲消息

RocketMQ並不支持任意時間的延遲,須要設置幾個固定的延時等級,從1s到2h分別對應着等級1到18

// org.apache.rocketmq.store.config.MessageStoreConfig 
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"

批量消息

批量發送消息能顯著提升傳遞小消息的性能,限制是這批消息應該有相同的topic,相同的waitStoreMsgOK,並且不能是延時消息,一批消息的總大小不該超過1MB

事務消息

事務在實際的業務場景中仍是常常遇到的,以轉帳爲例子

張三給李四轉帳100元,能夠分爲以下2步

  1. 張三的帳戶減去100元
  2. 李四的帳戶加上100元

這2個操做要是同時成功,要是同時失敗,否則會形成數據不一致的狀況,基於單個數據庫Connection時,咱們只須要在方法上加上@Transactional註解就能夠了。

若是基於多個Connection(如服務拆分,數據庫分庫分表),加@Transactional此時就無論用了,就得用到分佈式事務

分佈式事務的解決方案不少,RocketMQ只是其中一種方案,RocketMQ能夠保證最終一致性RocketMQ實現分佈式事務的流程以下

  1. producer向mq server發送一個半消息
  2. mq server將消息持久化成功後,向發送方確認消息已經發送成功,此時消息並不會被consumer消費
  3. producer開始執行本地事務邏輯
  4. producer根據本地事務執行結果向mq server發送二次確認,mq收到commit狀態,將消息標記爲可投遞,consumer會消費該消息。mq收到rollback則刪除半消息,consumer將不會消費該消息,若是收到unknow狀態,mq會對消息發起回查
  5. 在斷網或者應用重啓等特殊狀況下,步驟4提交的2次確認有可能沒有到達mq server,通過固定時間後mq會對該消息發起回查
  6. producer收到回查後,須要檢查本地事務的執行狀態
  7. producer根據本地事務的最終狀態,再次提交二次確認,mq仍按照步驟4對半消息進行操做

理解了原理,看代碼實現就很容易了,放一個官方的example

public class TransactionListenerImpl implements TransactionListener {

    private AtomicInteger index = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = index.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (status != null) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

實現分佈式事務須要實現TransactionListener接口,2個方法的做用以下

  1. executeLocalTransaction,執行本地事務
  2. checkLocalTransaction,回查本地事務狀態

針對這個例子,全部的消息都會回查,由於返回的都是UNKNOW,回查的時候status=1的數據會被消費,status=2的數據會被刪除,status=0的數據會一直回查,直到超過默認的回查次數。

發送方代碼以下

public class TransactionProducer {

    public static final String RPODUCER_GROUP_NAME = "transactionProducerGroup";
    public static final String TOPIC_NAME = "transactionTopic";
    public static final String TAG_NAME = "transactionTag";

    public static void main(String[] args) throws Exception {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer(RPODUCER_GROUP_NAME);

        ExecutorService executorService = new ThreadPoolExecutor(25100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100), new ThreadFactory() {

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread();
                thread.setName("transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        for (int i = 0; i < 100; i++) {
            Message message = new Message(TOPIC_NAME, TAG_NAME,
                    ("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        }

        TimeUnit.HOURS.sleep(1);
        producer.shutdown();
    }
}

看到這,可能有人會問了,咱們先執行本地事務,執行成功後再發送消息,這樣能夠嗎?

其實這樣作仍是有可能會形成數據不一致的問題。假如本地事務執行成功,發送消息,因爲網絡延遲,消息發送成功,可是回覆超時了,拋出異常,本地事務回滾。可是消息其實投遞成功並被消費了,此時就會形成數據不一致的狀況

那消息投遞到mq server,consumer消費失敗怎麼辦?

若是是消費超時,重試便可。若是是因爲代碼等緣由真的消費失敗了,此時就得人工介入,從新手動發送消息,達到最終一致性。

消息重試

發送端重試

producer向broker發送消息後,沒有收到broker的ack時,rocketmq會自動重試。重試的次數能夠設置,默認爲2次

DefaultMQProducer producer = new DefaultMQProducer(RPODUCER_GROUP_NAME);
// 同步發送設置重試次數爲5次
producer.setRetryTimesWhenSendFailed(5);
// 異步發送設置重試次數爲5次
producer.setRetryTimesWhenSendAsyncFailed(5);

消費端重試

順序消息的重試

對於順序消息,當Consumer消費消息失敗後,RocketMQ會不斷進行消息重試,此時後續消息會被阻塞。因此當使用順序消息的時候,監控必定要作好,避免後續消息被阻塞

無序消息的重試

當消費模式爲集羣模式時,Broker纔會自動進行重試,對於廣播消息是不會進行重試的

當consumer消費消息後返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS代表消費消息成功,不會進行重試

當consumer符合以下三種場景之一時,會對消息進行重試

  1. 返回ConsumeConcurrentlyStatus.RECONSUME_LATER
  2. 返回null
  3. 主動或被動拋出異常

RocketMQ默認每條消息會被重試16次,超過16次則再也不重試,會將消息放到死信隊列,固然咱們也能夠本身設置重試次數

每次重試的時間間隔以下

第幾回重試 與上次間隔時間 第幾回重試 與上次間隔時間
1 10s 10 7分鐘
2 30s 11 8分鐘
3 1分鐘 12 9分鐘
4 2分鐘 13 10分鐘
5 3分鐘 14 20分鐘
6 4分鐘 15 30分鐘
7 5分鐘 16 1小時
8 6分鐘 17 2小時

重試隊列和死信隊列

當消息消費失敗,會被髮送到重試隊列

當消息消費失敗,並達到最大重試次數,rocketmq並不會將消息丟棄,而是將消息發送到死信隊列

死信隊列有以下特色

  1. 裏面存的是不能被正常消費的消息
  2. 有效期與正常消息相同,都是3天,3天后會被刪除
  3. 每一個死信隊列對應一個Consumer Group ID,即死信隊列是消費者組級別的
  4. 若是一個Consumer Group沒有產生死信消息,則RocketMQ不會建立對應的死信隊列
  5. 死信隊列包含了一個Consumer Group下的全部死信消息,無論該消息屬於哪一個Topic

重試隊列的命名爲  %RETRY%消費組名稱 死信隊列的命名爲 %DLQ%消費組名稱

RocketMQ高性能和高可用的方式

總體架構

rocketmq是經過broker主從機制來實現高可用的。相同broker名稱,不一樣brokerid的機器組成一個broker組,brokerId=0代表這個broker是master,brokerId>0代表這個broker是slave。

消息生產的高可用:建立topic時,把topic的多個message queue建立在多個broker組上。這樣當一個broker組的master不可用後,producer仍然能夠給其餘組的master發送消息。rocketmq目前還不支持主從切換,須要手動切換

消息消費的高可用:consumer並不能配置從master讀仍是slave讀。當master不可用或者繁忙的時候consumer會被自動切換到從slave讀。這樣當master出現故障後,consumer仍然能夠從slave讀,保證了消息消費的高可用

消息存儲結構

RocketMQ須要保證消息的高可靠性,因此要將數據經過磁盤進行持久化存儲。

將數據存到磁盤會不會很慢?其實磁盤有時候比你想象的快,有時候比你想象的慢。目前高性能磁盤的順序寫速度能夠達到600M/s,而磁盤的隨機寫大概只有100k/s,和順序寫的性能相差6000倍,因此RocketMQ採用順序寫。

而且經過mmap(零拷貝的一種實現方式,零拷貝能夠省去用戶態到內核態的數據拷貝,提升速度)具體原理並非很懂,有興趣的小夥伴能夠看看相關書籍

總而言之,RocketMQ經過順序寫和零拷貝技術實現了高性能的消息存儲和消息相關的文件有以下幾種

  1. CommitLog:存儲消息的元數據
  2. ConsumerQueue:存儲消息在CommitLog的索引
  3. IndexFile:提供了一種經過key或者時間區間來查詢消息的方法

刷盤機制

  1. 同步刷盤:消息被寫入內存的PAGECACHE,返回寫成功狀態,當內存裏的消息量積累到必定程度時,統一觸發寫磁盤操做,快速寫入 。吞吐量低,但不會形成消息丟失
  2. 異步刷盤:消息寫入內存的PAGECACHE後,馬上通知刷盤線程刷盤,而後等待刷盤完成,刷盤線程執行完成後喚醒等待的線程,給應用返回消息寫成功的狀態。吞吐量高,當磁盤損壞時,會丟失消息

主從複製

若是一個broker有master和slave時,就須要將master上的消息複製到slave上,複製的方式有兩種

  1. 同步複製:master和slave均寫成功,才返回客戶端成功。maste掛了之後能夠保證數據不丟失,可是同步複製會增長數據寫入延遲,下降吞吐量
  2. 異步複製:master寫成功,返回客戶端成功。擁有較低的延遲和較高的吞吐量,可是當master出現故障後,有可能形成數據丟失

負載均衡

Producer負載均衡

producer在發送消息時,默認輪詢全部queue,消息就會被髮送到不一樣的queue上。而queue能夠分佈在不一樣的broker上

Consumer負載均衡

默認的分配算法是AllocateMessageQueueAveragely,以下圖還有另一種平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分攤queue,只是以環狀輪流分queue的形式,以下圖:

若是consumer數量比message queue還多,則多會來的consumer會被閒置。因此不要讓consumer的數量多於message queue的數量

圖形化管理工具

在rocketmq-externals這個項目中提供了rocketmq的不少擴展工具

github地址以下:https://github.com/apache/rocketmq-externals

其中有一個子項目rocketmq-console提供了rocketmq的圖像化工具,提供了不少實用的功能,如前面說的經過Topic,Message Id或Key來查詢消息,從新發送消息等,仍是很方便的


本文分享自微信公衆號 - Java識堂(erlieStar)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索