HBase Source Code: compact (Part 2)

Part 1 of this series covered how HBase schedules compactions; this article walks through how a compaction is actually carried out. We pick up from the chore introduced last time: the chore() method of the CompactionChecker in HRegionServer decides whether a compaction is needed, as follows:

protected void chore() {
      // Iterate over all online regions on this server instance, checking each in turn.
      // onlineRegions is the set of all online regions on this HRegionServer that can serve requests.
      for (HRegion r : this.instance.onlineRegions.values()) {
        if (r == null)
          continue;
        // Examine each store of the region
        for (Store s : r.getStores().values()) {
          try {
            // Interval between periodic compaction checks. Compactions are normally triggered by
            // events such as a memstore flush, but some compaction policies also need periodic
            // checks. The effective interval is hbase.server.compactchecker.interval.multiplier *
            // hbase.server.thread.wakefrequency; the multiplier defaults to 1000.
            long multiplier = s.getCompactionCheckMultiplier();
            assert multiplier > 0;
            // Only check when the iteration counter is an exact multiple of the multiplier; otherwise skip
            if (iteration % multiplier != 0) continue;
            if (s.needsCompaction()) {// compaction needed: queue a system compaction request
              // Queue a compaction. Will recognize if major is needed.
              this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
                  + " requests compaction");
            } else if (s.isMajorCompaction()) {// a major compaction goes through requestCompaction() instead
              if (majorCompactPriority == DEFAULT_PRIORITY
                  || majorCompactPriority > r.getCompactPriority()) {
                this.instance.compactSplitThread.requestCompaction(r, s, getName()
                    + " requests major compaction; use default priority", null);
              } else {
                this.instance.compactSplitThread.requestCompaction(r, s, getName()
                    + " requests major compaction; use configured priority",
                  this.majorCompactPriority, null);
              }
            }
          } catch (IOException e) {
            LOG.warn("Failed major compaction check on " + r, e);
          }
        }
      }
      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
    }
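  A quick back-of-the-envelope figure for how often this fires: assuming the usual defaults of hbase.server.thread.wakefrequency = 10000 ms and hbase.server.compactchecker.interval.multiplier = 1000, each store's needsCompaction()/isMajorCompaction() check runs roughly every 1000 × 10 s ≈ 2.8 hours.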

  When s.needsCompaction() returns true, compactSplitThread.requestSystemCompaction() is called to carry out the compaction. If no compaction is needed at that point, isMajorCompaction() is consulted to decide whether a major compaction is due; if so, CompactSplitThread.requestCompaction() is called instead. Both requestSystemCompaction() and requestCompaction() ultimately funnel into requestCompactionInternal(); only the arguments differ. Let's dig in starting from requestSystemCompaction(), whose logic is:

 public void requestSystemCompaction(
      final HRegion r, final Store s, final String why) throws IOException {
    requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false);
  }
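  For contrast, the user-triggered path (for example a major_compact issued from the hbase shell) goes through an overload that passes selectNow = true. A paraphrased sketch of that overload in the 1.x-era code base this article follows (treat the exact signature as an assumption):

// User-triggered compactions select their files up front (selectNow = true),
// whereas requestSystemCompaction() above defers selection (selectNow = false).
public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
    final String why, int priority, CompactionRequest request) throws IOException {
  return requestCompactionInternal(r, s, why, priority, request, true);
}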

  Stepping into requestCompactionInternal():

private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
      final String why, int priority, CompactionRequest request, boolean selectNow)
          throws IOException {
    // First, the usual sanity checks: has the HRegionServer stopped, and does the
    // table of this HRegion allow compactions at all
    if (this.server.isStopped()
        || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
      return null;
    }

    CompactionContext compaction = null;
    // For system compactions triggered automatically, selectNow is false;
    // for compactions triggered by a user, e.g. from the hbase shell, selectNow is true
    if (selectNow) {
      // User-triggered (e.g. hbase shell) major compaction: the files to compact are selected right here
      compaction = selectCompaction(r, s, priority, request);
      if (compaction == null) return null; // message logged inside
    }

    // We assume that most compactions are small. So, put system compactions into small
    // pool; we will do selection there, and move to large pool if necessary.
    // In other words: if selectNow is false (system-triggered, e.g. by a MemStore flush or the
    // compaction check chore), the request always goes into shortCompactions, the small pool.
    // If it is user-triggered (e.g. HBase shell), the total size of the request decides:
    // above the throttle threshold it goes into longCompactions (the large pool),
    // otherwise into the small pool.

    // size is the total size of all HFiles selected for this compaction
    long size = selectNow ? compaction.getRequest().getSize() : 0;
    ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(size))
      ? longCompactions : shortCompactions;
    pool.execute(new CompactionRunner(s, r, compaction, pool));
    if (LOG.isDebugEnabled()) {
      String type = (pool == shortCompactions) ? "Small " : "Large ";
      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
    }
    return selectNow ? compaction.getRequest() : null;
  }

  The logic of requestCompactionInternal() can be summarized as follows:

  1. First some checks: if the regionserver has stopped, return immediately.
  2. Check the selectNow flag, which distinguishes user-triggered compactions (true) from system-triggered ones (false, as in this call path).
    1. If user-triggered, selectCompaction() is invoked to pick the files that will actually be compacted.
  3. Based on size and selectNow, decide which thread pool should run this compaction: longCompactions or shortCompactions (see the sketch after this list).
  4. Construct a CompactionRunner and hand it to the pool.
    1. Here the compaction field of the CompactionRunner is null: since this path is system-triggered, selectNow is false, so selectCompaction() never ran to populate it.
    2. CompactionRunner.run() therefore contains logic to select the files to compact at execution time.
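  The pool decision in step 3 is delegated to Store.throttleCompaction(size), which in the default setup ends up in the compaction policy. A paraphrased sketch of the 1.x-era logic (the configuration names are real; treat the exact code shape as an assumption):

// RatioBasedCompactionPolicy (paraphrased sketch): a request whose total file
// size exceeds the throttle point is "large" and belongs in longCompactions.
public boolean throttleCompaction(long compactionSize) {
  return compactionSize > comConf.getThrottlePoint();
}

  The throttle point is read from hbase.regionserver.thread.compaction.throttle and defaults to 2 × hbase.hstore.compaction.max × hbase.hregion.memstore.flush.size.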

  The detailed flow of CompactionRunner:

private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
    private final Store store;
    private final HRegion region;
    private CompactionContext compaction;
    private int queuedPriority;
    private ThreadPoolExecutor parent;

    public CompactionRunner(Store store, HRegion region,
        CompactionContext compaction, ThreadPoolExecutor parent) {
      super();
      this.store = store;
      this.region = region;
      this.compaction = compaction;
      // Priority with which this runner is queued: if the compaction context is null it comes
      // from HStore.getCompactPriority(), otherwise from the compaction request, which in turn
      // was populated from the priority passed into requestCompactionInternal()
      this.queuedPriority = (this.compaction == null)
          ? store.getCompactPriority() : compaction.getRequest().getPriority();
      this.parent = parent;
    }

    @Override
    public String toString() {
      return (this.compaction != null) ? ("Request = " + compaction.getRequest())
          : ("Store = " + store.toString() + ", pri = " + queuedPriority);
    }

    @Override
    public void run() {
      Preconditions.checkNotNull(server);
      // First, the usual sanity checks: has the HRegionServer stopped, and does the
      // table of this HRegion allow compactions
      if (server.isStopped()
          || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
        return;
      }
      // Common case - system compaction without a file selection. Select now.
      if (this.compaction == null) {
        int oldPriority = this.queuedPriority;
        this.queuedPriority = this.store.getCompactPriority();
        // If the current priority queuedPriority is greater than the previous oldPriority
        if (this.queuedPriority > oldPriority) {
          // Store priority decreased while we were in queue (due to some other compaction?),
          // requeue with new priority to avoid blocking potential higher priorities.
          // Throw this CompactionRunner back into the pool
          this.parent.execute(this);
          return;
        }
        try {
          // Select the candidate HFiles
          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
        } catch (IOException ex) {
          LOG.error("Compaction selection failed " + this, ex);
          server.checkFileSystem();
          return;
        }
        if (this.compaction == null) return; // nothing to do
        // Now see if we are in correct pool for the size; if not, go to the correct one.
        // We might end up waiting for a while, so cancel the selection.
        assert this.compaction.hasSelection();
        ThreadPoolExecutor pool = store.throttleCompaction(
            compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
        if (this.parent != pool) {// the pool changed
          this.store.cancelRequestedCompaction(this.compaction); // have the HStore cancel the request
          this.compaction = null;    // reset compaction to null
          this.parent = pool;        // switch pools
          this.parent.execute(this); // re-submit; compaction will be re-initialized on the next run
          return;
        }
      }
      // Finally we can compact something.
      assert this.compaction != null;
      // Pre-execution hook
      this.compaction.getRequest().beforeExecute();
      try {
        // Note: please don't put single-compaction logic here;
        //       put it into region/store/etc. This is CST logic.
        long start = EnvironmentEdgeManager.currentTime();
        // Call HRegion's compact() to compact this store
        boolean completed =
            region.compact(compaction, store, compactionThroughputController);
        long now = EnvironmentEdgeManager.currentTime();
        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
        if (completed) {
          // degenerate case: blocked regions require recursive enqueues
          if (store.getCompactPriority() <= 0) {
            // Priority <= 0 means the store still has too many files: queue another system compaction
            requestSystemCompaction(region, store, "Recursive enqueue");
          } else {
            // Request a split; effectively checks whether the region has outgrown the split threshold
            // see if the compaction has caused us to exceed max region size
            requestSplit(region);
          }
        }
      } catch (IOException ex) {
        IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
        LOG.error("Compaction failed " + this, remoteEx);
        if (remoteEx != ex) {
          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
        }
        server.checkFileSystem();
      } catch (Exception ex) {
        LOG.error("Compaction failed " + this, ex);
        server.checkFileSystem();
      } finally {
        LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
      }
      this.compaction.getRequest().afterExecute();
    }

  As shown above, in CompactionRunner:

  1. If the compaction field is null:
    1. Check whether the priority changed; if it did, throw the CompactionRunner back into the pool.
    2. Call selectCompaction() to pick the candidate HFiles.
    3. Use store.throttleCompaction() to decide which pool the request belongs in; if the pool changed, cancel the compaction and re-submit to the right pool, where it will be re-initialized.
  2. Call compaction.getRequest().beforeExecute() for pre-compaction work: by default this is a no-op, but if a coprocessor is installed the corresponding hooks run here.
  3. Record the start time.
  4. Call region.compact() to compact the store.
  5. If the compaction succeeded, use the store's post-compaction priority to decide whether to queue another compaction or request a split (see the sketch after this list for how that priority is computed).
  6. Call compaction.getRequest().afterExecute().
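  The priority used in steps 1.1 and 5 comes from HStore.getCompactPriority(). In the 1.x-era code this boils down to how far the store is from its blocking-file limit; a paraphrased sketch (the config name is real; treat the exact code shape as an assumption):

// DefaultStoreFileManager (paraphrased sketch): priority counts down as files pile up.
// blockingFileCount is hbase.hstore.blockingStoreFiles; once the store reaches it,
// the result is <= 0, which CompactionRunner treats as "too many files, queue
// another system compaction".
public int getCompactPriority() {
  return this.blockingFileCount - this.storefiles.size();
}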

  Next we look at what each stage actually does, starting with selectCompaction(). It picks the files to compact and wraps them in a CompactionContext that it returns; the logic is:

private CompactionContext selectCompaction(final HRegion r, final Store s,
      int priority, CompactionRequest request) throws IOException {

    // Call HStore's requestCompaction() method to obtain a CompactionContext
    CompactionContext compaction = s.requestCompaction(priority, request);
    if (compaction == null) {
      if(LOG.isDebugEnabled()) {
        LOG.debug("Not compacting " + r.getRegionNameAsString() +
            " because compaction request was cancelled");
      }
      return null;
    }
    // Make sure the CompactionContext actually carries a compaction request
    assert compaction.hasSelection();
    if (priority != Store.NO_PRIORITY) {
      compaction.getRequest().setPriority(priority);
    }
    return compaction;
  }

  As we can see, it is ultimately the store's requestCompaction() method that produces the CompactionContext. Let's step inside and see what happens there.

public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
      throws IOException {
    // don't even select for compaction if writes are disabled
    // If the corresponding HRegion is not writable, return null immediately
    if (!this.areWritesEnabled()) {
      return null;
    }

    // Before we do compaction, try to get rid of unneeded files to simplify things.
    removeUnneededFiles();

    // Create the compaction context through the store engine
    CompactionContext compaction = storeEngine.createCompaction();
    CompactionRequest request = null;
    // Take the read lock
    this.lock.readLock().lock();
    try {
      synchronized (filesCompacting) {
        // First, see if coprocessor would want to override selection.
        if (this.getCoprocessorHost() != null) {
          // Pre-select StoreFiles through CompactionContext.preSelect(), returning a StoreFile list
          List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
          boolean override = this.getCoprocessorHost().preCompactSelection(
              this, candidatesForCoproc, baseRequest);
          if (override) {
            // Coprocessor is overriding normal file selection.
            compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
          }
        }

        // Normal case - coprocessor is not overriding file selection.
        if (!compaction.hasSelection()) {// no selection yet, i.e. no coprocessor overrode it
          // Is this a user compaction?
          boolean isUserCompaction = priority == Store.PRIORITY_USER;
          boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
              offPeakCompactionTracker.compareAndSet(false, true);
          try {
            // Call CompactionContext.select()
            compaction.select(this.filesCompacting, isUserCompaction,
              mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
          } catch (IOException e) {
            if (mayUseOffPeak) {
              offPeakCompactionTracker.set(false);
            }
            throw e;
          }
          assert compaction.hasSelection();
          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
            // Compaction policy doesn't want to take advantage of off-peak.
            offPeakCompactionTracker.set(false);
          }
        }
        if (this.getCoprocessorHost() != null) {
          this.getCoprocessorHost().postCompactSelection(
              this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
        }

        // Selected files; see if we have a compaction with some custom base request.
        // If a base request was passed in, combine it with the selection
        if (baseRequest != null) {
          // Update the request with what the system thinks the request should be;
          // its up to the request if it wants to listen.
          compaction.forceSelect(
              baseRequest.combineWith(compaction.getRequest()));
        }
        // Finally, we have the resulting files list. Check if we have any files at all.
        // Fetch the resulting compaction request
        request = compaction.getRequest();
        // Get the set of files selected for compaction
        final Collection<StoreFile> selectedFiles = request.getFiles();
        if (selectedFiles.isEmpty()) {
          return null;
        }
        // Add the selected files to filesCompacting (which answers a question from the previous article)
        addToCompactingFiles(selectedFiles);

        // If we're enqueuing a major, clear the force flag.
        this.forceMajor = this.forceMajor && !request.isMajor();

        // Set common request properties.
        // Set priority, either override value supplied by caller or from store.
        request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
        request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
      }
    } finally {
      this.lock.readLock().unlock();
    }

    LOG.debug(getRegionInfo().getEncodedName() + " - "  + getColumnFamilyName()
        + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
        + (request.isAllFiles() ? " (all files)" : ""));
    // Tell the HRegion, via reportCompactionRequestStart(), that a compaction request has started
    this.region.reportCompactionRequestStart(request.isMajor());
    // Return the compaction context
    return compaction;
  }

  Summarizing the flow above:

  1. First try to get rid of unneeded files to simplify things: removeUnneededFiles().
  2. Create a CompactionContext via storeEngine.createCompaction() (see the sketch after this list).
  3. Call CompactionContext.select() to choose the files.
  4. Attach the selected files to the CompactionContext and return it.
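  Before diving into each step: step 2's storeEngine.createCompaction() is the simplest of the four. A paraphrased sketch of what the default store engine returns (assuming DefaultStoreEngine; treat the exact code shape as an assumption):

// DefaultStoreEngine (paraphrased sketch): the context handed back here is the
// DefaultCompactionContext whose select() method we examine below.
@Override
public CompactionContext createCompaction() {
  return new DefaultCompactionContext();
}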

  Let's look at removeUnneededFiles() first. Based on each file's maximum timestamp it weeds out files that are no longer needed, adding the already-expired files to filesCompacting:

private void removeUnneededFiles() throws IOException {
    if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
    if (getFamily().getMinVersions() > 0) {
      LOG.debug("Skipping expired store file removal due to min version being " +
          getFamily().getMinVersions());
      return;
    }
    this.lock.readLock().lock();
    Collection<StoreFile> delSfs = null;
    try {
      synchronized (filesCompacting) {
        // Get the configured TTL; if unset it defaults to Long.MAX_VALUE
        long cfTtl = getStoreFileTtl();
        if (cfTtl != Long.MAX_VALUE) { // if the TTL is not "forever"
          // Ultimately calls getUnneededFiles()
          delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
              EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
          // Add the unneeded files to filesCompacting
          addToCompactingFiles(delSfs);
        }
      }
    } finally {
      this.lock.readLock().unlock();
    }
    if (delSfs == null || delSfs.isEmpty()) return;

    Collection<StoreFile> newFiles = new ArrayList<StoreFile>(); // No new files.
    writeCompactionWalRecord(delSfs, newFiles);
    replaceStoreFiles(delSfs, newFiles);
    completeCompaction(delSfs);
    LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
        + this + " of " + this.getRegionInfo().getRegionNameAsString()
        + "; total size for store is "
        + TraditionalBinaryPrefix.long2String(storeSize, "", 1));
  }

  The logic of getUnneededFiles() is as follows:

public Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting) {
    Collection<StoreFile> expiredStoreFiles = null;
    ImmutableList<StoreFile> files = storefiles;
    // 1) We can never get rid of the last file which has the maximum seqid.
    // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
    for (int i = 0; i < files.size() - 1; ++i) {
      StoreFile sf = files.get(i);
      long fileTs = sf.getReader().getMaxTimestamp();
      // If the file's max timestamp is older than the TTL cutoff and it is not already compacting
      if (fileTs < maxTs && !filesCompacting.contains(sf)) {
        LOG.info("Found an expired store file: " + sf.getPath()
            + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
        if (expiredStoreFiles == null) {
          expiredStoreFiles = new ArrayList<StoreFile>();
        }
        expiredStoreFiles.add(sf);
      }
    }
    // Return the list of files to be discarded
    return expiredStoreFiles;
  }
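  Note the two preconditions in removeUnneededFiles(): whole-file expiry only applies when the column family has a finite TTL and its MIN_VERSIONS is 0. As a hypothetical illustration (table and family names made up; conf is assumed to be a Configuration carrying the cluster settings), a one-day TTL can be set through the 1.x Java admin API:

// Give family "cf" of table "t" a TTL of one day, so store files whose newest
// cell is older than 24h become eligible for wholesale removal above.
Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin();
HColumnDescriptor hcd = new HColumnDescriptor("cf");
hcd.setTimeToLive(24 * 60 * 60); // seconds
admin.modifyColumn(TableName.valueOf("t"), hcd);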

Following the call chain down, file selection happens in CompactionContext.select():

public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
      // Use the compaction policy's selectCompaction() to obtain the compaction request
      request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
          filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
      // Return whether a request was obtained, true or false
      return request != null;
    }

So select() hands the file selection to the configured compaction policy's selectCompaction(). Our production cluster does not specify a policy, so the default ratio-based one is used:

public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
      final List<StoreFile> filesCompacting, final boolean isUserCompaction,
      final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
    // Preliminary compaction subject to filters
    // Build the candidate StoreFile list from the candidateFiles parameter
    ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
    // Stuck and not compacting enough (estimate). It is not guaranteed that we will be
    // able to compact more if stuck and compacting, because ratio policy excludes some
    // non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
    // futureFiles: 0 if filesCompacting is empty, otherwise 1
    int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
    // Decide whether we may be stuck, based on the blocking-store-files setting
    boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
        >= storeConfigInfo.getBlockingFileCount();
    // Exclude files already being compacted (those in filesCompacting) from the candidates
    candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
    LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
        filesCompacting.size() + " compacting, " + candidateSelection.size() +
        " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");

    // If we can't have all files, we cannot do major anyway
    // isAllFiles flags whether the selection still covers every file of the store; the test is
    // simply whether candidateSelection is the same size as the original candidateFiles list,
    // since candidateFiles represents all files under the Store
    boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
    // Without all files this can never be a major compaction
    if (!(forceMajor && isAllFiles)) {
      // Not a forced major covering all files: skip overly large files via skipLargeFiles()
      candidateSelection = skipLargeFiles(candidateSelection);
      // Re-evaluate isAllFiles
      isAllFiles = candidateFiles.size() == candidateSelection.size();
    }

    // Try a major compaction if this is a user-requested major compaction,
    // or if we do not have too many files to compact and this was requested as a major compaction
    // isTryingMajor is true in either of these cases:
    // 1. a forced major covering all files that is also a user-requested compaction
    // 2. a forced major covering all files, or the policy itself decides a major is due,
    //    provided candidateSelection is smaller than the configured max files per compaction
    boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)
      || (((forceMajor && isAllFiles) || isMajorCompaction(candidateSelection))
          && (candidateSelection.size() < comConf.getMaxFilesToCompact()));
    // Or, if there are any references among the candidates.
    // References among the candidates mean these are files left behind by a split
    boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);
    // Neither trying a major nor right after a split
    if (!isTryingMajor && !isAfterSplit) {
      // We're are not compacting all files, let's see what files are applicable
      // Filter again:
      // filterBulk() removes files that must not take part in minor compactions;
      candidateSelection = filterBulk(candidateSelection);
      // applyCompactionPolicy() runs the actual selection algorithm;
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      // checkMinFilesCriteria() verifies the minimum-file-count requirement for a compaction.
      candidateSelection = checkMinFilesCriteria(candidateSelection);
    }
    // Remove any files beyond the per-compaction maximum
    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, isTryingMajor);
    // Now we have the final file list, so we can determine if we can do major/all files.
    isAllFiles = (candidateFiles.size() == candidateSelection.size());
    // Build the CompactionRequest from candidateSelection
    CompactionRequest result = new CompactionRequest(candidateSelection);
    result.setOffPeak(!candidateSelection.isEmpty() && !isAllFiles && mayUseOffPeak);
    result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);
    return result;
  }

  The core logic lives in filterBulk(), applyCompactionPolicy(), and checkMinFilesCriteria(), which we go through in turn below.

 private ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
    candidates.removeAll(Collections2.filter(candidates,
        new Predicate<StoreFile>() {
          @Override
          public boolean apply(StoreFile input) {
            return input.excludeFromMinorCompaction();
          }
        }));
    return candidates;
  }

  filterBulk() decides, from a flag in each HFile's file-info metadata, whether the file should be excluded from minor compactions; such a flag is typically written for bulk-loaded files.
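  For context, one common way that flag gets written (an assumption based on the bulk-load tooling, not shown in this article's source): jobs producing HFiles through HFileOutputFormat2 can mark their output to be skipped by minor compactions.

// Hypothetical job setup: HFiles written by this job carry a file-info flag that
// excludeFromMinorCompaction() later reads, so filterBulk() drops them here.
Configuration conf = job.getConfiguration();
conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);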

  The important one is applyCompactionPolicy(), whose logic is as follows:

ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (candidates.isEmpty()) {
      return candidates;
    }

    // we're doing a minor compaction, let's see what files are applicable
    int start = 0;
    // Compaction ratio: hbase.hstore.compaction.ratio, default 1.2
    double ratio = comConf.getCompactionRatio();
    if (mayUseOffPeak) {
      // Off-peak ratio: hbase.hstore.compaction.ratio.offpeak, default 5.0
      ratio = comConf.getCompactionRatioOffPeak();
      LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
    }

    // get store file sizes for incremental compacting selection.
    final int countOfFiles = candidates.size();
    long[] fileSizes = new long[countOfFiles];
    long[] sumSize = new long[countOfFiles];
    for (int i = countOfFiles - 1; i >= 0; --i) {
      StoreFile file = candidates.get(i);
      fileSizes[i] = file.getReader().length();
      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
      // tooFar is the index just past the window of maxFilesToCompact files starting at i,
      // so sumSize[i] holds the total size of at most maxFilesToCompact files beginning at i
      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
      sumSize[i] = fileSizes[i]
        + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }

    // Scan forward while enough files remain to satisfy the minimum file count and the file
    // at 'start' is larger than both the minimum compact size and ratio times the total size
    // of the window that follows it; in effect this finds the first window whose leading
    // file is small enough for the window to be worth compacting
    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
      fileSizes[start] > Math.max(comConf.getMinCompactSize(),
          (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    if (start < countOfFiles) {
      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
        + " files from " + countOfFiles + " candidates");
    } else if (mayBeStuck) {
      // We may be stuck. Compact the latest files if we can.
      // Still honor the minimum-file-count requirement
      int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
      if (filesToLeave >= 0) {
        start = filesToLeave;
      }
    }
    candidates.subList(0, start).clear();
    return candidates;
  }

The procedure above is the stock RatioBasedCompactionPolicy; plenty of articles cover it in depth, so we won't re-derive it here, but a small worked example follows.
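To make the ratio rule concrete, here is a self-contained sketch that replays the window scan above on made-up file sizes (all values are illustrative; minCompactSize, ratio and the file counts are hypothetical settings, not your cluster's defaults):

import java.util.Arrays;

// Replays the applyCompactionPolicy() scan on sample data: a file stays in the
// selection only if it is not "too big" relative to the files that follow it.
public class RatioSelectionDemo {
  public static void main(String[] args) {
    long[] fileSizes = {500, 100, 50, 23, 12, 12}; // oldest first, sizes in MB
    double ratio = 1.2;          // hbase.hstore.compaction.ratio
    long minCompactSize = 16;    // hypothetical hbase.hstore.compaction.min.size
    int minFilesToCompact = 3;   // hbase.hstore.compaction.min
    int maxFilesToCompact = 10;  // hbase.hstore.compaction.max

    int n = fileSizes.length;
    long[] sumSize = new long[n];
    for (int i = n - 1; i >= 0; --i) {
      int tooFar = i + maxFilesToCompact - 1;
      sumSize[i] = fileSizes[i]
          + ((i + 1 < n) ? sumSize[i + 1] : 0)
          - ((tooFar < n) ? fileSizes[tooFar] : 0);
    }

    int start = 0;
    while (n - start >= minFilesToCompact
        && fileSizes[start] > Math.max(minCompactSize,
            (long) (sumSize[start + 1] * ratio))) {
      ++start; // 500 > 1.2 * (100+50+23+12+12) = 236.4, so the 500 MB file is skipped
    }
    // 100 <= 1.2 * (50+23+12+12) = 116.4, so the scan stops here
    System.out.println("Selected: "
        + Arrays.toString(Arrays.copyOfRange(fileSizes, start, n)));
    // Prints: Selected: [100, 50, 23, 12, 12]
  }
}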

Next is checkMinFilesCriteria(), which checks whether the files chosen by applyCompactionPolicy() meet the minimum file count required for a compaction. If they don't, the candidate list is simply cleared:

  private ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
    int minFiles = comConf.getMinFilesToCompact();
    if (candidates.size() < minFiles) {
      if(LOG.isDebugEnabled()) {
        LOG.debug("Not compacting files because we only have " + candidates.size() +
          " files ready for compaction. Need " + minFiles + " to initiate.");
      }
      candidates.clear();
    }
    return candidates;
  }

  Once the candidate files are chosen, removeExcessFiles() checks whether their number exceeds the configured per-compaction maximum (comConf.getMaxFilesToCompact()); if it does, files are dropped until the count satisfies the limit.

  Finally, a CompactionRequest is constructed from the candidate files.

  Everything so far has been the selectCompaction() part of CompactionRunner.run(). What follows is the part that actually executes the compaction, which happens through region.compact():

public boolean compact(CompactionContext compaction, Store store,
      CompactionThroughputController throughputController) throws IOException {
    assert compaction != null && compaction.hasSelection();
    assert !compaction.getRequest().getFiles().isEmpty();
    // If this region is closing or already closed, cancel the compaction
    if (this.closing.get() || this.closed.get()) {
      LOG.debug("Skipping compaction on " + this + " because closing/closed");
      store.cancelRequestedCompaction(compaction);
      return false;
    }
    MonitoredTask status = null;
    boolean requestNeedsCancellation = true;
    // block waiting for the lock for compaction
    lock.readLock().lock();
    try {
      byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
      // A series of checks
      if (stores.get(cf) != store) {
        LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
            + " has been re-instantiated, cancel this compaction request. "
            + " It may be caused by the roll back of split transaction");
        return false;
      }

      status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
      if (this.closed.get()) {
        String msg = "Skipping compaction on " + this + " because closed";
        LOG.debug(msg);
        status.abort(msg);
        return false;
      }
      boolean wasStateSet = false;
      try {
        synchronized (writestate) {
          if (writestate.writesEnabled) {// by default readOnly is false and writesEnabled is true
            // Increment writestate.compacting
            wasStateSet = true;
            ++writestate.compacting;
          } else {
            String msg = "NOT compacting region " + this + ". Writes disabled.";
            LOG.info(msg);
            status.abort(msg);
            return false;
          }
        }
        LOG.info("Starting compaction on " + store + " in region " + this
            + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
        doRegionCompactionPrep();
        try {
          status.setStatus("Compacting store " + store);
          // We no longer need to cancel the request on the way out of this
          // method because Store#compact will clean up unconditionally
          requestNeedsCancellation = false;
          // Ultimately delegate to Store.compact() to perform the compaction
          store.compact(compaction, throughputController);
        } catch (InterruptedIOException iioe) {
          String msg = "compaction interrupted";
          LOG.info(msg, iioe);
          status.abort(msg);
          return false;
        }
      } finally {
        if (wasStateSet) {
          synchronized (writestate) {
            --writestate.compacting;
            if (writestate.compacting <= 0) {
              writestate.notifyAll();
            }
          }
        }
      }
      status.markComplete("Compaction complete");
      return true;
    } finally {
      try {
        if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
        if (status != null) status.cleanup();
      } finally {
        lock.readLock().unlock();
      }
    }
  }

  Next up is Store.compact(), which is where the real time is spent: it invokes CompactionContext.compact(), which in turn drives the compactor that performs the compaction. Its details are left for the next article.
