The HBase compaction flow has many moving parts, so it is covered across several posts. This one walks through how compactions are scheduled and requested inside the RegionServer; the actual flow of reading and merging files during a compaction is left for a later post.
When a RegionServer starts up, it initializes a CompactSplitThread and a CompactionChecker.
/*
 * Check for compactions requests.
 */
ScheduledChore compactionChecker;

// Compactions
public CompactSplitThread compactSplitThread;
compactionChecker is a ScheduledChore. As the Javadoc below explains, a ScheduledChore is a task that HBase runs periodically. In HRegionServer, the flushChecker member is also a ScheduledChore. ScheduledChore implements Runnable, so it executes as a task on a thread from the ChoreService pool; its main logic lives in its run() method.
/**
 * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once
 * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The
 * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for
 * access to the threads in the core thread pool. If an unhandled exception occurs, the chore
 * cancellation is logged. Implementers should consider whether or not the Chore will be able to
 * execute within the defined period. It is bad practice to define a ScheduledChore whose execution
 * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
 * thread pool.
 * <p>
 * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
 * an entry being added to a queue, etc.
 */
// ScheduledChore implements Runnable, so it runs as a task on a thread:
// 1. it is a task HBase executes periodically, 2. it runs inside its executing thread,
// 3. the loop-and-sleep mechanics are provided for it
@InterfaceAudience.Private
public abstract class ScheduledChore implements Runnable {
}
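To make the contract concrete, here is a minimal sketch of a custom chore (illustrative only; HeartbeatChore is a made-up name, not HBase source): extend ScheduledChore, hand name, stopper, and period to the superclass, and put the periodic work in chore(). CompactionChecker, shown later, follows exactly this shape.

import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;

public class HeartbeatChore extends ScheduledChore {

  public HeartbeatChore(Stoppable stopper, int periodMs) {
    // name, stopper, and period are handed to the ScheduledChore constructor
    super("HeartbeatChore", stopper, periodMs);
  }

  @Override
  protected void chore() {
    // invoked once per period by the ChoreService's thread pool
    System.out.println("heartbeat at " + System.currentTimeMillis());
  }
}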
A few of the more important member variables of ScheduledChore deserve a brief note:

// the sleep period between executions
private final int period;
// the time the task last ran
private long timeOfLastRun = -1;
// the time of the current run
private long timeOfThisRun = -1;
// whether this ScheduledChore has finished its one-time initialization; on the first run
// initialChore() is called, which simply returns true without any real logic
private boolean initialChoreComplete = false;
One more important member is stopper, which can be any object implementing the Stoppable interface. Per the Javadoc, the stopper is the means by which a ScheduledChore can be stopped: once the chore notices it has been stopped, it cancels itself. When HRegionServer instantiates compactionChecker, it passes this as the stopper; therefore, when the RegionServer stops, the chore notices and cancels its compaction checking on its own. The relevant code:
In ScheduledChore:

/**
 * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been
 * stopped, it will cancel itself. This is particularly useful in the case where a single stopper
 * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)}
 * command can cause many chores to stop together.
 */
private final Stoppable stopper;

In HRegionServer:

this.compactionChecker = new CompactionChecker(this, this.frequency, this);
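The Javadoc above highlights that a single stopper may be shared by many chores, so one stop() call winds them all down. A hedged, runnable sketch of that behavior (SharedStopperDemo, SimpleStopper, and NoopChore are illustrative names, not HBase source):

import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;

public class SharedStopperDemo {

  // a trivial Stoppable, standing in for the RegionServer itself
  static class SimpleStopper implements Stoppable {
    private volatile boolean stopped = false;
    @Override public void stop(String why) { stopped = true; }
    @Override public boolean isStopped() { return stopped; }
  }

  static class NoopChore extends ScheduledChore {
    NoopChore(String name, Stoppable stopper, int period) {
      super(name, stopper, period);
    }
    @Override protected void chore() {
      System.out.println(getName() + " ran");
    }
  }

  public static void main(String[] args) throws InterruptedException {
    SimpleStopper stopper = new SimpleStopper();
    ChoreService service = new ChoreService("demo");
    // both chores share one stopper
    service.scheduleChore(new NoopChore("chore-a", stopper, 500));
    service.scheduleChore(new NoopChore("chore-b", stopper, 500));
    Thread.sleep(1200);
    stopper.stop("demo over"); // each chore cancels itself on its next run()
    service.shutdown();
  }
}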
The heart of ScheduledChore is its run() method, which performs a series of checks and then periodically invokes chore(). Let's go through it line by line.
public void run() {
  // set timeOfLastRun to the previous timeOfThisRun, and timeOfThisRun to now
  updateTimeTrackingBeforeRun();
  if (missedStartTime() && isScheduled()) {
    onChoreMissedStartTime();
    if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time");
  } else if (stopper.isStopped() || !isScheduled()) {
    cancel(false);
    cleanup();
    if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " was stopped");
  } else {
    try {
      if (!initialChoreComplete) {
        initialChoreComplete = initialChore();
      } else {
        chore();
      }
    } catch (Throwable t) {
      if (LOG.isErrorEnabled()) LOG.error("Caught error", t);
      if (this.stopper.isStopped()) {
        cancel(false);
        cleanup();
      }
    }
  }
}
run() first calls updateTimeTrackingBeforeRun(), a trivial method that just updates timeOfLastRun and timeOfThisRun (both initialized to -1); they are refreshed on every periodic execution.
/**
 * Update our time tracking members. Called at the start of an execution of this chore's run()
 * method so that a correct decision can be made as to whether or not we missed the start time
 */
private synchronized void updateTimeTrackingBeforeRun() {
  timeOfLastRun = timeOfThisRun;
  timeOfThisRun = System.currentTimeMillis();
}
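These two timestamps feed the missedStartTime() check seen in run(). A paraphrased sketch of that logic (approximate; the 1.5x slack factor is an assumption here, consult the ScheduledChore source for the exact details):

// a start time counts as "missed" only when both timestamps are valid and the
// measured gap between consecutive runs exceeds the period plus some slack
private synchronized boolean missedStartTime() {
  return isValidStartTime(timeOfLastRun) && isValidStartTime(timeOfThisRun)
      && (timeOfThisRun - timeOfLastRun) > getMaximumAllowedTimeBetweenRuns();
}

private static boolean isValidStartTime(final long time) {
  return time > 0; // -1 (never ran) is not a valid start time
}

private double getMaximumAllowedTimeBetweenRuns() {
  // allow the chore to run up to 50% late before declaring the start time missed
  return 1.5 * period;
}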
Next, run() evaluates missedStartTime() && isScheduled(); on a normal periodic execution missedStartTime() returns false, so this branch is skipped and control falls to the else-if: if the carrier the chore is attached to (here, the RegionServer) has stopped, the chore cancels and exits on its own. Otherwise, execution reaches the final else branch. On the first run, initialChoreComplete is false, so initialChore() is called; it simply returns true and does no real work.
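For reference, the default initialChore() in ScheduledChore is essentially just a success stub; subclasses may override it to do one-time setup before the periodic loop begins:

/**
 * Override to run a task before we start looping.
 * @return true if initial chore was successful
 */
protected boolean initialChore() {
  // Default does nothing
  return true;
}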
With setup out of the way, chore() runs on every period. In HRegionServer, the inner class CompactionChecker extends ScheduledChore and provides its own chore() implementation, which decides whether to issue a concrete compaction request via requestCompaction (the request path is covered next time).
As the logic below shows, it first checks whether a compaction is needed at all; if one is needed, it does not go on to check whether a major compaction is needed:
/*
 * Inner class that runs on a long period checking if regions need compaction.
 */
private static class CompactionChecker extends ScheduledChore {
  private final HRegionServer instance;
  private final int majorCompactPriority;
  private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
  private long iteration = 0;

  CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
    // invoke the parent constructor
    super("CompactionChecker", stopper, sleepTime);
    // keep a reference to the hosting RegionServer
    this.instance = h;
    LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));

    /* MajorCompactPriority is configurable.
     * If not set, the compaction will use default priority.
     */
    // major compaction priority, read from
    // hbase.regionserver.compactionChecker.majorCompactPriority; defaults to Integer.MAX_VALUE
    this.majorCompactPriority = this.instance.conf.
        getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
            DEFAULT_PRIORITY);
  }

  // ScheduledChore's run() method repeatedly invokes this chore() method
  @Override
  protected void chore() {
    // loop over all online regions of this RegionServer;
    // onlineRegions holds every region on the HRegionServer currently able to serve requests
    for (HRegion r : this.instance.onlineRegions.values()) {
      if (r == null) continue;
      // examine each store of the region
      for (Store s : r.getStores().values()) {
        try {
          // Compactions are usually triggered by events such as a memstore flush, but some
          // compaction policies also need a periodic check. The effective check interval is
          // hbase.server.compactchecker.interval.multiplier *
          // hbase.server.thread.wakefrequency; the multiplier defaults to 1000.
          long multiplier = s.getCompactionCheckMultiplier();
          assert multiplier > 0;
          // not a whole multiple yet, skip: the check only fires when the iteration
          // counter is an integer multiple of the multiplier
          if (iteration % multiplier != 0) continue;
          if (s.needsCompaction()) {
            // A compaction is needed: queue a system compaction request. Ultimately this
            // compares whether the current hfile count minus the files already compacting
            // exceeds the configured compaction minimum; if so, a system compaction runs.
            // Will recognize if major is needed.
            this.instance.compactSplitThread.requestSystemCompaction(r, s,
                getName() + " requests compaction");
          } else if (s.isMajorCompaction()) {
            if (majorCompactPriority == DEFAULT_PRIORITY
                || majorCompactPriority > r.getCompactPriority()) {
              this.instance.compactSplitThread.requestCompaction(r, s,
                  getName() + " requests major compaction; use default priority", null);
            } else {
              this.instance.compactSplitThread.requestCompaction(r, s,
                  getName() + " requests major compaction; use configured priority",
                  this.majorCompactPriority, null);
            }
          }
        } catch (IOException e) {
          LOG.warn("Failed major compaction check on " + r, e);
        }
      }
    }
    iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
  }
}
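To make the check cadence and the needsCompaction() condition concrete, here is the arithmetic with the default values; the needsCompaction() line is a hedged paraphrase of the default ratio-based policy, not the verbatim source:

public class CompactionCheckMath {
  public static void main(String[] args) {
    long wakeFrequencyMs = 10_000; // hbase.server.thread.wakefrequency default (10s)
    long multiplier = 1_000;       // hbase.server.compactchecker.interval.multiplier default
    // the chore wakes every 10s, but each store is only examined every 1000th wakeup:
    System.out.println("effective check interval ~ "
        + (wakeFrequencyMs * multiplier) / 3_600_000.0 + " hours"); // ~2.78 hours

    // hedged paraphrase of the ratio-based policy's needsCompaction():
    int storefiles = 7, filesCompacting = 2, compactionMin = 3; // hbase.hstore.compaction.min
    boolean needsCompaction = (storefiles - filesCompacting) >= compactionMin;
    System.out.println("needsCompaction = " + needsCompaction); // true: 5 >= 3
  }
}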
The needsCompaction() test itself is straightforward; the interesting part is the isMajorCompaction() decision. Its main logic is:
Get the time until the next major compaction, mcTime.
Take the smallest modification timestamp among the candidate files as lowTimestamp; lowTimestamp < now - mcTime means the store is due for a major compaction.
If there is exactly one file, then:
- if the file is already major-compacted and not expired, run a major compaction only when its HDFS block locality falls below the configured minimum (to restore locality); otherwise skip it;
- if the file has expired, run a major compaction.
With more than one candidate file, a major compaction is triggered directly.
public boolean isMajorCompaction(final Collection<StoreFile> filesToCompact) throws IOException {
  boolean result = false;
  // time until the next major compaction
  long mcTime = getNextMajorCompactTime(filesToCompact);
  if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
    return result;
  }
  // TODO: Use better method for determining stamp of last major (HBASE-2990)
  // lowest modification timestamp among the candidate files, and the current time
  long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
  long now = System.currentTimeMillis();
  if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
    // lowTimestamp < (now - mcTime) means we are past the major compaction due time
    // Major compaction time has elapsed.
    long cfTtl = this.storeConfigInfo.getStoreFileTtl();
    if (filesToCompact.size() == 1) {
      // Single file
      StoreFile sf = filesToCompact.iterator().next();
      Long minTimestamp = sf.getMinimumTimestamp();
      // age of the oldest data in the file
      long oldest = (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp.longValue();
      if (sf.isMajorCompaction() && (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
        float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex(
            RSRpcServices.getHostname(comConf.conf, false));
        if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Major compaction triggered on only store " + this
                + "; to make hdfs blocks local, current blockLocalityIndex is "
                + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
          }
          result = true;
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Skipping major compaction of " + this
                + " because one (major) compacted file only, oldestTime " + oldest
                + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " + blockLocalityIndex
                + " (min " + comConf.getMinLocalityToForceCompact() + ")");
          }
        }
      } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
        // the single hfile is entirely expired (oldest ts > ttl) => major compact
        LOG.debug("Major compaction triggered on store " + this
            + ", because keyvalues outdated; time since last major compaction "
            + (now - lowTimestamp) + "ms");
        result = true;
      }
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Major compaction triggered on store " + this
            + "; time since last major compaction " + (now - lowTimestamp) + "ms");
      }
      result = true;
    }
  }
  return result;
}
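A runnable toy for the time gate above. The 7-day default for hbase.hregion.majorcompaction is from hbase-default.xml; the real getNextMajorCompactTime() also applies a jitter (hbase.hregion.majorcompaction.jitter, default 0.5), which this sketch ignores:

public class MajorCompactDue {
  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long mcTime = 7L * 24 * 3600 * 1000;             // pretend getNextMajorCompactTime() returned 7 days
    long lowTimestamp = now - 8L * 24 * 3600 * 1000; // oldest candidate file was written 8 days ago
    // the same predicate isMajorCompaction() uses: past due, so a major compaction triggers
    System.out.println(lowTimestamp > 0 && lowTimestamp < (now - mcTime)); // prints true
  }
}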