RocketMQ源碼解析 - 高性能的祕密武器 - mmap

前言

咱們知道RocketMQ以性能強勁著稱,本篇文章咱們就底層存儲原理並結合代碼,瞭解下RocketMQ高性能的祕密武器到底都有啥。java

開源項目推薦

Pepper Metrics是我與同事開發的一個開源工具(github.com/zrbcool/pep…),其經過收集jedis/mybatis/httpservlet/dubbo/motan的運行性能統計,並暴露成prometheus等主流時序數據庫兼容數據,經過grafana展現趨勢。其插件化的架構也很是方便使用者擴展並集成其餘開源組件。
請你們給個star,同時歡迎你們成爲開發者提交PR一塊兒完善項目。git

底層存儲核心原理概述

咱們就這張圖來解釋一下(本文部分圖片來自於艾瑞克的技術江湖github

  • 簡單來講,RMQ的全部的消息存儲在一個文件當中,這個文件就是圖中的CommitLog,因爲磁盤順序寫特性(不管是機械磁盤或SSD固態硬盤,順序寫的速度都遠大於隨機寫)因此RMQ可很好的利用操做系統特性,將消息內容寫入內存成功後即返回,這部分寫入內存成功但還未刷入硬盤的數據,在內核當中被稱爲髒頁(Dirty Page),操做系統會根據特定的狀況定時或當髒頁超過閾值時觸發一次回寫,而這個過程中對磁盤的寫至關於批量而且是順序寫,這就是RMQ在寫消息時可以高性能的緣由之一。
  • 咱們再來講下讀消息,看圖右下側,消息在邏輯抽象上變成多個隊列,這些隊列被稱爲ConsumeQueue,根據名字就能夠猜出這些隊列是爲讀服務的,咱們寫入CommitLog的消息的消息ID及偏移量等信息,被均勻的寫入到多個ConsumeQueue當中,這些隊列在磁盤上也對應着相應的文件,而咱們知道通常來講消息的消費者有多個或者多線程,這樣就能夠併發的從這些ConsumeQueue中讀取消息的位置,而後再去CommitLog當中讀取具體的消息內容,而因爲通常狀況下消息隊列的讀寫都可以命中內存當中的緩存,因此正常狀況下讀操做其實就是讀內存而已,固然性能高了。例以下面的圖示就可以看到,當RMQ高性能讀寫時,磁盤讀是很是少的

結合源碼分析

CommitLog

前面咱們簡單說了RMQ,下面咱們結合源碼深刻挖掘一下,到底黑科技是如何實現的
org.apache.rocketmq.store.CommitLog就是RMQ對CommitLog的抽象封裝,咱們來重點關注putMessage方法,也就是寫消息的方法,該方法有兩個實現數據庫

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    //單個消息
}
public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
    //批量消息
}
複製代碼

下面咱們以單個消息爲例看下代碼apache

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // ...
        //獲取內存映射文件句柄
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        //加鎖
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            //...省略部分代碼
            //重點來了,調用MappedFile.appendMessage方法將消息字節追加到共享內存中,由操做系統或者後臺刷盤線程完成刷盤的動做
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            //...省略部分代碼
        } finally {
            //解鎖
            putMessageLock.unlock();
        }
        //...省略部分代碼
        //觸發刷盤動做,根據配置不一樣選擇同步或者異步刷盤
        handleDiskFlush(result, putMessageResult, msg);
        handleHA(result, putMessageResult, msg);
        return putMessageResult;
    }
複製代碼

能夠看到putMessage最終調用到了MappedFile的appendMessage方法,完成消息字節到內存映射文件的追加,這個內存映射文件又是什麼鬼?數組

內存映射文件(mmap)簡而言之,將文件直接映射到用戶態的內存地址,這樣對文件的操做再也不是write/read,而是直接對內存地址的操做。具體能夠參考這幾篇博文,寫的很詳細
Java文件映射[mmap]全接觸
java中的mmap實現
深度分析mmap:是什麼 爲何 怎麼用 性能總結緩存

