HBase源代碼分析之HRegionServer上MemStore的flush處理流程(一)

        在《HBase源代碼分析之HRegion上MemStore的flsuh流程(一)》《HBase源代碼分析之HRegion上MemStore的flsuh流程(二)》等文中。咱們介紹了HRegion上Memstore flush的主體流程和主要細節。java

但是,HRegion僅僅是HBase表中依照行的方向對一片連續的數據區域的抽象,它並不能對外提供單獨的服務,供client或者HBase其餘實體調用。而HRegion上MemStore的flush仍是要經過HRegionServer來對外提供服務的。如下,咱們就具體探究下HRegionServer上是怎樣實現這點的。數組

        在HRegionServer中。有一個叫作cacheFlusher的東東。它是什麼呢?咱們先看一下它是怎樣被定義的:緩存

  // Cache flushing
  // memstore內存刷新管理對象
  protected MemStoreFlusher cacheFlusher;
        可以發現,cacheFlusher是MemStoreFlusher類型的一個對象,咱們來看下類的凝視及定義:
/**
 * Thread that flushes cache on request
 * 處理刷新緩存請求的線程
 *
 * NOTE: This class extends Thread rather than Chore because the sleep time
 * can be interrupted when there is something to do, rather than the Chore
 * sleep time which is invariant.
 *
 * @see FlushRequester
 */
@InterfaceAudience.Private
class MemStoreFlusher implements FlushRequester {
        cacheFlusher實際上就是HRegionServer上處理刷新緩存請求的線程。那麼接下來的問題就是,cacheFlusher是怎樣被初始化的?它又是怎樣處理flush請求的?帶着這兩個問題,咱們繼續本文。

        

        1、怎樣初始化cacheFlusher服務器

        首先,咱們發現HRegionServer繼承自HasThread,而HasThread實現了Runnable接口,那麼在其內部確定會運行run()方法,而run()方法的開始,有例如如下代碼:
數據結構

try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }
        繼續追蹤preRegistrationInitialization()方法,在其內部。調用了initializeThreads()方法,例如如下:
if (!isStopped() && !isAborted()) {
        initializeThreads();
      }
        而這個initializeThreads()方法,作的主要工做就是初始化HRegionServer內部的各類工做線程。當中就包含cacheFlusher,代碼例如如下:

// Cache flushing thread.
	// 緩存刷新線程
    this.cacheFlusher = new MemStoreFlusher(conf, this);
        接下來。咱們在看看這個MemStoreFlusher類是怎樣定義及工做的。首先看下它最基本的幾個成員變量:

        首當其衝的即是flushQueue。其定義例如如下:多線程

private final BlockingQueue<FlushQueueEntry> flushQueue =
    new DelayQueue<FlushQueueEntry>();
        flushQueue是MemStoreFlusher中很重要的一個變量,它是一個存儲了Region刷新緩存請求的隊列。

而與flushQueue同一時候被更新的是regionsInQueue,它存儲的是HRegion到FlushRegionEntry映射關係的集合,FlushRegionEntry是對發起memstore刷新請求的HRegion的一個封裝。不只包括了HRegion實例,還包括HRegion刷新memstore請求的產生時間,到期時間,以及一種相似續約的處理方式,即延長該請求的到期時間等。regionsInQueue的定義例如如下: 併發

private final Map<HRegion, FlushRegionEntry> regionsInQueue =
    new HashMap<HRegion, FlushRegionEntry>();
        flushQueue和regionsInQueue的更新是同步的,即假設在flushQueue中增長或刪除一條記錄,那麼在regionsInQueue中也會同步增長或刪除一條記錄。


        接下來比較重要的即是flushHandlers。它是FlushHandler類型的一個數組,定義例如如下:app

