在《HBase源代碼分析之HRegion上MemStore的flsuh流程(一)》、《HBase源代碼分析之HRegion上MemStore的flsuh流程(二)》等文中。咱們介紹了HRegion上Memstore flush的主體流程和主要細節。java
但是,HRegion僅僅是HBase表中依照行的方向對一片連續的數據區域的抽象,它並不能對外提供單獨的服務,供client或者HBase其餘實體調用。而HRegion上MemStore的flush仍是要經過HRegionServer來對外提供服務的。如下,咱們就具體探究下HRegionServer上是怎樣實現這點的。數組
在HRegionServer中。有一個叫作cacheFlusher的東東。它是什麼呢?咱們先看一下它是怎樣被定義的:緩存
// Cache flushing // memstore內存刷新管理對象 protected MemStoreFlusher cacheFlusher;可以發現,cacheFlusher是MemStoreFlusher類型的一個對象,咱們來看下類的凝視及定義:
/** * Thread that flushes cache on request * 處理刷新緩存請求的線程 * * NOTE: This class extends Thread rather than Chore because the sleep time * can be interrupted when there is something to do, rather than the Chore * sleep time which is invariant. * * @see FlushRequester */ @InterfaceAudience.Private class MemStoreFlusher implements FlushRequester {cacheFlusher實際上就是HRegionServer上處理刷新緩存請求的線程。那麼接下來的問題就是,cacheFlusher是怎樣被初始化的?它又是怎樣處理flush請求的?帶着這兩個問題,咱們繼續本文。
1、怎樣初始化cacheFlusher服務器
首先,咱們發現HRegionServer繼承自HasThread,而HasThread實現了Runnable接口,那麼在其內部確定會運行run()方法,而run()方法的開始,有例如如下代碼:
數據結構
try { // Do pre-registration initializations; zookeeper, lease threads, etc. preRegistrationInitialization(); } catch (Throwable e) { abort("Fatal exception during initialization", e); }繼續追蹤preRegistrationInitialization()方法,在其內部。調用了initializeThreads()方法,例如如下:
if (!isStopped() && !isAborted()) { initializeThreads(); }而這個initializeThreads()方法,作的主要工做就是初始化HRegionServer內部的各類工做線程。當中就包含cacheFlusher,代碼例如如下:
// Cache flushing thread. // 緩存刷新線程 this.cacheFlusher = new MemStoreFlusher(conf, this);接下來。咱們在看看這個MemStoreFlusher類是怎樣定義及工做的。首先看下它最基本的幾個成員變量:
首當其衝的即是flushQueue。其定義例如如下:多線程
private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<FlushQueueEntry>();flushQueue是MemStoreFlusher中很重要的一個變量,它是一個存儲了Region刷新緩存請求的隊列。
而與flushQueue同一時候被更新的是regionsInQueue,它存儲的是HRegion到FlushRegionEntry映射關係的集合,FlushRegionEntry是對發起memstore刷新請求的HRegion的一個封裝。不只包括了HRegion實例,還包括HRegion刷新memstore請求的產生時間,到期時間,以及一種相似續約的處理方式,即延長該請求的到期時間等。regionsInQueue的定義例如如下: 併發
private final Map<HRegion, FlushRegionEntry> regionsInQueue = new HashMap<HRegion, FlushRegionEntry>();flushQueue和regionsInQueue的更新是同步的,即假設在flushQueue中增長或刪除一條記錄,那麼在regionsInQueue中也會同步增長或刪除一條記錄。
接下來比較重要的即是flushHandlers。它是FlushHandler類型的一個數組,定義例如如下:app
private final FlushHandler[] flushHandlers;
FlushHandler是什麼呢?它是處理緩存刷新的線程類,線程一旦啓動後,在其run()方法內,就會不停的從flushQueue隊列中拉取flush請求進行處理。其類的定義例如如下:ide
/** * 處理緩存刷新的線程類 */ private class FlushHandler extends HasThread {
以上就是MemStoreFlusher內運行flush流程最重要的幾個成員變量,其它的變量都是一些輔助性的,這裏再也不作具體介紹。函數
如下,咱們來看下MemStoreFlusher的構造及成員變量的初始化,構造函數例如如下:
/** * @param conf * @param server */ public MemStoreFlusher(final Configuration conf, final HRegionServer server) { super(); // 賦值RegionServer實例server this.server = server; // 線程喚醒頻率threadWakeFrequency,取參數hbase.server.thread.wakefrequency配置的值。默以爲10s,即線程的工做頻率 this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); // 獲取最大可用堆內存max long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); // 獲取全局memstore所佔堆內存的百分比globalMemStorePercent float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true); // 獲取全局memstore限制大小值globalMemStoreLimit this.globalMemStoreLimit = (long) (max * globalMemStorePercent); // 獲取全局memstore限制大小值的低水平線百分比globalMemStoreLimitLowMarkPercent this.globalMemStoreLimitLowMarkPercent = HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent); // 獲取全局memstore限制大小值的低水平線globalMemStoreLimitLowMark this.globalMemStoreLimitLowMark = (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent); // 獲取堵塞等待時間blockingWaitTime,取參數hbase.hstore.blockingWaitTime,默以爲90000 this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000); // 獲取flush處理線程數目handlerCount,取參數hbase.hstore.flusher.count,默以爲2 int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2); // 構造handlerCount個flush處理線程數組。默以爲2個。可經過hbase.hstore.flusher.count設置 this.flushHandlers = new FlushHandler[handlerCount]; // 記錄日誌信息 LOG.info("globalMemStoreLimit=" + StringUtils.humanReadableInt(this.globalMemStoreLimit) + ", globalMemStoreLimitLowMark=" + StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) + ", maxHeap=" + StringUtils.humanReadableInt(max)); }MemStoreFlusher的構造函數比較簡單。咱們重點分析下獲取全局memstore所佔堆內存的百分比globalMemStorePercent的HeapMemorySizeUtil類的getGlobalMemStorePercent()方法。和獲取全局memstore限制大小值的低水平線百分比globalMemStoreLimitLowMarkPercent的HeapMemorySizeUtil類的getGlobalMemStoreLowerMark()方法。
首先,看下獲取全局memstore所佔堆內存的百分比globalMemStorePercent的HeapMemorySizeUtil類的getGlobalMemStorePercent()方法,代碼例如如下:
/** * Retrieve global memstore configured size as percentage of total heap. * 獲取配置的全局memstore佔整個heap內存的百分比 * @param c * @param logInvalid */ public static float getGlobalMemStorePercent(final Configuration c, final boolean logInvalid) { // 獲取全局memstore的大小。優先取參數hbase.regionserver.global.memstore.size, // 未配置的話再取參數hbase.regionserver.global.memstore.upperLimit, // 假設還未配置的話,默以爲0.4 float limit = c.getFloat(MEMSTORE_SIZE_KEY, c.getFloat(MEMSTORE_SIZE_OLD_KEY, DEFAULT_MEMSTORE_SIZE)); // 假設limit的值在區間(0,0.8]以外的話 if (limit > 0.8f || limit <= 0.0f) { if (logInvalid) {// 依據參數logInvalid肯定是否記錄警告日誌 LOG.warn("Setting global memstore limit to default of " + DEFAULT_MEMSTORE_SIZE + " because supplied value outside allowed range of (0 -> 0.8]"); } // 將limit設置爲0.4 limit = DEFAULT_MEMSTORE_SIZE; } // 返回limit return limit; }這種方法的主要做用就是獲取配置的全局memstore佔整個heap內存的百分比。獲取的邏輯例如如下:
一、獲取配置的全局memstore佔整個heap內存的百分比limit:優先取參數hbase.regionserver.global.memstore.size,未配置的話再取參數hbase.regionserver.global.memstore.upperLimit,假設還未配置的話,默以爲0.4;
二、推斷limit是否在區間(0,0.8]以外,依據參數logInvalid肯定是否記錄警告日誌,並將limit設置爲默認值0.4;
三、返回limit。
如下,咱們再看下獲取全局memstore限制大小值的低水平線百分比globalMemStoreLimitLowMarkPercent的HeapMemorySizeUtil類的getGlobalMemStoreLowerMark()方法,代碼例如如下:
/** * Retrieve configured size for global memstore lower water mark as percentage of total heap. * 獲取配置的全局memstore內存佔全部heap內存的低水平線百分比 * @param c * @param globalMemStorePercent */ public static float getGlobalMemStoreLowerMark(final Configuration c, float globalMemStorePercent) { // 取新參數hbase.regionserver.global.memstore.size.lower.limit String lowMarkPercentStr = c.get(MEMSTORE_SIZE_LOWER_LIMIT_KEY); // 假設新參數配置了的話。直接轉化爲double並返回 if (lowMarkPercentStr != null) { return Float.parseFloat(lowMarkPercentStr); } // 取舊參數hbase.regionserver.global.memstore.lowerLimit" String lowerWaterMarkOldValStr = c.get(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY); // 假設舊參數配置的話,記錄警告日誌信息 if (lowerWaterMarkOldValStr != null) { LOG.warn(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " is deprecated. Instead use " + MEMSTORE_SIZE_LOWER_LIMIT_KEY); // 轉化爲double類型lowerWaterMarkOldVal float lowerWaterMarkOldVal = Float.parseFloat(lowerWaterMarkOldValStr); // 假設參數值大於計算獲得的全局memstore所佔堆內存的百分比,賦值爲globalMemStorePercent。並記錄日誌信息 if (lowerWaterMarkOldVal > globalMemStorePercent) { lowerWaterMarkOldVal = globalMemStorePercent; LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " + "because supplied " + MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " was > " + MEMSTORE_SIZE_OLD_KEY); } // 返回lowerWaterMarkOldVal / globalMemStorePercent return lowerWaterMarkOldVal / globalMemStorePercent; } // 假設新舊參數均未配置的話,默以爲0.95 return DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT; }這種方法的主要做用就是獲取配置的全局memstore內存佔全部heap內存的低水平線百分比。
獲取的邏輯例如如下:
一、取新參數hbase.regionserver.global.memstore.size.lower.limit配置的值,假設新參數配置了的話,直接轉化爲double並返回;
二、假設新參數未配置的話,取舊參數hbase.regionserver.global.memstore.lowerLimit配置的值,假設舊參數配置的話,記錄警告日誌信息,並:
2.一、將舊參數配置的值轉化爲double類型lowerWaterMarkOldVal;
2.二、假設舊參數值大於計算獲得的全局memstore所佔堆內存的百分比,賦值爲globalMemStorePercent,並記錄日誌信息;
2.三、返回lowerWaterMarkOldVal / globalMemStorePercent。
三、假設新舊參數均未配置的話,默以爲0.95。
2、cacheFlusher怎樣處理flush請求
經過怎樣初始化cacheFlusher部分的介紹,咱們已經知道,在MemStoreFlusher內部,存在兩個存儲flush請求及其HRegion封裝類的隊列和集合。即flushQueue和regionsInQueue,而MemStoreFlusher對外提供了一個requestFlush()方法。咱們大致看下這種方法:
/** * 請求刷新, * 即將需要刷新MemStore的HRegion放置到regionsInQueue中, * 同一時候依據HRegion構造FlushRegionEntry實例。加入到flushQueue中 */ public void requestFlush(HRegion r) { synchronized (regionsInQueue) {// 使用synchronizedkeyword對regionsInQueue進行線程同步 if (!regionsInQueue.containsKey(r)) {// 假設regionsInQueue中不存在相應HRegion // This entry has no delay so it will be added at the top of the flush // queue. It'll come out near immediately. // 將HRegion類型的r封裝成FlushRegionEntry類型的fqe // 這個fqe沒有delay,即延遲運行時間,因此它被加入到flush隊列的頂部。不久它將出列被處理。 FlushRegionEntry fqe = new FlushRegionEntry(r); // 將HRegion->FlushRegionEntry的相應關係加入到regionsInQueue集合 // 將flush請求FlushRegionEntry加入到flushQueue隊列 // 從這裏可以看出regionsInQueue、flushQueue這兩個成員變量go together this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); } } }requestFlush()方法的主要做用,就是加入一個flush region的請求至MemStoreFlusher內部隊列。其主要邏輯例如如下:
一、首先需要使用synchronizedkeyword對regionsInQueue進行線程同步,這麼作是爲了防止多線程的併發。
二、而後推斷regionsInQueue中是否存在相應的HRegion,假設regionsInQueue集合中不存在相應HRegion的話繼續,不然直接返回;
三、既然regionsInQueue集合中不存在相應HRegion,將HRegion類型的r封裝成FlushRegionEntry類型的fqe;
四、將HRegion->FlushRegionEntry的相應關係加入到regionsInQueue集合;
五、將flush請求FlushRegionEntry加入到flushQueue隊列。
從上述四、5步就可以看出regionsInQueue、flushQueue這兩個成員變量go together。並且這個fqe沒有delay,即延遲運行時間,因此它被加入到flush隊列的頂部。不久它將出列被處理。
這個該怎麼理解呢?咱們仍是回到flushQueue的定義,flushQueue是一個存儲了Region刷新緩存請求的隊列,裏面存儲的是實現了FlushQueueEntry接口的對象。FlushQueueEntry未定義不論什麼行爲。但是繼承了java.util.concurrent.Delayed接口,故flushQueue是java中的DelayQueue,隊列裏存儲的對象有一個過時時間的概念。
既然flush的請求已經被加入至flushQueue隊列,至關於生產者已經把產品生產出來了,那麼誰來消費呢?這個消費者的角色就是由FlushHandler線程來擔任的。既然是線程,那麼處理的邏輯確定在其run()方法內,但是在研究其run()方法前,咱們先看下flushQueue中存儲的都是什麼東西?
咱們再回想下flushQueue的定義。它是一個存儲了FlushQueueEntry的隊列DelayQueue。咱們先看下FlushQueueEntry的定義:
interface FlushQueueEntry extends Delayed { }一個集成了java的Delayed接口的無不論什麼方法的空接口而已,那麼它都有哪些實現類呢?答案就是WakeupFlushThread和FlushRegionEntry。在介紹這兩者以前。咱們首先介紹下flushQueue相應的隊列類型---Java中的DelayQueue。
衆所周知,DelayQueue是一個無界的BlockingQueue,其內部存儲的一定是實現了Delayed接口的對象。因此,FlushQueueEntry必須實現java的Delayed接口。
而這樣的隊列中的成員有一個最大特色,就是僅僅有在其到期後才幹出列,並且該隊列內的成員都是有序的。從頭到尾依照延遲到期時間的長短來排序。
那麼怎樣推斷成員是否到期呢?相應成員對象的getDelay()方法返回一個小於等於0的值,就說明相應對象在隊列中已到期,可以被取走。
既然DelayQueue中存儲的成員對象都是有序的,那麼實現了Delayed接口的類,必須提供compareTo()方法。用以排序。並且需要實現上述getDelay()方法,推斷隊內成員是否到期可以被取走。
接下來,咱們分別來研究下WakeupFlushThread和FlushRegionEntry。
首先。WakeupFlushThread很easy,沒有不論什麼實質內容,代碼例如如下:
/** * Token to insert into the flush queue that ensures that the flusher does not sleep * 增長到刷新隊列的確保刷新器不睡眠的令牌 */ static class WakeupFlushThread implements FlushQueueEntry { @Override public long getDelay(TimeUnit unit) { return 0; } @Override public int compareTo(Delayed o) { return -1; } @Override public boolean equals(Object obj) { return (this == obj); } }它的主要做用是作爲一個佔位符或令牌插入到刷新隊列flushQueue,以確保FlushHandler不會休眠。而且,其getDelay()方法返回值爲0,說明其不存在延遲時間。入列後就能夠出列。而它的compareTo()方法返回的值是-1。說明它與其餘WakeupFlushThread在隊內的順序是等價的,無先後之分。實際上WakeupFlushThread區分先後也沒有意義,它自己也沒有實質性的內容。
接下來。咱們再看下FlushRegionEntry類,其定義例如如下:
/** * Datastructure used in the flush queue. Holds region and retry count. * Keeps tabs on how old this object is. Implements {@link Delayed}. On * construction, the delay is zero. When added to a delay queue, we'll come * out near immediately. Call {@link #requeue(long)} passing delay in * milliseconds before readding to delay queue if you want it to stay there * a while. * * 用在刷新隊列裏的數據結構。FlushRegionEntry類有幾個很是重要的對像:保存region和重試次數。
* 跟蹤對象多大(ps.即時間) * 實現了java的Delayed接口。 * 在構造方法裏。delay爲0。 * 假設你想要它在隊列中保持在在被又一次增長delay隊列以前 * * */ static class FlushRegionEntry implements FlushQueueEntry { // 待flush的HRegion private final HRegion region; // 建立時間 private final long createTime; // 什麼時候到期 private long whenToExpire; // 重入隊列次數 private int requeueCount = 0; FlushRegionEntry(final HRegion r) { // 待flush的HRegion this.region = r; // 建立時間爲當前時間 this.createTime = EnvironmentEdgeManager.currentTime(); // 什麼時候到期也爲當前時間。意味着首次入隊列時是沒有延遲時間的。入列就能夠出列 this.whenToExpire = this.createTime; } /** * @param maximumWait * @return True if we have been delayed > <code>maximumWait</code> milliseconds. */ public boolean isMaximumWait(final long maximumWait) { return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait; } /** * @return Count of times {@link #requeue(long)} was called; i.e this is * number of times we've been requeued. */ public int getRequeueCount() { return this.requeueCount; } /** * 類似又一次入列的處理方法,又一次入列次數requeueCount加1。什麼時候到期未當前時間加參數when * * @param when When to expire, when to come up out of the queue. * Specify in milliseconds. This method adds EnvironmentEdgeManager.currentTime() * to whatever you pass. * @return This. */ public FlushRegionEntry requeue(final long when) { this.whenToExpire = EnvironmentEdgeManager.currentTime() + when; this.requeueCount++; return this; } /** * 推斷什麼時候到期的方法 */ @Override public long getDelay(TimeUnit unit) { // 什麼時候到期減去當前時間 return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(), TimeUnit.MILLISECONDS); } /** * 排序比較方法。依據推斷什麼時候到期的getDelay()方法來決定順序 */ @Override public int compareTo(Delayed other) { // Delay is compared first. If there is a tie, compare region's hash code int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS)).intValue(); if (ret != 0) { return ret; } // 什麼時候到期時間一直的話,依據hashCode()來排序。事實上也就是依據HRegion的hashCode()方法返回值來排序 FlushQueueEntry otherEntry = (FlushQueueEntry) other; return hashCode() - otherEntry.hashCode(); } @Override public String toString() { return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]"; } @Override public int hashCode() { int hash = (int) getDelay(TimeUnit.MILLISECONDS); return hash ^ region.hashCode(); } @Override public boolean equals(Object obj) { if (this == obj) { return true; } if (obj == null || getClass() != obj.getClass()) { return false; } Delayed other = (Delayed) obj; return compareTo(other) == 0; } } }
一、HRegion region:待flush的HRegion;
二、long createTime:建立時間。
三、long whenToExpire:什麼時候到期;
四、int requeueCount = 0:重入隊列次數。
而它的對象在初始化時。建立時間createTime設置爲當前時間。什麼時候到期whenToExpire也爲當前時間,它推斷是否到期的getDelay()方法爲什麼時到期減去當前時間,也就意味着首次入隊列時是沒有延遲時間的,入列就能夠出列。
另外,它在隊列內部用於排序的compareTo()方法。也是首先依據推斷什麼時候到期的getDelay()方法來決定順序,什麼時候到期時間一致的話,依據hashCode()來排序,事實上也就是依據HRegion的hashCode()方法返回值來排序。比較特別的是,這個類還提供了相似又一次入列的處理方法,又一次入列次數requeueCount加1。什麼時候到期未當前時間加參數when,那麼就至關於延期的了when時間變量。
說了那麼多。接下來咱們看下flush請求的實際處理流程,即FlushHandler的run()方法,其代碼爲:
@Override public void run() { while (!server.isStopped()) {// HRegionServer未中止的話。run()方法一直執行 FlushQueueEntry fqe = null; try { // 標誌位AtomicBoolean類型的wakeupPending設置爲false wakeupPending.set(false); // allow someone to wake us up again // 從flushQueue隊列中拉取一個FlushQueueEntry,即fqe fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); if (fqe == null || fqe instanceof WakeupFlushThread) {// 假設fqe爲空,或者爲WakeupFlushThread if (isAboveLowWaterMark()) { // 由於內存高於低閾值。flush線程喚醒 LOG.debug("Flush thread woke up because memory above low water=" + StringUtils.humanReadableInt(globalMemStoreLimitLowMark)); // 調用flushOneForGlobalPressure()方法,flush一個HRegion的MemStore, // 減小MemStore的大小。預防OOM等異常狀況的發生 if (!flushOneForGlobalPressure()) { // Wasn't able to flush any region, but we're above low water mark // This is unlikely to happen, but might happen when closing the // 這是不可能發生的。但是當關閉全部服務器時可能發生,另一個線程正在flush region; // entire server - another thread is flushing regions. We'll just // sleep a little bit to avoid spinning, and then pretend that // we flushed one, so anyone blocked will check again // 咱們將會休眠一段時間。以免旋轉,而後僞裝咱們flush了一個region,以使得被堵塞線程再次檢查 Thread.sleep(1000); wakeUpIfBlocking();// 喚醒其它堵塞線程 } // Enqueue another one of these tokens so we'll wake up again // 入列還有一個令牌。以使咱們以後再次被喚醒 wakeupFlushThread(); } continue; } // fre不爲空,且不爲WakeupFlushThread的話,轉化爲FlushRegionEntry類型的fre FlushRegionEntry fre = (FlushRegionEntry) fqe; // 調用flushRegion()方法。並且假設結果爲false的話。跳出循環 if (!flushRegion(fre)) { break; } } catch (InterruptedException ex) { continue; } catch (ConcurrentModificationException ex) { continue; } catch (Exception ex) { LOG.error("Cache flusher failed for entry " + fqe, ex); if (!server.checkFileSystem()) { break; } } } // 同一時候清空regionsInQueue和flushQueue // 又是在一塊兒啊 synchronized (regionsInQueue) { regionsInQueue.clear(); flushQueue.clear(); } // Signal anyone waiting, so they see the close flag // 喚醒全部的等待着,使得它們能夠看到close標誌 wakeUpIfBlocking(); // 記錄日誌信息 LOG.info(getName() + " exiting"); }它的主要處理邏輯爲:
一、首先HRegionServer未中止的話,run()方法一直執行。
二、將標誌位AtomicBoolean類型的wakeupPending設置爲false。
三、從flushQueue隊列中拉取一個FlushQueueEntry。即fqe:
3.一、假設fqe爲空,或者爲WakeupFlushThread:
3.1.一、假設經過isAboveLowWaterMark()方法推斷全局MemStore的大小高於限制值得低水平線。調用flushOneForGlobalPressure()方法,依照必定策略。flush一個HRegion的MemStore,減小MemStore的大小。預防OOM等異常狀況的發生。併入列還有一個令牌,以使該線程以後再次被喚醒;
3.二、fre不爲空,且不爲WakeupFlushThread的話,轉化爲FlushRegionEntry類型的fre:調用flushRegion()方法,並且假設結果爲false的話,跳出循環;
四、假設循環結束,同一時候清空regionsInQueue和flushQueue(ps:又是在一塊兒啊O(∩_∩)O~)
五、喚醒所有的等待着,使得它們能夠看到close標誌。
六、記錄日誌。
咱們注意到。WakeupFlushThread的主要做用是作爲一個佔位符或令牌插入到刷新隊列flushQueue,以確保FlushHandler不會休眠,實際上WakeupFlushThread起到的做用不不過這個,在FlushHandler線程不斷的poll刷新隊列flushQueue中的元素時,假設獲取到的是一個WakeupFlushThread,它會發起 一個檢測,即RegionServer的全局MemStore大小是否超太低水平線,假設未超過,WakeupFlushThread只起到了一個佔位符的做用,不然,WakeupFlushThread不只作爲佔位符,保證刷新線程不休眠。還依照必定策略選擇該RegionServer上的一個Region刷新memstore,以緩解RegionServer內存壓力。
至於。假設全局MemStore的大小高於限制值得低水平線時。怎樣選擇一個HRegion進行flush以緩解MemStore壓力,還有HRegion的flush是怎樣發起的,咱們下節再講,敬請期待。