MappedFile

接下來咱們須要重點聊下MappedFile,由於RMQ真正高性能的黑科技在於合理的利用了mmap內存映射文件技術及堆外內存操做ByteBuffer,這些操做都被封裝到了這個類當中。性能優化

初始化,在RMQ啓動時相應的線程會構建MappedFile完成內存映射操做,下面兩行即是關鍵代碼微信

this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
//進行mmap操做獲得映射內存mappedByteBuffer
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
複製代碼

可見在程序開始時便預先完成了內存映射操做,fileChannel.map實際上最終經過JNI調用了C庫當中的mmap方法,具體能夠參考文章:Java文件映射[mmap]全接觸
上一節講到的mappedFile.appendMessage(msg, this.appendMessageCallback)調用關係實際爲:
appendMessage->appendMessagesInner->cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, messageExt)
咱們先簡單看一下appendMessagesInner,我挑關鍵代碼列一下:mybatis

//獲得當前位置
int currentPos = this.wrotePosition.get();
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
//定位到寫位置
byteBuffer.position(currentPos);
//調用具體實現追加消息到內存
if (messageExt instanceof MessageExtBrokerInner) {
    result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
    result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
//移動寫位置,增長寫入字節數爲偏移量
this.wrotePosition.addAndGet(result.getWroteBytes());
複製代碼

能夠看到,MappedFile就是不停的寫內存,而後移動末尾指針來實現消息內容到內存映射的追加的,而內存映射文件的實際文件寫入時機多是操做系統按期調用,髒頁過大,程序主動調用byteBuffer.force方法

而cb這個callback回調其實仍是在CommitLog的內部類DefaultAppendMessageCallback定義的,咱們就來看下關鍵部分的代碼:

byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
複製代碼

沒錯,雖然方法中代碼不少,但緊扣本篇的主題實際上關鍵的只有這一句,msgStoreItemMemory也是一個ByteBuffer,它用於臨時保存當前消息的字節數組,消息的元數據定義能夠參考下圖:

這樣對於寫消息線程來講,完成byteBuffer.put並返回,就算消息寫入成功了,真正的落盤由操做系統或後臺線程,定時或根據相應時機調用byteBuffer.force方法完成刷盤

加餐PutMessageLock

前面代碼中咱們看到操做寫消息時進行了加鎖操做,這裏RMQ本身實現了一個PutMessageLock接口,有兩種實現PutMessageReentrantLock及PutMessageSpinLock,其中PutMessageReentrantLock就是直接使用jdk重入鎖的實現,咱們重點說下PutMessageSpinLock
咱們知道Java的鎖(重量級鎖)底層其實是調用pthread的mutex方法競爭鎖,而這是一個內核函數,也就是說會產生大量的上下文切換,另外,若是沒搶到鎖線程進入阻塞狀態,到收到信號喚醒工做也有延遲。在這裏SpinLock的方式,利用CAS加自旋強行讓當前CPU空跑等待搶鎖成功,這樣就避免了上下文切換的損失,可是代價就是大量的空操做浪費CPU時間片,形成CPU使用率高的現象,咱們來看下代碼:

public class PutMessageSpinLock implements PutMessageLock {
    //true: Can lock, false : in lock.
    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);

    @Override
    public void lock() {
        boolean flag;
        do {
            //自旋 + CAS
            flag = this.putMessageSpinLock.compareAndSet(true, false);
        }
        while (!flag);
    }

    @Override
    public void unlock() {
        this.putMessageSpinLock.compareAndSet(false, true);
    }
}

複製代碼

總結

小結一下,本篇文章咱們從底層實現原理講了RocketMQ高性能的緣由及背後的黑科技,可是RMQ的性能優化部分相信遠不止於此,今天就先寫到這裏,後面做者再進行發掘後整理成文章發佈。

做者其餘文章

github.com/zrbcool/blo…

微信訂閱號

相關文章
相關標籤/搜索