請問你知道分佈式系統設計模式的最低水位線思想麼?

最低水位線(Low-Water Mark)

最低水位線是指在 WAL(Write Ahead Log)預寫日誌這種設計模式中,標記在這個位置以前的日誌能夠被丟棄。設計模式

問題背景

WAL(Write Ahead Log)預寫日誌維護了對於存儲的每次更新,隨着時間不斷增加,這個日誌文件會變得無限大。Segmented Log 分割日誌這種設計模式可讓咱們每次只處理一個更小的文件,可是日誌若是不清理,會無休止增加以致於硬盤被佔滿。app

解決方案

最低水位線這種設計模式會告訴系統哪一部分的日誌能夠被刪除了,即在最低水位線以前的全部日誌能夠被清理掉。通常的方式是,程序內有一個線程運行一個定時任務,不斷地檢查哪一部分的日誌能夠被清理而且刪除這些日誌文件。dom

this.logCleaner = newLogCleaner(config);
this.logCleaner.startup();

這裏的 LogCleaner 能夠用定時任務實現:異步

public void startup() {
    scheduleLogCleaning();
}

private void scheduleLogCleaning() {
    singleThreadedExecutor.schedule(() -> {
        cleanLogs();
    }, config.getCleanTaskIntervalMs(), TimeUnit.MILLISECONDS);
}

基於快照的最低水位線實現以及示例

大部分的分佈式一致性系統(例如 Zookeeper(ZAB 簡化 paxos協議),etcd(raft協議)),都實現了快照機制。在這種機制下,他們的存儲引擎會定時的進行全量快照,而且記錄下快照對應的日誌位置,將這個位置做爲最低水位線。分佈式

//進行快照
public SnapShot takeSnapshot() {
    //獲取最近的日誌id
    Long snapShotTakenAtLogIndex = wal.getLastLogEntryId();
    //利用這個日誌 id 做爲標識,生成快照
    return new SnapShot(serializeState(kv), snapShotTakenAtLogIndex);
}

當生成了快照併成功存儲到了磁盤上,對應的最低水位線將用來清理老的日誌:ide

//根據位置獲取這個位置以前的全部日誌文件
List<WALSegment> getSegmentsBefore(Long snapshotIndex) {
    List<WALSegment> markedForDeletion = new ArrayList<>();
    List<WALSegment> sortedSavedSegments = wal.sortedSavedSegments;
    for (WALSegment sortedSavedSegment : sortedSavedSegments) {
        //若是這個日誌文件的最新log id 小於快照位置,證實能夠被清理掉
        if (sortedSavedSegment.getLastLogEntryId() < snapshotIndex) {
            markedForDeletion.add(sortedSavedSegment);
        }
    }
    return markedForDeletion;
}

zookeeper 中的最低水位線實現

定時任務位於DatadirCleanupManagerstart方法:post

public void start() {
    //只啓動一次
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
    //檢查定時間隔有效性
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }
    //啓動定時任務
    timer = new Timer("PurgeTask", true);
    TimerTask task = new PurgeTask(dataLogDir, snapDir,snapRetainCount);
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
    purgeTaskStatus = PurgeTaskStatus.STARTED;
}

核心方法爲PurgeTxnLogpurge方法:ui

public static void purge(File dataDir, File snapDir, int num) throws IOException {
    //保留的snapshot數量不能超過3
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
    //統計文件數量
    List<File> snaps = txnLog.findNValidSnapshots(num);
    int numSnaps = snaps.size();
    if (numSnaps > 0) {
        //利用上一個文件的日誌偏移,清理log文件和snapshot文件
        purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
    }
}

static void purgeOlderSnapshots(FileTxnSnapLog txnLog, File snapShot) {
    //名字包括開頭的zxid,就是表明了日誌位置
    final long leastZxidToBeRetain = Util.getZxidFromName(snapShot.getName(), PREFIX_SNAPSHOT);
    final Set<File> retainedTxnLogs = new HashSet<File>();
    retainedTxnLogs.addAll(Arrays.asList(txnLog.getSnapshotLogs(leastZxidToBeRetain)));
    class MyFileFilter implements FileFilter {

        private final String prefix;
        MyFileFilter(String prefix) {
            this.prefix = prefix;
        }
        public boolean accept(File f) {
            if (!f.getName().startsWith(prefix + ".")) {
                return false;
            }
            if (retainedTxnLogs.contains(f)) {
                return false;
            }
            long fZxid = Util.getZxidFromName(f.getName(), prefix);
            //根據文件名稱表明的zxid,過濾出要刪除的文件
            return fZxid < leastZxidToBeRetain;
        }

    }
    //篩選出符合條件的 log 文件和 snapshot 文件
    File[] logs = txnLog.getDataDir().listFiles(new MyFileFilter(PREFIX_LOG));
    List<File> files = new ArrayList<>();
    if (logs != null) {
        files.addAll(Arrays.asList(logs));
    }
    File[] snapshots = txnLog.getSnapDir().listFiles(new MyFileFilter(PREFIX_SNAPSHOT));
    if (snapshots != null) {
        files.addAll(Arrays.asList(snapshots));
    }
    //進行刪除
    for (File f : files) {
        final String msg = String.format(
            "Removing file: %s\t%s",
            DateFormat.getDateTimeInstance().format(f.lastModified()),
            f.getPath());

        LOG.info(msg);
        System.out.println(msg);

        if (!f.delete()) {
            System.err.println("Failed to remove " + f.getPath());
        }
    }

}