private final FlushHandler[] flushHandlers;

        FlushHandler是什麼呢?它是處理緩存刷新的線程類,線程一旦啓動後,在其run()方法內,就會不停的從flushQueue隊列中拉取flush請求進行處理。其類的定義例如如下:ide

  /**
   * 處理緩存刷新的線程類
   */
  private class FlushHandler extends HasThread {

        以上就是MemStoreFlusher內運行flush流程最重要的幾個成員變量,其它的變量都是一些輔助性的,這裏再也不作具體介紹。函數

        如下,咱們來看下MemStoreFlusher的構造及成員變量的初始化,構造函數例如如下:

  /**
   * @param conf
   * @param server
   */
  public MemStoreFlusher(final Configuration conf,
      final HRegionServer server) {
    super();
    
    // 賦值RegionServer實例server
    this.server = server;
    
    // 線程喚醒頻率threadWakeFrequency,取參數hbase.server.thread.wakefrequency配置的值。默以爲10s,即線程的工做頻率
    this.threadWakeFrequency =
      conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    
    // 獲取最大可用堆內存max
    long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
    
    // 獲取全局memstore所佔堆內存的百分比globalMemStorePercent
    float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
    
    // 獲取全局memstore限制大小值globalMemStoreLimit
    this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
    
    // 獲取全局memstore限制大小值的低水平線百分比globalMemStoreLimitLowMarkPercent
    this.globalMemStoreLimitLowMarkPercent = 
        HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
    
    // 獲取全局memstore限制大小值的低水平線globalMemStoreLimitLowMark
    this.globalMemStoreLimitLowMark = 
        (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);

    // 獲取堵塞等待時間blockingWaitTime,取參數hbase.hstore.blockingWaitTime,默以爲90000
    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
      90000);
    
    // 獲取flush處理線程數目handlerCount,取參數hbase.hstore.flusher.count,默以爲2
    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
    
    // 構造handlerCount個flush處理線程數組。默以爲2個。可經過hbase.hstore.flusher.count設置
    this.flushHandlers = new FlushHandler[handlerCount];
    
    // 記錄日誌信息
    LOG.info("globalMemStoreLimit=" +
      StringUtils.humanReadableInt(this.globalMemStoreLimit) +
      ", globalMemStoreLimitLowMark=" +
      StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
      ", maxHeap=" + StringUtils.humanReadableInt(max));
  }
        MemStoreFlusher的構造函數比較簡單。咱們重點分析下獲取全局memstore所佔堆內存的百分比globalMemStorePercent的HeapMemorySizeUtil類的getGlobalMemStorePercent()方法。和獲取全局memstore限制大小值的低水平線百分比globalMemStoreLimitLowMarkPercent的HeapMemorySizeUtil類的getGlobalMemStoreLowerMark()方法。

        首先,看下獲取全局memstore所佔堆內存的百分比globalMemStorePercent的HeapMemorySizeUtil類的getGlobalMemStorePercent()方法,代碼例如如下:

