最低水位線是指在 WAL(Write Ahead Log)預寫日誌這種設計模式中,標記在這個位置以前的日誌能夠被丟棄。設計模式
WAL(Write Ahead Log)預寫日誌維護了對於存儲的每次更新,隨着時間不斷增加,這個日誌文件會變得無限大。Segmented Log 分割日誌這種設計模式可讓咱們每次只處理一個更小的文件,可是日誌若是不清理,會無休止增加以致於硬盤被佔滿。app
最低水位線這種設計模式會告訴系統哪一部分的日誌能夠被刪除了,即在最低水位線以前的全部日誌能夠被清理掉。通常的方式是,程序內有一個線程運行一個定時任務,不斷地檢查哪一部分的日誌能夠被清理而且刪除這些日誌文件。dom
this.logCleaner = newLogCleaner(config); this.logCleaner.startup();
這裏的 LogCleaner 能夠用定時任務實現:異步
public void startup() { scheduleLogCleaning(); } private void scheduleLogCleaning() { singleThreadedExecutor.schedule(() -> { cleanLogs(); }, config.getCleanTaskIntervalMs(), TimeUnit.MILLISECONDS); }
大部分的分佈式一致性系統(例如 Zookeeper(ZAB 簡化 paxos協議),etcd(raft協議)),都實現了快照機制。在這種機制下,他們的存儲引擎會定時的進行全量快照,而且記錄下快照對應的日誌位置,將這個位置做爲最低水位線。分佈式
//進行快照 public SnapShot takeSnapshot() { //獲取最近的日誌id Long snapShotTakenAtLogIndex = wal.getLastLogEntryId(); //利用這個日誌 id 做爲標識,生成快照 return new SnapShot(serializeState(kv), snapShotTakenAtLogIndex); }
當生成了快照併成功存儲到了磁盤上,對應的最低水位線將用來清理老的日誌:ide
//根據位置獲取這個位置以前的全部日誌文件 List<WALSegment> getSegmentsBefore(Long snapshotIndex) { List<WALSegment> markedForDeletion = new ArrayList<>(); List<WALSegment> sortedSavedSegments = wal.sortedSavedSegments; for (WALSegment sortedSavedSegment : sortedSavedSegments) { //若是這個日誌文件的最新log id 小於快照位置,證實能夠被清理掉 if (sortedSavedSegment.getLastLogEntryId() < snapshotIndex) { markedForDeletion.add(sortedSavedSegment); } } return markedForDeletion; }
定時任務位於DatadirCleanupManager
的start
方法:post
public void start() { //只啓動一次 if (PurgeTaskStatus.STARTED == purgeTaskStatus) { LOG.warn("Purge task is already running."); return; } //檢查定時間隔有效性 if (purgeInterval <= 0) { LOG.info("Purge task is not scheduled."); return; } //啓動定時任務 timer = new Timer("PurgeTask", true); TimerTask task = new PurgeTask(dataLogDir, snapDir,snapRetainCount); timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval)); purgeTaskStatus = PurgeTaskStatus.STARTED; }
核心方法爲PurgeTxnLog
的purge
方法:ui
public static void purge(File dataDir, File snapDir, int num) throws IOException { //保留的snapshot數量不能超過3 if (num < 3) { throw new IllegalArgumentException(COUNT_ERR_MSG); } FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir); //統計文件數量 List<File> snaps = txnLog.findNValidSnapshots(num); int numSnaps = snaps.size(); if (numSnaps > 0) { //利用上一個文件的日誌偏移,清理log文件和snapshot文件 purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1)); } } static void purgeOlderSnapshots(FileTxnSnapLog txnLog, File snapShot) { //名字包括開頭的zxid,就是表明了日誌位置 final long leastZxidToBeRetain = Util.getZxidFromName(snapShot.getName(), PREFIX_SNAPSHOT); final Set<File> retainedTxnLogs = new HashSet<File>(); retainedTxnLogs.addAll(Arrays.asList(txnLog.getSnapshotLogs(leastZxidToBeRetain))); class MyFileFilter implements FileFilter { private final String prefix; MyFileFilter(String prefix) { this.prefix = prefix; } public boolean accept(File f) { if (!f.getName().startsWith(prefix + ".")) { return false; } if (retainedTxnLogs.contains(f)) { return false; } long fZxid = Util.getZxidFromName(f.getName(), prefix); //根據文件名稱表明的zxid,過濾出要刪除的文件 return fZxid < leastZxidToBeRetain; } } //篩選出符合條件的 log 文件和 snapshot 文件 File[] logs = txnLog.getDataDir().listFiles(new MyFileFilter(PREFIX_LOG)); List<File> files = new ArrayList<>(); if (logs != null) { files.addAll(Arrays.asList(logs)); } File[] snapshots = txnLog.getSnapDir().listFiles(new MyFileFilter(PREFIX_SNAPSHOT)); if (snapshots != null) { files.addAll(Arrays.asList(snapshots)); } //進行刪除 for (File f : files) { final String msg = String.format( "Removing file: %s\t%s", DateFormat.getDateTimeInstance().format(f.lastModified()), f.getPath()); LOG.info(msg); System.out.println(msg); if (!f.delete()) { System.err.println("Failed to remove " + f.getPath()); } } }
那麼是何時 snapshot 呢?查看SyncRequestProcessor
的run
方法,這個方法時處理請求,處理請求的時候記錄操做日誌到 log 文件,同時在有須要進行 snapshot 的時候進行 snapshot:this
public void run() { try { //避免全部的server都同時進行snapshot resetSnapshotStats(); lastFlushTime = Time.currentElapsedTime(); while (true) { //獲取請求代碼省略 // 請求操做紀錄成功 if (!si.isThrottled() && zks.getZKDatabase().append(si)) { //是否須要snapshot if (shouldSnapshot()) { //重置是否須要snapshot判斷相關的統計 resetSnapshotStats(); //另起新文件 zks.getZKDatabase().rollLog(); //進行snapshot,先獲取鎖,保證只有一個進行中的snapshot if (!snapThreadMutex.tryAcquire()) { LOG.warn("Too busy to snap, skipping"); } else { //異步snapshot new ZooKeeperThread("Snapshot Thread") { public void run() { try { zks.takeSnapshot(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { //釋放鎖 snapThreadMutex.release(); } } }.start(); } } } //省略其餘 } } catch (Throwable t) { handleException(this.getName(), t); } }
resetSnapshotStats()
設置隨機起始位,避免集羣內全部實例同時進行 snapshot:spa
private void resetSnapshotStats() { //生成隨機roll,snapCount(默認100000) randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2); //生成隨機size,snapSizeInBytes(默認4GB) randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2)); }
shouldSnapshot()
根據啓動時設置的隨機起始位以及配置,判斷是否須要 snapshot
private boolean shouldSnapshot() { //獲取日誌計數 int logCount = zks.getZKDatabase().getTxnCount(); //獲取大小 long logSize = zks.getZKDatabase().getTxnSize(); //當日志個數大於snapCount(默認100000)/2 + 隨機roll,或者日誌大小大於snapSizeInBytes(默認4GB)/2+隨機size return (logCount > (snapCount / 2 + randRoll)) || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize)); }
``
在某些系統中,日誌不是用來更新系統的狀態,能夠在一段時間以後刪除,而且不用考慮任何子系統這個最低水位線以前的是否能夠刪除。例如,kafka 默認保留 7 天的 log,RocketMQ 默認保留 3 天的 commit log。
在 DefaultMeesageStore
的addScheduleTask()
方法中,定義了清理的定時任務:
private void addScheduleTask() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { DefaultMessageStore.this.cleanFilesPeriodically(); } }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); //忽略其餘定時任務 } private void cleanFilesPeriodically() { //清理消息存儲文件 this.cleanCommitLogService.run(); //清理消費隊列文件 this.cleanConsumeQueueService.run(); }
咱們這裏只關心清理消息存儲文件,即DefaultMessageStore
的deleteExpiredFiles
方法:
private void deleteExpiredFiles() { int deleteCount = 0; //文件保留時間,就是文件最後一次更新時間到如今的時間間隔,若是超過了這個時間間隔,就認爲能夠被清理掉了 long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); //刪除文件的間隔,每次清理可能不止刪除一個文件,這個配置指定兩個文件刪除之間的最小間隔 int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); //清理文件時,可能文件被其餘線程佔用,例如讀取消息,這時不能輕易刪除 //在第一次觸發時,記錄一個當前時間戳,當與當前時間間隔超過這個配置以後,強制刪除 int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); //判斷是否要刪除的時間到了 boolean timeup = this.isTimeToDelete(); //判斷磁盤空間是否還充足 boolean spacefull = this.isSpaceToDelete(); //是不是手工觸發 boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; //知足其一,就執行清理 if (timeup || spacefull || manualDelete) { if (manualDelete) this.manualDeleteFileSeveralTimes--; boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; fileReservedTime *= 60 * 60 * 1000; //清理文件 deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce); if (deleteCount > 0) { } else if (spacefull) { log.warn("disk space will be full soon, but delete file failed."); } } }
清理文件的代碼MappedFile
的deleteExpiredFileByTime
方法:
public int deleteExpiredFileByTime(final long expiredTime, final int deleteFilesInterval, final long intervalForcibly, final boolean cleanImmediately) { Object[] mfs = this.copyMappedFiles(0); if (null == mfs) return 0; //刨除最新的那個文件 int mfsLength = mfs.length - 1; int deleteCount = 0; List<MappedFile> files = new ArrayList<MappedFile>(); if (null != mfs) { for (int i = 0; i < mfsLength; i++) { MappedFile mappedFile = (MappedFile) mfs[i]; long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; //若是超過了過時時間,或者須要當即清理 if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { //關閉,清理並刪除文件 if (mappedFile.destroy(intervalForcibly)) { files.add(mappedFile); deleteCount++; if (files.size() >= DELETE_FILES_BATCH_MAX) { break; } //若是配置了刪除文件時間間隔,則須要等待 if (deleteFilesInterval > 0 && (i + 1) < mfsLength) { try { Thread.sleep(deleteFilesInterval); } catch (InterruptedException e) { } } } else { break; } } else { //avoid deleting files in the middle break; } } } //從文件列表裏面裏將本次刪除的文件剔除 deleteExpiredFile(files); return deleteCount; }
每日一刷,輕鬆提高技術,斬獲各類offer: