本文主要研究一下rocketmq的CleanCommitLogServicejava
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.javagit
class CleanCommitLogService { private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20; private final double diskSpaceWarningLevelRatio = Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90")); private final double diskSpaceCleanForciblyRatio = Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85")); private long lastRedeleteTimestamp = 0; private volatile int manualDeleteFileSeveralTimes = 0; private volatile boolean cleanImmediately = false; public void excuteDeleteFilesManualy() { this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES; DefaultMessageStore.log.info("executeDeleteFilesManually was invoked"); } public void run() { try { this.deleteExpiredFiles(); this.redeleteHangedFile(); } catch (Throwable e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } } private void deleteExpiredFiles() { int deleteCount = 0; long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); boolean timeup = this.isTimeToDelete(); boolean spacefull = this.isSpaceToDelete(); boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; if (timeup || spacefull || manualDelete) { if (manualDelete) this.manualDeleteFileSeveralTimes--; boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", fileReservedTime, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce); fileReservedTime *= 60 * 60 * 1000; deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce); if (deleteCount > 0) { } else if (spacefull) { log.warn("disk space will be full soon, but delete file failed."); } } } private void redeleteHangedFile() { int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval(); long currentTimestamp = System.currentTimeMillis(); if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) { this.lastRedeleteTimestamp = currentTimestamp; int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) { } } } public String getServiceName() { return CleanCommitLogService.class.getSimpleName(); } private boolean isTimeToDelete() { String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen(); if (UtilAll.isItTimeToDo(when)) { DefaultMessageStore.log.info("it's time to reclaim disk space, " + when); return true; } return false; } private boolean isSpaceToDelete() { double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; cleanImmediately = false; { String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); if (physicRatio > diskSpaceWarningLevelRatio) { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full"); } cleanImmediately = true; } else if (physicRatio > diskSpaceCleanForciblyRatio) { cleanImmediately = true; } else { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok"); } } if (physicRatio < 0 || physicRatio > ratio) { DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio); return true; } } { String storePathLogics = StorePathConfigHelper .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir()); double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics); if (logicsRatio > diskSpaceWarningLevelRatio) { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full"); } cleanImmediately = true; } else if (logicsRatio > diskSpaceCleanForciblyRatio) { cleanImmediately = true; } else { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { DefaultMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok"); } } if (logicsRatio < 0 || logicsRatio > ratio) { DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio); return true; } } return false; } public int getManualDeleteFileSeveralTimes() { return manualDeleteFileSeveralTimes; } public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) { this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes; } }
(currentTimestamp - this.lastRedeleteTimestamp) > interval
條件成立的狀況下會執行commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)physicRatio > diskSpaceWarningLevelRatio
時執行runningFlags.getAndMakeDiskFull()改變一下runningFlags狀態;若UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics)大於diskSpaceWarningLevelRatio也會執行runningFlags.getAndMakeDiskFull()改變一下runningFlags狀態;兩者在發現低於執行ratio時會執行runningFlags.getAndMakeDiskOK()ocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.javagithub
public class DefaultMessageStore implements MessageStore { //...... private void addScheduleTask() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { DefaultMessageStore.this.cleanFilesPeriodically(); } }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { DefaultMessageStore.this.checkSelf(); } }, 1, 10, TimeUnit.MINUTES); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) { try { if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) { long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock(); if (lockTime > 1000 && lockTime < 10000000) { String stack = UtilAll.jstack(); final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-" + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime; MixAll.string2FileNotSafe(stack, fileName); } } } catch (Exception e) { } } } }, 1, 1, TimeUnit.SECONDS); // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // @Override // public void run() { // DefaultMessageStore.this.cleanExpiredConsumerQueue(); // } // }, 1, 1, TimeUnit.HOURS); } private void cleanFilesPeriodically() { this.cleanCommitLogService.run(); this.cleanConsumeQueueService.run(); } //...... }
CleanCommitLogService的run方法會先執行deleteExpiredFiles,而後執行redeleteHangedFile;deleteExpiredFiles方法在isTimeToDelete、isSpaceToDelete、manualDelete三個條件任意一個成立的條件下會執行commitLog.deleteExpiredFile方法apache