咱們知道RocketMQ以性能強勁著稱,本篇文章咱們就底層存儲原理並結合代碼,瞭解下RocketMQ高性能的祕密武器到底都有啥。java
Pepper Metrics是我與同事開發的一個開源工具(github.com/zrbcool/pep…),其經過收集jedis/mybatis/httpservlet/dubbo/motan的運行性能統計,並暴露成prometheus等主流時序數據庫兼容數據,經過grafana展現趨勢。其插件化的架構也很是方便使用者擴展並集成其餘開源組件。
請你們給個star,同時歡迎你們成爲開發者提交PR一塊兒完善項目。git
咱們就這張圖來解釋一下(本文部分圖片來自於艾瑞克的技術江湖) github
前面咱們簡單說了RMQ,下面咱們結合源碼深刻挖掘一下,到底黑科技是如何實現的
org.apache.rocketmq.store.CommitLog就是RMQ對CommitLog的抽象封裝,咱們來重點關注putMessage方法,也就是寫消息的方法,該方法有兩個實現數據庫
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
//單個消息
}
public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
//批量消息
}
複製代碼
下面咱們以單個消息爲例看下代碼apache
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// ...
//獲取內存映射文件句柄
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//加鎖
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
//...省略部分代碼
//重點來了,調用MappedFile.appendMessage方法將消息字節追加到共享內存中,由操做系統或者後臺刷盤線程完成刷盤的動做
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
//...省略部分代碼
} finally {
//解鎖
putMessageLock.unlock();
}
//...省略部分代碼
//觸發刷盤動做,根據配置不一樣選擇同步或者異步刷盤
handleDiskFlush(result, putMessageResult, msg);
handleHA(result, putMessageResult, msg);
return putMessageResult;
}
複製代碼
能夠看到putMessage最終調用到了MappedFile的appendMessage方法,完成消息字節到內存映射文件的追加,這個內存映射文件又是什麼鬼?數組
內存映射文件(mmap)簡而言之,將文件直接映射到用戶態的內存地址,這樣對文件的操做再也不是write/read,而是直接對內存地址的操做。具體能夠參考這幾篇博文,寫的很詳細
Java文件映射[mmap]全接觸
java中的mmap實現
深度分析mmap:是什麼 爲何 怎麼用 性能總結緩存
接下來咱們須要重點聊下MappedFile,由於RMQ真正高性能的黑科技在於合理的利用了mmap內存映射文件技術及堆外內存操做ByteBuffer,這些操做都被封裝到了這個類當中。性能優化
初始化,在RMQ啓動時相應的線程會構建MappedFile完成內存映射操做,下面兩行即是關鍵代碼微信
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
//進行mmap操做獲得映射內存mappedByteBuffer
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
複製代碼
可見在程序開始時便預先完成了內存映射操做,fileChannel.map實際上最終經過JNI調用了C庫當中的mmap方法,具體能夠參考文章:Java文件映射[mmap]全接觸
上一節講到的mappedFile.appendMessage(msg, this.appendMessageCallback)調用關係實際爲:
appendMessage->appendMessagesInner->cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, messageExt)
咱們先簡單看一下appendMessagesInner,我挑關鍵代碼列一下:mybatis
//獲得當前位置
int currentPos = this.wrotePosition.get();
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
//定位到寫位置
byteBuffer.position(currentPos);
//調用具體實現追加消息到內存
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
//移動寫位置,增長寫入字節數爲偏移量
this.wrotePosition.addAndGet(result.getWroteBytes());
複製代碼
能夠看到,MappedFile就是不停的寫內存,而後移動末尾指針來實現消息內容到內存映射的追加的,而內存映射文件的實際文件寫入時機多是操做系統按期調用,髒頁過大,程序主動調用byteBuffer.force方法
而cb這個callback回調其實仍是在CommitLog的內部類DefaultAppendMessageCallback定義的,咱們就來看下關鍵部分的代碼:
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
複製代碼
沒錯,雖然方法中代碼不少,但緊扣本篇的主題實際上關鍵的只有這一句,msgStoreItemMemory也是一個ByteBuffer,它用於臨時保存當前消息的字節數組,消息的元數據定義能夠參考下圖:
前面代碼中咱們看到操做寫消息時進行了加鎖操做,這裏RMQ本身實現了一個PutMessageLock接口,有兩種實現PutMessageReentrantLock及PutMessageSpinLock,其中PutMessageReentrantLock就是直接使用jdk重入鎖的實現,咱們重點說下PutMessageSpinLock
咱們知道Java的鎖(重量級鎖)底層其實是調用pthread的mutex方法競爭鎖,而這是一個內核函數,也就是說會產生大量的上下文切換,另外,若是沒搶到鎖線程進入阻塞狀態,到收到信號喚醒工做也有延遲。在這裏SpinLock的方式,利用CAS加自旋強行讓當前CPU空跑等待搶鎖成功,這樣就避免了上下文切換的損失,可是代價就是大量的空操做浪費CPU時間片,形成CPU使用率高的現象,咱們來看下代碼:
public class PutMessageSpinLock implements PutMessageLock {
//true: Can lock, false : in lock.
private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
@Override
public void lock() {
boolean flag;
do {
//自旋 + CAS
flag = this.putMessageSpinLock.compareAndSet(true, false);
}
while (!flag);
}
@Override
public void unlock() {
this.putMessageSpinLock.compareAndSet(false, true);
}
}
複製代碼
小結一下,本篇文章咱們從底層實現原理講了RocketMQ高性能的緣由及背後的黑科技,可是RMQ的性能優化部分相信遠不止於此,今天就先寫到這裏,後面做者再進行發掘後整理成文章發佈。