那麼是何時 snapshot 呢?查看SyncRequestProcessorrun方法,這個方法時處理請求,處理請求的時候記錄操做日誌到 log 文件,同時在有須要進行 snapshot 的時候進行 snapshot:this

public void run() {
    try {
        //避免全部的server都同時進行snapshot
        resetSnapshotStats();
        lastFlushTime = Time.currentElapsedTime();
        while (true) {
            //獲取請求代碼省略
            // 請求操做紀錄成功
            if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
                //是否須要snapshot
                if (shouldSnapshot()) {
                    //重置是否須要snapshot判斷相關的統計
                    resetSnapshotStats();
                    //另起新文件
                    zks.getZKDatabase().rollLog();
                    //進行snapshot,先獲取鎖,保證只有一個進行中的snapshot
                    if (!snapThreadMutex.tryAcquire()) {
                        LOG.warn("Too busy to snap, skipping");
                    } else {
                        //異步snapshot
                        new ZooKeeperThread("Snapshot Thread") {
                            public void run() {
                                try {
                                    zks.takeSnapshot();
                                } catch (Exception e) {
                                    LOG.warn("Unexpected exception", e);
                                } finally {
                                    //釋放鎖
                                    snapThreadMutex.release();
                                }
                            }
                        }.start();
                    }
                }
            } 
            //省略其餘
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
    }
}

resetSnapshotStats()設置隨機起始位,避免集羣內全部實例同時進行 snapshot:spa

private void resetSnapshotStats() {
    //生成隨機roll,snapCount(默認100000)
    randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2);
    //生成隨機size,snapSizeInBytes(默認4GB)
    randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
}

shouldSnapshot()根據啓動時設置的隨機起始位以及配置,判斷是否須要 snapshot

private boolean shouldSnapshot() {
    //獲取日誌計數
    int logCount = zks.getZKDatabase().getTxnCount();
    //獲取大小
    long logSize = zks.getZKDatabase().getTxnSize();
    //當日志個數大於snapCount(默認100000)/2 + 隨機roll,或者日誌大小大於snapSizeInBytes(默認4GB)/2+隨機size
    return (logCount > (snapCount / 2 + randRoll))
           || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
}

``

基於時間的最低水位線實現與示例

在某些系統中,日誌不是用來更新系統的狀態,能夠在一段時間以後刪除,而且不用考慮任何子系統這個最低水位線以前的是否能夠刪除。例如,kafka 默認保留 7 天的 log,RocketMQ 默認保留 3 天的 commit log。

RocketMQ中最低水位線實現

DefaultMeesageStoreaddScheduleTask()方法中,定義了清理的定時任務:

private void addScheduleTask() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
    //忽略其餘定時任務
}

private void cleanFilesPeriodically() {
    //清理消息存儲文件
    this.cleanCommitLogService.run();
    //清理消費隊列文件
    this.cleanConsumeQueueService.run();
}

咱們這裏只關心清理消息存儲文件,即DefaultMessageStoredeleteExpiredFiles方法:

private void deleteExpiredFiles() {
    int deleteCount = 0;
    //文件保留時間,就是文件最後一次更新時間到如今的時間間隔,若是超過了這個時間間隔,就認爲能夠被清理掉了
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    //刪除文件的間隔,每次清理可能不止刪除一個文件,這個配置指定兩個文件刪除之間的最小間隔
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    //清理文件時,可能文件被其餘線程佔用,例如讀取消息,這時不能輕易刪除
    //在第一次觸發時,記錄一個當前時間戳,當與當前時間間隔超過這個配置以後,強制刪除
    int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

    //判斷是否要刪除的時間到了
    boolean timeup = this.isTimeToDelete();
    //判斷磁盤空間是否還充足
    boolean spacefull = this.isSpaceToDelete();
    //是不是手工觸發
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

    //知足其一,就執行清理
    if (timeup || spacefull || manualDelete) {

        if (manualDelete)
            this.manualDeleteFileSeveralTimes--;
        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
        fileReservedTime *= 60 * 60 * 1000;

        //清理文件
        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
            destroyMapedFileIntervalForcibly, cleanAtOnce);
        if (deleteCount > 0) {
        } else if (spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}

清理文件的代碼MappedFiledeleteExpiredFileByTime方法:

public int deleteExpiredFileByTime(final long expiredTime,
    final int deleteFilesInterval,
    final long intervalForcibly,
    final boolean cleanImmediately) {
    Object[] mfs = this.copyMappedFiles(0);

    if (null == mfs)
        return 0;

    //刨除最新的那個文件
    int mfsLength = mfs.length - 1;
    int deleteCount = 0;
    List<MappedFile> files = new ArrayList<MappedFile>();
    if (null != mfs) {
        for (int i = 0; i < mfsLength; i++) {
            MappedFile mappedFile = (MappedFile) mfs[i];
            long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
            //若是超過了過時時間,或者須要當即清理
            if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                //關閉,清理並刪除文件
                if (mappedFile.destroy(intervalForcibly)) {
                    files.add(mappedFile);
                    deleteCount++;

                    if (files.size() >= DELETE_FILES_BATCH_MAX) {
                        break;
                    }

                    //若是配置了刪除文件時間間隔,則須要等待
                    if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                        try {
                            Thread.sleep(deleteFilesInterval);
                        } catch (InterruptedException e) {
                        }
                    }
                } else {
                    break;
                }
            } else {
                //avoid deleting files in the middle
                break;
            }
        }
    }

    //從文件列表裏面裏將本次刪除的文件剔除
    deleteExpiredFile(files);

    return deleteCount;
}

每日一刷,輕鬆提高技術,斬獲各類offer:

image

相關文章
相關標籤/搜索