/**
   * Retrieve global memstore configured size as percentage of total heap.
   * 獲取配置的全局memstore佔整個heap內存的百分比
   * @param c
   * @param logInvalid
   */
  public static float getGlobalMemStorePercent(final Configuration c, final boolean logInvalid) {
    
	// 獲取全局memstore的大小。優先取參數hbase.regionserver.global.memstore.size,
	// 未配置的話再取參數hbase.regionserver.global.memstore.upperLimit,
	// 假設還未配置的話,默以爲0.4
	float limit = c.getFloat(MEMSTORE_SIZE_KEY,
        c.getFloat(MEMSTORE_SIZE_OLD_KEY, DEFAULT_MEMSTORE_SIZE));
    
	// 假設limit的值在區間(0,0.8]以外的話
	if (limit > 0.8f || limit <= 0.0f) {
      if (logInvalid) {// 依據參數logInvalid肯定是否記錄警告日誌
        LOG.warn("Setting global memstore limit to default of " + DEFAULT_MEMSTORE_SIZE
            + " because supplied value outside allowed range of (0 -> 0.8]");
      }
      
      // 將limit設置爲0.4
      limit = DEFAULT_MEMSTORE_SIZE;
    }
	
	// 返回limit
    return limit;
  }
        這種方法的主要做用就是獲取配置的全局memstore佔整個heap內存的百分比。獲取的邏輯例如如下:

        一、獲取配置的全局memstore佔整個heap內存的百分比limit:優先取參數hbase.regionserver.global.memstore.size,未配置的話再取參數hbase.regionserver.global.memstore.upperLimit,假設還未配置的話,默以爲0.4;

        二、推斷limit是否在區間(0,0.8]以外,依據參數logInvalid肯定是否記錄警告日誌,並將limit設置爲默認值0.4;

        三、返回limit。

        如下,咱們再看下獲取全局memstore限制大小值的低水平線百分比globalMemStoreLimitLowMarkPercent的HeapMemorySizeUtil類的getGlobalMemStoreLowerMark()方法,代碼例如如下:

 /**
   * Retrieve configured size for global memstore lower water mark as percentage of total heap.
   * 獲取配置的全局memstore內存佔全部heap內存的低水平線百分比
   * @param c
   * @param globalMemStorePercent
   */
  public static float getGlobalMemStoreLowerMark(final Configuration c, float globalMemStorePercent) {
    
	// 取新參數hbase.regionserver.global.memstore.size.lower.limit  
	String lowMarkPercentStr = c.get(MEMSTORE_SIZE_LOWER_LIMIT_KEY);
	
	// 假設新參數配置了的話。直接轉化爲double並返回
    if (lowMarkPercentStr != null) {
      return Float.parseFloat(lowMarkPercentStr);
    }
    
    // 取舊參數hbase.regionserver.global.memstore.lowerLimit"
    String lowerWaterMarkOldValStr = c.get(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY);
    
    // 假設舊參數配置的話,記錄警告日誌信息
    if (lowerWaterMarkOldValStr != null) {
      LOG.warn(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " is deprecated. Instead use "
          + MEMSTORE_SIZE_LOWER_LIMIT_KEY);
      
      // 轉化爲double類型lowerWaterMarkOldVal
      float lowerWaterMarkOldVal = Float.parseFloat(lowerWaterMarkOldValStr);
      
      // 假設參數值大於計算獲得的全局memstore所佔堆內存的百分比,賦值爲globalMemStorePercent。並記錄日誌信息
      if (lowerWaterMarkOldVal > globalMemStorePercent) {
        lowerWaterMarkOldVal = globalMemStorePercent;
        LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " + "because supplied "
            + MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " was > " + MEMSTORE_SIZE_OLD_KEY);
      }
      
      // 返回lowerWaterMarkOldVal / globalMemStorePercent
      return lowerWaterMarkOldVal / globalMemStorePercent;
    }
    
    // 假設新舊參數均未配置的話,默以爲0.95
    return DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT;
  }
        這種方法的主要做用就是獲取配置的全局memstore內存佔全部heap內存的低水平線百分比。

獲取的邏輯例如如下:

        一、取新參數hbase.regionserver.global.memstore.size.lower.limit配置的值,假設新參數配置了的話,直接轉化爲double並返回;

        二、假設新參數未配置的話,取舊參數hbase.regionserver.global.memstore.lowerLimit配置的值,假設舊參數配置的話,記錄警告日誌信息,並:

              2.一、將舊參數配置的值轉化爲double類型lowerWaterMarkOldVal;

              2.二、假設舊參數值大於計算獲得的全局memstore所佔堆內存的百分比,賦值爲globalMemStorePercent,並記錄日誌信息;

              2.三、返回lowerWaterMarkOldVal / globalMemStorePercent。

        三、假設新舊參數均未配置的話,默以爲0.95。

       

        2、cacheFlusher怎樣處理flush請求

        經過怎樣初始化cacheFlusher部分的介紹,咱們已經知道,在MemStoreFlusher內部,存在兩個存儲flush請求及其HRegion封裝類的隊列和集合。即flushQueue和regionsInQueue,而MemStoreFlusher對外提供了一個requestFlush()方法。咱們大致看下這種方法:

/**
   * 請求刷新,
   * 即將需要刷新MemStore的HRegion放置到regionsInQueue中,
   * 同一時候依據HRegion構造FlushRegionEntry實例。加入到flushQueue中
   */
  public void requestFlush(HRegion r) {
    synchronized (regionsInQueue) {// 使用synchronizedkeyword對regionsInQueue進行線程同步
      
      if (!regionsInQueue.containsKey(r)) {// 假設regionsInQueue中不存在相應HRegion
    	
        // This entry has no delay so it will be added at the top of the flush
        // queue.  It'll come out near immediately.
    	// 將HRegion類型的r封裝成FlushRegionEntry類型的fqe
    	// 這個fqe沒有delay,即延遲運行時間,因此它被加入到flush隊列的頂部。不久它將出列被處理。
        FlushRegionEntry fqe = new FlushRegionEntry(r);
        
        // 將HRegion->FlushRegionEntry的相應關係加入到regionsInQueue集合
        // 將flush請求FlushRegionEntry加入到flushQueue隊列
        // 從這裏可以看出regionsInQueue、flushQueue這兩個成員變量go together
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
      }
    }
  }
        requestFlush()方法的主要做用,就是加入一個flush region的請求至MemStoreFlusher內部隊列。其主要邏輯例如如下:

        一、首先需要使用synchronizedkeyword對regionsInQueue進行線程同步,這麼作是爲了防止多線程的併發。

        二、而後推斷regionsInQueue中是否存在相應的HRegion,假設regionsInQueue集合中不存在相應HRegion的話繼續,不然直接返回;

        三、既然regionsInQueue集合中不存在相應HRegion,將HRegion類型的r封裝成FlushRegionEntry類型的fqe;

        四、將HRegion->FlushRegionEntry的相應關係加入到regionsInQueue集合;

        五、將flush請求FlushRegionEntry加入到flushQueue隊列。

        從上述四、5步就可以看出regionsInQueue、flushQueue這兩個成員變量go together。並且這個fqe沒有delay,即延遲運行時間,因此它被加入到flush隊列的頂部。不久它將出列被處理。

