HBase source code: compaction (part 1)

HBase's compaction flow is long, so it is covered in several posts. This first part walks through how compaction is invoked inside the RegionServer; it does not cover the actual compaction that reads and merges store files, which will be introduced later.

When a RegionServer starts, it initializes a CompactSplitThread and a CompactionChecker.

/*
   * Check for compaction requests.
   */
  ScheduledChore compactionChecker;

  // Compactions
  public CompactSplitThread compactSplitThread;
   CompactSplitThread is the class that carries out the actual compact and split work, while CompactionChecker periodically checks whether a compaction should run. The CompactionChecker thread is introduced first.

  compactionChecker is of type ScheduledChore. As the Javadoc below explains, a ScheduledChore is a task that HBase executes on a fixed period. In RegionServer you can see that the flushChecker member is also a ScheduledChore. ScheduledChore implements Runnable and therefore runs as a task on a thread; its main logic lives in its run() method.

/**
 * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once
 * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The
 * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for
 * access to the threads in the core thread pool. If an unhandled exception occurs, the chore
 * cancellation is logged. Implementers should consider whether or not the Chore will be able to
 * execute within the defined period. It is bad practice to define a ScheduledChore whose execution
 * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
 * thread pool.
 * <p>
 * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
 * an entry being added to a queue, etc.
 */
// ScheduledChore implements Runnable, so a chore runs as a task on a thread
// 1. a task HBase executes periodically  2. runs on a thread of the ChoreService pool  3. provides the loop and sleep machinery
@InterfaceAudience.Private
public abstract class ScheduledChore implements Runnable {
}
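The scheduling model described in the Javadoc above can be sketched with a plain ScheduledThreadPoolExecutor; this is only an illustration of the pattern, not HBase's actual ChoreService code (the class name ChoreSketch and the helper method are made up for this example):

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ChoreSketch {
    // Schedule a counter on a small core pool (all "chores" compete for it,
    // much like ChoreService) and report whether it ran at least `expected` times.
    static boolean ranAtLeast(int expected, long periodMs, long waitMs)
            throws InterruptedException {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();
        // If one run overruns the period it delays the next run, which is why
        // the Javadoc warns against chores whose execution time exceeds the period.
        pool.scheduleAtFixedRate(runs::incrementAndGet, 0, periodMs, TimeUnit.MILLISECONDS);
        Thread.sleep(waitMs);
        pool.shutdownNow();
        return runs.get() >= expected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(ranAtLeast(2, 20, 300)); // prints: true
    }
}
```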

  A brief note on the more important member variables of ScheduledChore:

  // sleep period between runs
  private final int period;
  // time the task was last run
  private long timeOfLastRun = -1;
  // time of the current run
  private long timeOfThisRun = -1;
  // whether this ScheduledChore has finished its one-time initialization; on the first
  // run, initialChore() is called, which simply returns true and does no real work
  private boolean initialChoreComplete = false;

  Another important member is stopper, which can be any object implementing the Stoppable interface. As its Javadoc explains, stopper is the means by which a ScheduledChore is stopped: once the chore recognizes it has been stopped, it cancels itself. When RegionServer instantiates compactionChecker, it passes itself as the stopper, so when the RS stops, the chore notices and cancels its compaction checks. The relevant code:

In ScheduledChore:
/**
 * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been
 * stopped, it will cancel itself. This is particularly useful in the case where a single stopper
 * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)}
 * command can cause many chores to stop together.
 */
private final Stoppable stopper;

In RegionServer:
this.compactionChecker = new CompactionChecker(this, this.frequency, this);

  The core of ScheduledChore is its run() method, which performs a series of checks and then periodically executes chore(). Let's go through it line by line.

public void run() {
    // set timeOfLastRun to the current timeOfThisRun, and timeOfThisRun to now
    updateTimeTrackingBeforeRun();
    if (missedStartTime() && isScheduled()) {
      onChoreMissedStartTime();
      if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time");
    } else if (stopper.isStopped() || !isScheduled()) {
      cancel(false);
      cleanup();
      if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " was stopped");
    } else {
      try {
        if (!initialChoreComplete) {
          initialChoreComplete = initialChore();
        } else {
          chore();
        }
      } catch (Throwable t) {
        if (LOG.isErrorEnabled()) LOG.error("Caught error", t);
        if (this.stopper.isStopped()) {
          cancel(false);
          cleanup();
        }
      }
    }
  }

  run() first calls updateTimeTrackingBeforeRun(), which simply updates timeOfLastRun and timeOfThisRun (both initialized to -1); they are refreshed on every periodic run.

/**
   * Update our time tracking members. Called at the start of an execution of this chore's run()
   * method so that a correct decision can be made as to whether or not we missed the start time
   */
  private synchronized void updateTimeTrackingBeforeRun() {
    timeOfLastRun = timeOfThisRun;
    timeOfThisRun = System.currentTimeMillis();
  }
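Given these two timestamps, the missed-start-time check that run() performs can be sketched as below. The 1.5 × period tolerance and the class name are assumptions for illustration, not necessarily the exact constants or structure HBase uses:

```java
public class MissedStartTimeSketch {
    private final int period = 1000;   // ms, the chore's sleep period
    private long timeOfLastRun = -1;
    private long timeOfThisRun = -1;

    // Mirrors updateTimeTrackingBeforeRun(): shift this-run into last-run.
    void updateTimeTracking(long now) {
        timeOfLastRun = timeOfThisRun;
        timeOfThisRun = now;
    }

    // Assumed logic: the start time was missed if the gap between two runs
    // clearly exceeds the configured period. Before two runs have happened
    // (either timestamp still -1), nothing can have been missed.
    boolean missedStartTime() {
        if (timeOfLastRun < 0 || timeOfThisRun < 0) return false;
        return (timeOfThisRun - timeOfLastRun) > 1.5 * period;
    }

    public static void main(String[] args) {
        MissedStartTimeSketch c = new MissedStartTimeSketch();
        c.updateTimeTracking(10_000);             // first run
        System.out.println(c.missedStartTime());  // prints: false
        c.updateTimeTracking(11_000);             // second run, on schedule
        System.out.println(c.missedStartTime());  // prints: false
        c.updateTimeTracking(14_000);             // third run, 3s late
        System.out.println(c.missedStartTime());  // prints: true
    }
}
```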
  Next the condition missedStartTime() && isScheduled() is evaluated; for the periodic compaction check it is false, so control falls to the else-if branch: when the carrier the chore runs on (here, the RS) has stopped, the chore exits on its own. Otherwise execution reaches the final else branch. On the first run, initialChoreComplete is false, so initialChore() is called; it simply returns true and does nothing else.
  With everything in place, chore() then executes on each period. In RegionServer, CompactionChecker extends ScheduledChore and implements its own chore() method, which decides whether to issue a concrete requestCompaction call (introduced in the next post). Note in the logic below that if a compaction is already needed, the major-compaction check is skipped:
/*
   * Inner class that runs on a long period checking if regions need compaction.
   */
  private static class CompactionChecker extends ScheduledChore {
    private final HRegionServer instance;
    private final int majorCompactPriority;
    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
    private long iteration = 0;

    CompactionChecker(final HRegionServer h, final int sleepTime,
        final Stoppable stopper) {
      // invoke the parent constructor
      super("CompactionChecker", stopper, sleepTime);
      // keep a reference to the carrier h
      this.instance = h;
      LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));

      /* MajorCompactPriority is configurable.
       * If not set, the compaction will use default priority.
       */
      // major compaction priority, read from hbase.regionserver.compactionChecker.majorCompactPriority,
      // default Integer.MAX_VALUE
      this.majorCompactPriority = this.instance.conf.
        getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
        DEFAULT_PRIORITY);
    }

    // ScheduledChore's run method invokes chore() on every period
    @Override
    protected void chore() {
      // iterate over all online regions on this RegionServer;
      // onlineRegions holds every region currently able to serve requests
      for (HRegion r : this.instance.onlineRegions.values()) {
        if (r == null)
          continue;
        // examine each store of the region
        for (Store s : r.getStores().values()) {
          try {
            // Compactions are normally triggered by events such as a memstore flush, but some
            // compaction policies also need a periodic check. The effective check interval is
            // hbase.server.compactchecker.interval.multiplier * hbase.server.thread.wakefrequency,
            // with the multiplier defaulting to 1000.
            long multiplier = s.getCompactionCheckMultiplier();
            assert multiplier > 0;
            // only check when the iteration counter is a whole multiple of the multiplier
            if (iteration % multiplier != 0) continue;
            if (s.needsCompaction()) {
              // A compaction is needed: queue a system compaction request. Ultimately this
              // compares the current hfile count minus the files already being compacted
              // against the configured compaction minimum; if it qualifies, a system
              // compaction runs. Will recognize if major is needed.
              this.instance.compactSplitThread.requestSystemCompaction(r, s,
                getName() + " requests compaction");
            } else if (s.isMajorCompaction()) {
              if (majorCompactPriority == DEFAULT_PRIORITY
                  || majorCompactPriority > r.getCompactPriority()) {
                this.instance.compactSplitThread.requestCompaction(r, s,
                  getName() + " requests major compaction; use default priority", null);
              } else {
                this.instance.compactSplitThread.requestCompaction(r, s,
                  getName() + " requests major compaction; use configured priority",
                  this.majorCompactPriority, null);
              }
            }
          } catch (IOException e) {
            LOG.warn("Failed major compaction check on " + r, e);
          }
        }
      }
      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
    }
  }
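The needsCompaction() threshold mentioned in the comment (current hfile count minus files already being compacted, versus the configured minimum) can be sketched as follows. The class and field names are illustrative, the default of 3 corresponds to hbase.hstore.compaction.min, and whether the real check is strict or inclusive may differ from this sketch:

```java
public class NeedsCompactionSketch {
    // Assumed default of hbase.hstore.compaction.min
    private final int minFilesToCompact = 3;
    private final int storefileCount;
    private final int compactingFileCount;

    NeedsCompactionSketch(int storefiles, int compacting) {
        this.storefileCount = storefiles;
        this.compactingFileCount = compacting;
    }

    // A system compaction is requested when the number of files not yet
    // being compacted exceeds the configured minimum.
    boolean needsCompaction() {
        return (storefileCount - compactingFileCount) > minFilesToCompact;
    }

    public static void main(String[] args) {
        System.out.println(new NeedsCompactionSketch(5, 0).needsCompaction()); // prints: true
        System.out.println(new NeedsCompactionSketch(4, 2).needsCompaction()); // prints: false
    }
}
```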

  Deciding whether an ordinary compaction is needed is straightforward; the interesting part is the isMajorCompaction() check. Its main logic:

    Get the time of the next major compaction, mcTime.

    Get the modification time of every file eligible for compaction and take the smallest timestamp, lowTimestamp. If lowTimestamp < (now - mcTime), a major compaction is due.

      If there is exactly one file:

        If it has not expired but its HDFS block locality does not meet the required minimum, run a major compaction (to make the blocks local); otherwise do not.

        If it has expired, run a major compaction.

public boolean isMajorCompaction(final Collection<StoreFile> filesToCompact)
    throws IOException {
  boolean result = false;
  // time of the next major compaction
  long mcTime = getNextMajorCompactTime(filesToCompact);
  if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
    return result;
  }
  // TODO: Use better method for determining stamp of last major (HBASE-2990)
  // lowest modification timestamp among the candidate files, and the current time
  long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
  long now = System.currentTimeMillis();
  if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
    // lowTimestamp < (now - mcTime) means we are inside the window for a major compaction
    // Major compaction time has elapsed.
    long cfTtl = this.storeConfigInfo.getStoreFileTtl();
    if (filesToCompact.size() == 1) {
      // Single file
      StoreFile sf = filesToCompact.iterator().next();
      Long minTimestamp = sf.getMinimumTimestamp();
      // age of the file's oldest data
      long oldest = (minTimestamp == null)
          ? Long.MIN_VALUE
          : now - minTimestamp.longValue();
      if (sf.isMajorCompaction() &&
          (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
        float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex(
            RSRpcServices.getHostname(comConf.conf, false)
        );
        if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Major compaction triggered on only store " + this +
                "; to make hdfs blocks local, current blockLocalityIndex is " +
                blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
          }
          result = true;
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Skipping major compaction of " + this +
                " because one (major) compacted file only, oldestTime " +
                oldest + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " +
                blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
          }
        }
      } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
        // the single hfile has expired entirely (its oldest ts is beyond the TTL) => major compact
        LOG.debug("Major compaction triggered on store " + this +
          ", because keyvalues outdated; time since last major compaction " +
          (now - lowTimestamp) + "ms");
        result = true;
      }
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Major compaction triggered on store " + this +
            "; time since last major compaction " + (now - lowTimestamp) + "ms");
      }
      result = true;
    }
  }
  return result;
}