RocketMQ存儲機制01-存儲文件組織與內存映射

image

上圖是以CommitLog文件爲例,展現了commitlog文件與MappedFile、MapppedFileQueue的關係。
你能夠把磁盤裏面commitlog文件夾下每一個文件對應成MappedFile,而這個文件夾對應成MappedFileQueue。緩存

先從MappedFileQueue看起app

MappedFileQueue

private final String storePath;//存儲目錄

    private final int mappedFileSize;//一個存儲文件的大小

    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();//寫時複製MappedFile列表,按順序存儲各個commitlog對於的MappedFile

    private final AllocateMappedFileService allocateMappedFileService;//分配MappedFile服務

    private long flushedWhere = 0;//當前刷盤指針的位置,表示以前的數據已經刷到磁盤
    private long committedWhere = 0;//當前數據提交指針

MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound)

/*
            此處獲得的第一個MappedFile的文件起始offset(即文件名)不必定是0,
            以前的文件有可能已經被清除了。
             */
            MappedFile mappedFile = this.getFirstMappedFile();
            if (mappedFile != null) {
                /*
                以前的文件有可能已經被清除了(從this.mappedFiles裏也會刪掉)。所以不能直接用offset / this.mappedFileSize
                計算offset對應的文件索引。
                 */
                int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize));
                if (index < 0 || index >= this.mappedFiles.size()) {
                    LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " +
                            "mappedFileSize: {}, mappedFiles count: {}",
                        mappedFile,
                        offset,
                        index,
                        this.mappedFileSize,
                        this.mappedFiles.size());
                }

                try {
                    return this.mappedFiles.get(index);
                } catch (Exception e) {
                    if (returnFirstOnNotFound) {
                        return mappedFile;
                    }
                    LOG_ERROR.warn("findMappedFileByOffset failure. ", e);
                }
            }

image

MappedFile

public static final int OS_PAGE_SIZE = 1024 * 4;//系統頁緩存大小
    protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);//當前JVM實例中MappedFile虛擬內存

    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);//當前JVM實例中MappedFile對象個數
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);//當前文件的寫指針
    //ADD BY ChenYang
    protected final AtomicInteger committedPosition = new AtomicInteger(0);//當前文件的提交指針
    private final AtomicInteger flushedPosition = new AtomicInteger(0);//刷寫磁盤的指針
    protected int fileSize;//文件大小
    protected FileChannel fileChannel;//文件通道
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    protected ByteBuffer writeBuffer = null;//該buffer從transientStorePool申請,消息首先會放到這裏,而後再提交到FileChannel
    protected TransientStorePool transientStorePool = null;//堆外內存池,與上面的writeBuffer共同起做用
    private String fileName;//文件名稱
    private long fileFromOffset;//文件起始物理偏移地址
    private File file;//文件自己
    private MappedByteBuffer mappedByteBuffer;//FileChannel對應的內存映射
    private volatile long storeTimestamp = 0;//文件最後一次內容寫入時間
    private boolean firstCreateInQueue = false;//是不是MappedFileQueue中的第一個文件

init(final String fileName, final int fileSize)

this.fileName = fileName;
        this.fileSize = fileSize;
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());//文件名即爲文件的起始物理偏移量
        boolean ok = false;

        ensureDirOK(this.file.getParent());//確保文件的父目錄存在

        try {
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();//建立文件讀寫通道
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);//得到映射buffer
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);//增長
            TOTAL_MAPPED_FILES.incrementAndGet();//增長
            ok = true;
        }

image

int commit(final int commitLeastPages)

if (writeBuffer == null) {
            //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
            //沒開啓堆外內存池,不須要提交數據到fileChannel
            return this.wrotePosition.get();//返回當前寫的位置
        }
        if (this.isAbleToCommit(commitLeastPages)) {
            if (this.hold()) {//???
                commit0(commitLeastPages);
                this.release();//???
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            }
        }
        // All dirty data has been committed to FileChannel.
        //全部數據已經被提交到了FileChannel
        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);//將writeBuffer歸還給transientStorePool
            this.writeBuffer = null;
        }

        return this.committedPosition.get();//返回當前提交的位置

boolean isAbleToCommit(final int commitLeastPages)

int flush = this.committedPosition.get();
        int write = this.wrotePosition.get();

        if (this.isFull()) {
            return true;//當前文件已經寫滿;,能夠提交
        }

        if (commitLeastPages > 0) {
            //本次要提交的數據頁大於等於容許提交的最小閾值,能夠提交
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
        }

        return write > flush;//當前寫的位置大於提交的位置,能夠提交

void commit0(final int commitLeastPages)

int writePos = this.wrotePosition.get();
        int lastCommittedPosition = this.committedPosition.get();

        if (writePos - this.committedPosition.get() > 0) {
            try {
                ByteBuffer byteBuffer = writeBuffer.slice();//建立一個buffer,與writeBuffer指向同一緩存區
                byteBuffer.position(lastCommittedPosition);//回退buffer當前指針位置爲lastCommittedPosition
                byteBuffer.limit(writePos);//設置當前最大有效數據指針
                this.fileChannel.position(lastCommittedPosition);
                this.fileChannel.write(byteBuffer);
                this.committedPosition.set(writePos);
            } catch (Throwable e) {
                log.error("Error occurred when commit data to FileChannel.", e);
            }
        }
相關文章
相關標籤/搜索