這個該怎麼理解呢?咱們仍是回到flushQueue的定義,flushQueue是一個存儲了Region刷新緩存請求的隊列,裏面存儲的是實現了FlushQueueEntry接口的對象。FlushQueueEntry未定義不論什麼行爲。但是繼承了java.util.concurrent.Delayed接口,故flushQueue是java中的DelayQueue,隊列裏存儲的對象有一個過時時間的概念。

        既然flush的請求已經被加入至flushQueue隊列,至關於生產者已經把產品生產出來了,那麼誰來消費呢?這個消費者的角色就是由FlushHandler線程來擔任的。既然是線程,那麼處理的邏輯確定在其run()方法內,但是在研究其run()方法前,咱們先看下flushQueue中存儲的都是什麼東西?

        咱們再回想下flushQueue的定義。它是一個存儲了FlushQueueEntry的隊列DelayQueue。咱們先看下FlushQueueEntry的定義:

interface FlushQueueEntry extends Delayed {
  }
        一個集成了java的Delayed接口的無不論什麼方法的空接口而已,那麼它都有哪些實現類呢?答案就是WakeupFlushThread和FlushRegionEntry。在介紹這兩者以前。咱們首先介紹下flushQueue相應的隊列類型---Java中的DelayQueue。

        衆所周知,DelayQueue是一個無界的BlockingQueue,其內部存儲的一定是實現了Delayed接口的對象。因此,FlushQueueEntry必須實現java的Delayed接口。

而這樣的隊列中的成員有一個最大特色,就是僅僅有在其到期後才幹出列,並且該隊列內的成員都是有序的。從頭到尾依照延遲到期時間的長短來排序。

那麼怎樣推斷成員是否到期呢?相應成員對象的getDelay()方法返回一個小於等於0的值,就說明相應對象在隊列中已到期,可以被取走。

        既然DelayQueue中存儲的成員對象都是有序的,那麼實現了Delayed接口的類,必須提供compareTo()方法。用以排序。並且需要實現上述getDelay()方法,推斷隊內成員是否到期可以被取走。

        接下來,咱們分別來研究下WakeupFlushThread和FlushRegionEntry。

        首先。WakeupFlushThread很easy,沒有不論什麼實質內容,代碼例如如下:

/**
   * Token to insert into the flush queue that ensures that the flusher does not sleep
   * 增長到刷新隊列的確保刷新器不睡眠的令牌
   */
  static class WakeupFlushThread implements FlushQueueEntry {
    @Override
    public long getDelay(TimeUnit unit) {
      return 0;
    }

    @Override
    public int compareTo(Delayed o) {
      return -1;
    }

    @Override
    public boolean equals(Object obj) {
      return (this == obj);
    }
  }
        它的主要做用是作爲一個佔位符或令牌插入到刷新隊列flushQueue,以確保FlushHandler不會休眠。而且,其getDelay()方法返回值爲0,說明其不存在延遲時間。入列後就能夠出列。而它的compareTo()方法返回的值是-1。說明它與其餘WakeupFlushThread在隊內的順序是等價的,無先後之分。實際上WakeupFlushThread區分先後也沒有意義,它自己也沒有實質性的內容。

        接下來。咱們再看下FlushRegionEntry類,其定義例如如下:

/**
   * Datastructure used in the flush queue.  Holds region and retry count.
   * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
   * construction, the delay is zero. When added to a delay queue, we'll come
   * out near immediately.  Call {@link #requeue(long)} passing delay in
   * milliseconds before readding to delay queue if you want it to stay there
   * a while.
   * 
   * 用在刷新隊列裏的數據結構。

保存region和重試次數。

* 跟蹤對象多大(ps.即時間) * 實現了java的Delayed接口。 * 在構造方法裏。delay爲0。 * 假設你想要它在隊列中保持在在被又一次增長delay隊列以前 * * */ static class FlushRegionEntry implements FlushQueueEntry { // 待flush的HRegion private final HRegion region; // 建立時間 private final long createTime; // 什麼時候到期 private long whenToExpire; // 重入隊列次數 private int requeueCount = 0; FlushRegionEntry(final HRegion r) { // 待flush的HRegion this.region = r; // 建立時間爲當前時間 this.createTime = EnvironmentEdgeManager.currentTime(); // 什麼時候到期也爲當前時間。意味着首次入隊列時是沒有延遲時間的。入列就能夠出列 this.whenToExpire = this.createTime; } /** * @param maximumWait * @return True if we have been delayed > <code>maximumWait</code> milliseconds. */ public boolean isMaximumWait(final long maximumWait) { return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait; } /** * @return Count of times {@link #requeue(long)} was called; i.e this is * number of times we've been requeued. */ public int getRequeueCount() { return this.requeueCount; } /** * 類似又一次入列的處理方法,又一次入列次數requeueCount加1。什麼時候到期未當前時間加參數when * * @param when When to expire, when to come up out of the queue. * Specify in milliseconds. This method adds EnvironmentEdgeManager.currentTime() * to whatever you pass. * @return This. */ public FlushRegionEntry requeue(final long when) { this.whenToExpire = EnvironmentEdgeManager.currentTime() + when; this.requeueCount++; return this; } /** * 推斷什麼時候到期的方法 */ @Override public long getDelay(TimeUnit unit) { // 什麼時候到期減去當前時間 return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(), TimeUnit.MILLISECONDS); } /** * 排序比較方法。依據推斷什麼時候到期的getDelay()方法來決定順序 */ @Override public int compareTo(Delayed other) { // Delay is compared first. If there is a tie, compare region's hash code int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS)).intValue(); if (ret != 0) { return ret; } // 什麼時候到期時間一直的話,依據hashCode()來排序。事實上也就是依據HRegion的hashCode()方法返回值來排序 FlushQueueEntry otherEntry = (FlushQueueEntry) other; return hashCode() - otherEntry.hashCode(); } @Override public String toString() { return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]"; } @Override public int hashCode() { int hash = (int) getDelay(TimeUnit.MILLISECONDS); return hash ^ region.hashCode(); } @Override public boolean equals(Object obj) { if (this == obj) { return true; } if (obj == null || getClass() != obj.getClass()) { return false; } Delayed other = (Delayed) obj; return compareTo(other) == 0; } } }

        FlushRegionEntry類有幾個很是重要的對像:

        一、HRegion region:待flush的HRegion;

        二、long createTime:建立時間。

        三、long whenToExpire:什麼時候到期;

        四、int requeueCount = 0:重入隊列次數。

        而它的對象在初始化時。建立時間createTime設置爲當前時間。什麼時候到期whenToExpire也爲當前時間,它推斷是否到期的getDelay()方法爲什麼時到期減去當前時間,也就意味着首次入隊列時是沒有延遲時間的,入列就能夠出列。

另外,它在隊列內部用於排序的compareTo()方法。也是首先依據推斷什麼時候到期的getDelay()方法來決定順序,什麼時候到期時間一致的話,依據hashCode()來排序,事實上也就是依據HRegion的hashCode()方法返回值來排序。比較特別的是,這個類還提供了相似又一次入列的處理方法,又一次入列次數requeueCount加1。什麼時候到期未當前時間加參數when,那麼就至關於延期的了when時間變量。

        說了那麼多。接下來咱們看下flush請求的實際處理流程,即FlushHandler的run()方法,其代碼爲:

@Override
    public void run() {
      while (!server.isStopped()) {// HRegionServer未中止的話。run()方法一直執行
        FlushQueueEntry fqe = null;
        
        try {
          
        	// 標誌位AtomicBoolean類型的wakeupPending設置爲false
          wakeupPending.set(false); // allow someone to wake us up again
          
          // 從flushQueue隊列中拉取一個FlushQueueEntry,即fqe
          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          
          if (fqe == null || fqe instanceof WakeupFlushThread) {// 假設fqe爲空,或者爲WakeupFlushThread
          
            if (isAboveLowWaterMark()) {
              // 由於內存高於低閾值。flush線程喚醒
              LOG.debug("Flush thread woke up because memory above low water="
                  + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
              
              // 調用flushOneForGlobalPressure()方法,flush一個HRegion的MemStore,
              // 減小MemStore的大小。預防OOM等異常狀況的發生
              if (!flushOneForGlobalPressure()) {
                // Wasn't able to flush any region, but we're above low water mark
                // This is unlikely to happen, but might happen when closing the
            	// 這是不可能發生的。但是當關閉全部服務器時可能發生,另一個線程正在flush region;
                // entire server - another thread is flushing regions. We'll just
                // sleep a little bit to avoid spinning, and then pretend that
                // we flushed one, so anyone blocked will check again
            	  // 咱們將會休眠一段時間。以免旋轉,而後僞裝咱們flush了一個region,以使得被堵塞線程再次檢查
                Thread.sleep(1000);
                wakeUpIfBlocking();// 喚醒其它堵塞線程
              }
              // Enqueue another one of these tokens so we'll wake up again
              // 入列還有一個令牌。以使咱們以後再次被喚醒
              wakeupFlushThread();
            }
            continue;
          }
          
          // fre不爲空,且不爲WakeupFlushThread的話,轉化爲FlushRegionEntry類型的fre
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          
          // 調用flushRegion()方法。並且假設結果爲false的話。跳出循環
          if (!flushRegion(fre)) {
            break;
          }
        } catch (InterruptedException ex) {
          continue;
        } catch (ConcurrentModificationException ex) {
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }
      
      // 同一時候清空regionsInQueue和flushQueue
      // 又是在一塊兒啊
      synchronized (regionsInQueue) {
        regionsInQueue.clear();
        flushQueue.clear();
      }

      // Signal anyone waiting, so they see the close flag
      // 喚醒全部的等待着,使得它們能夠看到close標誌
      wakeUpIfBlocking();
      
      // 記錄日誌信息
      LOG.info(getName() + " exiting");
    }
        它的主要處理邏輯爲:

        一、首先HRegionServer未中止的話,run()方法一直執行。

        二、將標誌位AtomicBoolean類型的wakeupPending設置爲false。

        三、從flushQueue隊列中拉取一個FlushQueueEntry。即fqe:

             3.一、假設fqe爲空,或者爲WakeupFlushThread:

                      3.1.一、假設經過isAboveLowWaterMark()方法推斷全局MemStore的大小高於限制值得低水平線。調用flushOneForGlobalPressure()方法,依照必定策略。flush一個HRegion的MemStore,減小MemStore的大小。預防OOM等異常狀況的發生。併入列還有一個令牌,以使該線程以後再次被喚醒;

             3.二、fre不爲空,且不爲WakeupFlushThread的話,轉化爲FlushRegionEntry類型的fre:調用flushRegion()方法,並且假設結果爲false的話,跳出循環;

        四、假設循環結束,同一時候清空regionsInQueue和flushQueue(ps:又是在一塊兒啊O(∩_∩)O~)

        五、喚醒所有的等待着,使得它們能夠看到close標誌。

        六、記錄日誌。

        咱們注意到。WakeupFlushThread的主要做用是作爲一個佔位符或令牌插入到刷新隊列flushQueue,以確保FlushHandler不會休眠,實際上WakeupFlushThread起到的做用不不過這個,在FlushHandler線程不斷的poll刷新隊列flushQueue中的元素時,假設獲取到的是一個WakeupFlushThread,它會發起 一個檢測,即RegionServer的全局MemStore大小是否超太低水平線,假設未超過,WakeupFlushThread只起到了一個佔位符的做用,不然,WakeupFlushThread不只作爲佔位符,保證刷新線程不休眠。還依照必定策略選擇該RegionServer上的一個Region刷新memstore,以緩解RegionServer內存壓力。

        至於。假設全局MemStore的大小高於限制值得低水平線時。怎樣選擇一個HRegion進行flush以緩解MemStore壓力,還有HRegion的flush是怎樣發起的,咱們下節再講,敬請期待。

相關文章
相關標籤/搜索