Hbase flusher源碼解析(flush全代碼流程解析)

版權聲明:本文爲博主原創文章,遵循版權協議,轉載請附上原文出處連接和本聲明。java

在介紹HBASE flush源碼以前,咱們先在邏輯上大致梳理一下,便於後續看代碼。flush的總體流程分三個階段服務器

  1.第一階段:prepare階段,這個階段主要是將當前memstore的內存結構作snapshot。HBASE寫入內存的數據結構(memstore以及snapshot)是跳躍表,用的是jdk自帶的ConcurrentSkipListMap結構。這個過程其實就是將memstore賦值給snapshot,並構造一個新的memstore。微信

  2.第二階段:flushcache階段,這個階段主要是將第一階段生成的snapshot flush到disk,可是注意這裏是將其flush到temp文件,此時並無將生成的hfile move到store實際對應的cf路徑下,move是發生在第三階段。
數據結構

  3.第三階段:commit階段。這個階段主要是將第二階段生成的hfile move最終正確的位置。併發

上面是HBASE flush的邏輯流程,flush是region級別,涉及到的類不少,下面咱們開始介紹一下Flush相關的操做mvc

flush線程啓動app

  • 在regionserver啓動時,會調用startServiceThread方法啓動一些服務線程,其中
// Cache flushing
protected MemStoreFlusher cacheFlusher;
。。。。。省略。。。。。。
private void startServiceThreads() throws IOException { 。。。。其餘代碼省略。。。 this.cacheFlusher.start(uncaughtExceptionHandler); }
  •  而cacheFlusher是MemStoreFlusher類的實例,在梳理上述邏輯以前首先介紹兩個MemStoreFlusher的變量
  •  //該變量是一個BlockingQueue<FlushQueueEntry>類型的變量。
      // 主要存儲了FlushRegionEntry類型刷新請求實例,以及一個喚醒隊列WakeupFlushThread實例對象。
      private final BlockingQueue<FlushQueueEntry> flushQueue =
        new DelayQueue<FlushQueueEntry>();
      //同時也會把加入到flushqueue中的requst加入到regionsInQueue中。
      private final Map<HRegion, FlushRegionEntry> regionsInQueue =
        new HashMap<HRegion, FlushRegionEntry>();
  • MemStoreFlusher的start方法以下:
 synchronized void start(UncaughtExceptionHandler eh) {
    ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
        server.getServerName().toShortString() + "-MemStoreFlusher", eh);
    for (int i = 0; i < flushHandlers.length; i++) {
      flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
      flusherThreadFactory.newThread(flushHandlers[i]);
      flushHandlers[i].start();
    }
  }

  會根據配置flusher.handler.count生成相應個數的flushHandler線程。而後對每個flushHandler線程調用start方法。咱們繼續看一下flushHandler。less

private class FlushHandler extends HasThread {
private FlushHandler(String name) {
super(name);
}
@Override
public void run() {
//若是server正常沒有stop
while (!server.isStopped()) {
FlushQueueEntry fqe = null;
try {
wakeupPending.set(false); // allow someone to wake us up again
//阻塞隊列的poll方法,若是沒有會阻塞在這
fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
if (fqe == null || fqe instanceof WakeupFlushThread) {
// 若是沒有flush request或者flush request是一個全局flush的request。
if (isAboveLowWaterMark()) {
// 檢查全部的memstore是否超過max_heap * hbase.regionserver.global.memstore.lowerLimit配置的值,默認0.35
// 超過配置的最小memstore的值,flush最大的一個memstore的region
LOG.debug("Flush thread woke up because memory above low water="
+ TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));

if (!flushOneForGlobalPressure()) {
// 若是沒有任何Region須要flush,但已經超過了lowerLimit。
// 這種狀況不太可能發生,除非可能會在關閉整個服務器時發生,即有另外一個線程正在執行flush regions。
// 只裏只須要sleep一下,而後喚醒任何被阻塞的線程再次檢查。
// Wasn't able to flush any region, but we're above low water mark
// This is unlikely to happen, but might happen when closing the
// entire server - another thread is flushing regions. We'll just
// sleep a little bit to avoid spinning, and then pretend that
// we flushed one, so anyone blocked will check again
Thread.sleep(1000);
wakeUpIfBlocking();
}
// Enqueue another one of these tokens so we'll wake up again
wakeupFlushThread();
}
//阻塞超時後也會繼續continue
continue;
}
// 若是是正常的flush request
// 單個region memstore大小超過hbase.hregion.memstore.flush.size配置的值,默認128M,執行flush操做
FlushRegionEntry fre = (FlushRegionEntry) fqe;
if (!flushRegion(fre)) {
break;
}
} catch (InterruptedException ex) {
continue;
} catch (ConcurrentModificationException ex) {
continue;
} catch (Exception ex) {
LOG.error("Cache flusher failed for entry " + fqe, ex);
if (!server.checkFileSystem()) {
break;
}
}
}
//結束MemStoreFlusher的線程調用,一般是regionserver stop,這個是在while循環以外的
synchronized (regionsInQueue) {
regionsInQueue.clear();
flushQueue.clear();
}

// Signal anyone waiting, so they see the close flag
wakeUpIfBlocking();
LOG.info(getName() + " exiting");
}

  如今咱們看是看梳理一下FlusherHandler的run方法的邏輯ide

  1. 只要rs不掛,就一直循環判斷有沒有flushrequest
  2. 經過flushqueue.poll來阻塞,應該flushqueue是阻塞隊列,當隊列爲空時會阻塞,直到超時。
  3. 若是不爲空,取出一個request,調用MemStoreFlusher.flushRegion(fre)

Flush流程

 

 可見是調用的MemStoreFlusher.flushRegion方法進行flush的,咱們繼續跟進flushRegion一探究竟。
private boolean flushRegion(final FlushRegionEntry fqe) {
    //在FlushQueueEntry中取出region信息
    HRegion region = fqe.region;
    //若是region不是metaregion而且含有太多的storefile,則隨機blcoking.
    //tooManyStoreFiles默認的閾值時7,同時也要看hbase.hstore.blockingStoreFiles配置的值,沒有配置取默認值7
    if (!region.getRegionInfo().isMetaRegion() &&
        isTooManyStoreFiles(region)) {

      //判斷是否已經wait了設置的時間
      if (fqe.isMaximumWait(this.blockingWaitTime)) {
        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
          "ms on a compaction to clean up 'too many store files'; waited " +
          "long enough... proceeding with flush of " +
          region.getRegionNameAsString());
      } else {
        // If this is first time we've been put off, then emit a log message.
        //若是當前flush是第一次加入到flush queue
        if (fqe.getRequeueCount() <= 0) {
          // Note: We don't impose blockingStoreFiles constraint on meta regions
          LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
            "store files; delaying flush up to " + this.blockingWaitTime + "ms");
          //flush前判斷該region是否須要split,若是不須要split,同時由於又太多的storefiles,所以調用過一次compact
          if (!this.server.compactSplitThread.requestSplit(region)) {
            try {
              this.server.compactSplitThread.requestSystemCompaction(
                  region, Thread.currentThread().getName());
            } catch (IOException e) {
              LOG.error(
                "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
                RemoteExceptionHandler.checkIOException(e));
            }
          }
        }

        // Put back on the queue.  Have it come back out of the queue
        // after a delay of this.blockingWaitTime / 100 ms.
        //若是有too manyfile的region已經超過了隨機延遲的時間,加入flushqueue隊列,喚醒handler開始flush
        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
        // Tell a lie, it's not flushed but it's ok
        return true;
      }
    }
    //正常狀況下的flush
    return flushRegion(region, false, fqe.isForceFlushAllStores());
  }

  該方法中會判斷要flush的region是否有過多的hfile,若是是則隨機wait必定的時間。wait完成後加入flushqueue喚醒handler開始flush。在正常的狀況下最終是調用MemStoreFlusher的重載函數flushRgion(region,flase, isForceFlushAllStores),那咱們繼續跟進該重載函數。函數

private boolean flushRegion(final HRegion region, final boolean emergencyFlush,
      boolean forceFlushAllStores) {
    long startTime = 0;
    //枷鎖
    synchronized (this.regionsInQueue) {
      //在regioninQueue中移除該region
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      // Use the start time of the FlushRegionEntry if available
      if (fqe != null) {
        startTime = fqe.createTime;
      }
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue.  When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
        flushQueue.remove(fqe);
     }
    }
    if (startTime == 0) {
      // Avoid getting the system time unless we don't have a FlushRegionEntry;
      // shame we can't capture the time also spent in the above synchronized
      // block
      startTime = EnvironmentEdgeManager.currentTime();
    }
    lock.readLock().lock();
    try {
      notifyFlushRequest(region, emergencyFlush);
      //最終是調用region的flushcache
      HRegion.FlushResult flushResult = region.flushcache(forceFlushAllStores);
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      boolean shouldSplit = region.checkSplit() != null;
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestSystemCompaction(
            region, Thread.currentThread().getName());
      }
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        server.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      LOG.error("Cache flush failed" +
        (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
        RemoteExceptionHandler.checkIOException(ex));
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();
      wakeUpIfBlocking();
    }
    return true;
  }

  其餘無關的代碼這裏再也不細說,之間看標紅的位置,核心邏輯在這裏,能夠看到是調用的region.flushcache(isForceFlushAllStores),所以flush是region級別。同時在flush完成後會判斷是否須要進行split,若是不須要split會將判斷是否須要compact。繼續跟進看下里面作了啥。

//flush cache,參數意義爲是否須要flush全部的store
    public FlushResult flushcache(boolean forceFlushAllStores) throws IOException {
        // fail-fast instead of waiting on the lock
        //判斷當前region是否處於closing狀態,
        if (this.closing.get()) {
            String msg = "Skipping flush on " + this + " because closing";
            LOG.debug(msg);
            return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
        }
        MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
        status.setStatus("Acquiring readlock on region");
        // block waiting for the lock for flushing cache
        //此處加了鎖
        lock.readLock().lock();
        try {
            if (this.closed.get()) {
                String msg = "Skipping flush on " + this + " because closed";
                LOG.debug(msg);
                status.abort(msg);
                return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
            }
            if (coprocessorHost != null) {
                status.setStatus("Running coprocessor pre-flush hooks");
                coprocessorHost.preFlush();
            }
            // TODO: this should be managed within memstore with the snapshot, updated only after flush
            // successful
            if (numMutationsWithoutWAL.get() > 0) {
                numMutationsWithoutWAL.set(0);
                dataInMemoryWithoutWAL.set(0);
            }
            synchronized (writestate) {
                //這次flush以前 該region並無在flush,是否還處於write狀態
                if (!writestate.flushing && writestate.writesEnabled) {
                    this.writestate.flushing = true;
                } else {//不然表示該region正處於flushing狀態或者不可寫,abort flush
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("NOT flushing memstore for region " + this
                                + ", flushing=" + writestate.flushing + ", writesEnabled="
                                + writestate.writesEnabled);
                    }
                    String msg = "Not flushing since "
                            + (writestate.flushing ? "already flushing"
                            : "writes not enabled");
                    status.abort(msg);
                    return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
                }
            }

            try {
                //根據參數forceFlushAllStores判斷是否須要全部的store都進行flush,否側按照flush策略進行選擇
                //非全局flush的選擇策略:flushSizeLowerBound是參數hbase.hregion.percolumnfamilyflush.size.lower.bound,默認16M或者不知足大小,
//可是該memstore足夠老 Collection<Store> specificStoresToFlush = forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush(); //調用internalFlushcache進行flush FlushResult fs = internalFlushcache(specificStoresToFlush, status); if (coprocessorHost != null) { status.setStatus("Running post-flush coprocessor hooks"); coprocessorHost.postFlush(); } status.markComplete("Flush successful"); return fs; } finally { synchronized (writestate) { writestate.flushing = false; this.writestate.flushRequested = false; writestate.notifyAll(); } } } finally { lock.readLock().unlock(); status.cleanup(); } }

  核心邏輯在FlushResult fs = internalFlushcache(specificStoresToFlush, status);裏面涉及到了具體的三個階段,其中prepare的第一階段是調用了region.internalPrepareFlushCache()實現的,第二階段flush以及第三階段commit階段,是經過internalFlushAndCommit()進行的。咱們如今看下具體的internalFlushCache方法的邏輯:

protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
                                             final Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
        //internalPrepareFlushCache執行snapshot,打快照
        PrepareFlushResult result
                = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, false);
        //返回的result中的result是null.所以會執行internalFlushchacheAndCommit方法執行第二和第三階段。
        if (result.result == null) {
            return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
        } else {
            return result.result; // early exit due to failure from prepare stage
        }
    }

  如今咱們看一下第一階段: internalPrepareFlushCache。裏面有一把region級別的updatelock。,這個裏面代碼比較多,能夠先忽略不重要的部分

 //該方法用來執行flush的prepare階段
    protected PrepareFlushResult internalPrepareFlushCache(
            final WAL wal, final long myseqid, final Collection<Store> storesToFlush,
            MonitoredTask status, boolean isReplay)
            throws IOException {

        if (this.rsServices != null && this.rsServices.isAborted()) {
            // Don't flush when server aborting, it's unsafe
            throw new IOException("Aborting flush because server is aborted...");
        }
        //便於計算flush耗時,記錄開始時間
        final long startTime = EnvironmentEdgeManager.currentTime();
        // If nothing to flush, return, but we need to safely update the region sequence id
        //若是當前memstroe爲空,不執行flush,可是要更新squenid
        if (this.memstoreSize.get() <= 0) {
            // Take an update lock because am about to change the sequence id and we want the sequence id
            // to be at the border of the empty memstore.
            MultiVersionConsistencyControl.WriteEntry w = null;
            this.updatesLock.writeLock().lock();
            try {
                if (this.memstoreSize.get() <= 0) {
                    // Presume that if there are still no edits in the memstore, then there are no edits for
                    // this region out in the WAL subsystem so no need to do any trickery clearing out
                    // edits in the WAL system. Up the sequence number so the resulting flush id is for
                    // sure just beyond the last appended region edit (useful as a marker when bulk loading,
                    // etc.)
                    // wal can be null replaying edits.
                    if (wal != null) {
                        w = mvcc.beginMemstoreInsert();
                        long flushSeqId = getNextSequenceId(wal);
                        FlushResult flushResult = new FlushResult(
                                FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush");
                        w.setWriteNumber(flushSeqId);
                        mvcc.waitForPreviousTransactionsComplete(w);
                        w = null;
                        return new PrepareFlushResult(flushResult, myseqid);
                    } else {
                        return new PrepareFlushResult(
                                new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"),
                                myseqid);
                    }
                }
            } finally {
                this.updatesLock.writeLock().unlock();
                if (w != null) {
                    mvcc.advanceMemstore(w);
                }
            }
        }

        if (LOG.isInfoEnabled()) {
            LOG.info("Started memstore flush for " + this + ", current region memstore size "
                    + StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/"
                    + stores.size() + " column families' memstores are being flushed."
                    + ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid));
            // only log when we are not flushing all stores.
            //當不是flush全部的store時,打印log
            if (this.stores.size() > storesToFlush.size()) {
                for (Store store : storesToFlush) {
                    LOG.info("Flushing Column Family: " + store.getColumnFamilyName()
                            + " which was occupying "
                            + StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore.");
                }
            }
        }
        // Stop updates while we snapshot the memstore of all of these regions' stores. We only have
        // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
        // allow updates again so its value will represent the size of the updates received
        // during flush
        //中止寫入,直到memstore的snapshot完成。
        MultiVersionConsistencyControl.WriteEntry w = null;
        // We have to take an update lock during snapshot, or else a write could end up in both snapshot
        // and memstore (makes it difficult to do atomic rows then)
        status.setStatus("Obtaining lock to block concurrent updates");
        // block waiting for the lock for internal flush
        //獲取update的寫鎖
        this.updatesLock.writeLock().lock();
        status.setStatus("Preparing to flush by snapshotting stores in " +
                getRegionInfo().getEncodedName());
        //用於統計flush的全部的store的memtore內存大小之和
        long totalFlushableSizeOfFlushableStores = 0;
        //記錄全部flush的store的cfname
        Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
        for (Store store : storesToFlush) {
            flushedFamilyNames.add(store.getFamily().getName());
        }
        //storeFlushCtxs,committedFiles,storeFlushableSize,比較重要的是storeFlushCtxs和committedFiles。他們都被定義爲以CF作key的TreeMap,
        // 分別表明了store的CF實際執行(StoreFlusherImpl)和最終刷寫的HFlile文件。
        //其中storeFlushContext的實現類StoreFlusherImpl裏包含了flush相關的核心操做:prepare,flushcache,commit,abort等。
//因此這裏保存的是每個store的flush實例,後面就是經過這裏的StoreFlushContext進行flush的 TreeMap<byte[], StoreFlushContext> storeFlushCtxs = new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
//用來存儲每一個store和它對應的hdfs commit路徑的映射 TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>( Bytes.BYTES_COMPARATOR); // The sequence id of this flush operation which is used to log FlushMarker and pass to // createFlushContext to use as the store file's sequence id. long flushOpSeqId = HConstants.NO_SEQNUM; long flushedSeqId = HConstants.NO_SEQNUM; // The max flushed sequence id after this flush operation. Used as completeSequenceId which is // passed to HMaster. byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes(); long trxId = 0; try { try { w = mvcc.beginMemstoreInsert(); if (wal != null) { if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) { // This should never happen. String msg = "Flush will not be started for [" + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; status.setStatus(msg); return new PrepareFlushResult(new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg), myseqid); } flushOpSeqId = getNextSequenceId(wal); long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName); // no oldestUnflushedSeqId means we flushed all stores. // or the unflushed stores are all empty. flushedSeqId = (oldestUnflushedSeqId == HConstants.NO_SEQNUM) ? flushOpSeqId : oldestUnflushedSeqId - 1; } else { // use the provided sequence Id as WAL is not being used for this flush. flushedSeqId = flushOpSeqId = myseqid; } //循環遍歷region下面的storeFile,爲每一個storeFile生成了一個StoreFlusherImpl類, // 生成MemStore的快照就是調用每一個StoreFlusherImpl的prepare方法生成每一個storeFile的快照, // 至於internalFlushCacheAndCommit中的flush和commti行爲也是調用了region中每一個storeFile的flushCache和commit接口。 for (Store s : storesToFlush) { //用於統計flush的全部的store的memtore內存大小之和,而不是snapshot的getCellsCount() totalFlushableSizeOfFlushableStores += s.getFlushableSize(); //爲每個store生成本身的storeFlushImpl storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId)); //此時尚未生成flush的hfile路徑 committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL } // write the snapshot start to WAL if (wal != null && !writestate.readOnly) { FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, getRegionInfo(), flushOpSeqId, committedFiles); // no sync. Sync is below where we do not hold the updates lock //這裏只是向wal中寫入begin flush的marker,真正的sync在後面作,由於這裏加了update的寫鎖,全部耗時操做都不在這裏進行 trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, sequenceId, false); } // Prepare flush (take a snapshot)這裏的StoreFlushContext就是StoreFlusherImpl for (StoreFlushContext flush : storeFlushCtxs.values()) { //迭代region下的每個store,把memstore下的kvset複製到memstore的snapshot中並清空kvset的值 //把memstore的snapshot複製到HStore的snapshot中 flush.prepare();//其prepare方法就是調用store的storeFlushImpl的snapshot方法生成快照 } } catch (IOException ex) { if (wal != null) { if (trxId > 0) { // check whether we have already written START_FLUSH to WAL try { FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, getRegionInfo(), flushOpSeqId, committedFiles); WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, sequenceId, false); } catch (Throwable t) { LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" + StringUtils.stringifyException(t)); // ignore this since we will be aborting the RS with DSE. } } // we have called wal.startCacheFlush(), now we have to abort it wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); throw ex; // let upper layers deal with it. } } finally { //作完snapshot釋放鎖,此時不會阻塞業務的讀寫操做了 this.updatesLock.writeLock().unlock(); } String s = "Finished memstore snapshotting " + this + ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores; status.setStatus(s); if (LOG.isTraceEnabled()) LOG.trace(s); // sync unflushed WAL changes // see HBASE-8208 for details if (wal != null) { try { wal.sync(); // ensure that flush marker is sync'ed } catch (IOException ioe) { LOG.warn("Unexpected exception while wal.sync(), ignoring. Exception: " + StringUtils.stringifyException(ioe)); } } // wait for all in-progress transactions to commit to WAL before // we can start the flush. This prevents // uncommitted transactions from being written into HFiles. // We have to block before we start the flush, otherwise keys that // were removed via a rollbackMemstore could be written to Hfiles. w.setWriteNumber(flushOpSeqId); mvcc.waitForPreviousTransactionsComplete(w); // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block w = null; } finally { if (w != null) { // in case of failure just mark current w as complete mvcc.advanceMemstore(w); } } return new PrepareFlushResult(storeFlushCtxs, committedFiles, startTime, flushOpSeqId, flushedSeqId, totalFlushableSizeOfFlushableStores);
 

  在具體看StoreFlushContext.prepare()以前,咱們先看一下StoreFlushContext接口的說明,如上所述,StoreFlushImpl是Store的內部類,繼承自StoreFlushContext。

interface StoreFlushContext {


  void prepare();


  void flushCache(MonitoredTask status) throws IOException;


  boolean commit(MonitoredTask status) throws IOException;

  
  void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) throws IOException;


  void abort() throws IOException;


  List<Path> getCommittedFiles();
}

  如今咱們回過頭來繼續看internalPrepareFlushcache中標紅的flush.prepare();

 

public void prepare() {
            //在region調用storeFlusherImpl的prepare的時候,前面提到是在region的update.write.lock中的,所以這裏面全部的耗時操做都會影響業務正在進行的讀寫操做.
            //在snapshot中的邏輯中只是將memstore的跳躍表賦值給snapshot的跳躍表,在返回memstoresnapshot的時候,調用的snapshot的size()方法
            this.snapshot = memstore.snapshot();
            //MemstoreSnapshot的getCellsCount方法即在memstore的shapshot中返回的MemStoresnapshot中傳入的snapshot.size()值,時間複雜度是o(n)
            this.cacheFlushCount = snapshot.getCellsCount();
            this.cacheFlushSize = snapshot.getSize();
            committedFiles = new ArrayList<Path>(1);
        }

  咱們看下memstore的snapshot方法

public MemStoreSnapshot snapshot() {
    // If snapshot currently has entries, then flusher failed or didn't call
    // cleanup.  Log a warning.
    if (!this.snapshot.isEmpty()) {
      LOG.warn("Snapshot called again without clearing previous. " +
          "Doing nothing. Another ongoing flush or did we fail last attempt?");
    } else {
      this.snapshotId = EnvironmentEdgeManager.currentTime();
      //memstore使用的mem大小
      this.snapshotSize = keySize();
      if (!this.cellSet.isEmpty()) {
        //這裏的cellset就是memstore內存中的數據
        this.snapshot = this.cellSet;
//構造一個新的cellset存儲數據 this.cellSet = new CellSkipListSet(this.comparator); this.snapshotTimeRangeTracker = this.timeRangeTracker; this.timeRangeTracker = new TimeRangeTracker(); // Reset heap to not include any keys this.size.set(DEEP_OVERHEAD); this.snapshotAllocator = this.allocator; // Reset allocator so we get a fresh buffer for the new memstore if (allocator != null) { String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName()); this.allocator = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { Configuration.class }, new Object[] { conf }); } timeOfOldestEdit = Long.MAX_VALUE; } }

      prepare中的snapshot.getCellsCount();咱們重點說一下,hbase的內存存儲寫入的數據使用的是跳躍表的數據結構,實現是使用jdk自帶的ConcurrentSkipListMap。在hbase的MemStore(默認是DefaultMemStore)實現中有兩個環境變量,分別是ConcurrentSkipListMap類型的cellset和snapshot。cellset用來存儲寫入到memstore的數據,snapshot是在flush的第一階段是將cellset賦值用的。所以這個的getCellsCount()方法最終調用的是concurrentSkipListMap.size(),concurrentSkipListMap並無一個原子變量來報錯map的大小,由於這裏爲了併發,同時該操做也不經常使用。所以concurrentSkipListMap.size()是遍歷整個跳躍表獲取size大小。

  繼續回到internalPrepareFlushCache中,對每個store調用完prepare後,就將updatelock進行unlock。並返回一個PrepareFlushResult。繼續往上走,
回到internalFlushCache方法。執行完internalPrepareFlushcache後走的是internalFlushAndCommit方法。繼續跟進:
protected FlushResult internalFlushCacheAndCommit(
            final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
            final Collection<Store> storesToFlush)
            throws IOException {

        // prepare flush context is carried via PrepareFlushResult
        //進行flush的store的cf:storeFlushImpl映射
        TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
        //flush生成的hfile的路徑,當前key是有的,爲cf,可是List<Path>爲null,是在internalPrepareFlushCache中初始化的
        TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles;
        long startTime = prepareResult.startTime;
        long flushOpSeqId = prepareResult.flushOpSeqId;
        long flushedSeqId = prepareResult.flushedSeqId;
        long totalFlushableSizeOfFlushableStores = prepareResult.totalFlushableSize;

        String s = "Flushing stores of " + this;
        status.setStatus(s);
        if (LOG.isTraceEnabled()) LOG.trace(s);

        // Any failure from here on out will be catastrophic requiring server
        // restart so wal content can be replayed and put back into the memstore.
        // Otherwise, the snapshot content while backed up in the wal, it will not
        // be part of the current running servers state.
        boolean compactionRequested = false;
        try {
            // A.  Flush memstore to all the HStores.
            // Keep running vector of all store files that includes both old and the
            // just-made new flush store file. The new flushed file is still in the
            // tmp directory.
            //迭代region下的每個store,調用HStore.storeFlushImpl.flushCache方法,把store中snapshot的數據flush到hfile中,固然這裏是flush到temp文件中,最終是經過commit將其移到正確的路徑下
            //
            //
            for (StoreFlushContext flush : storeFlushCtxs.values()) {
                flush.flushCache(status);
            }

            // Switch snapshot (in memstore) -> new hfile (thus causing
            // all the store scanners to reset/reseek).
            Iterator<Store> it = storesToFlush.iterator();
            // stores.values() and storeFlushCtxs have same order
            for (StoreFlushContext flush : storeFlushCtxs.values()) {
                boolean needsCompaction = flush.commit(status);
                if (needsCompaction) {
                    compactionRequested = true;
                }
                committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles());
            }
            storeFlushCtxs.clear();

            // Set down the memstore size by amount of flush.
            this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);

            if (wal != null) {
                // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
                FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
                        getRegionInfo(), flushOpSeqId, committedFiles);
                WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
                        desc, sequenceId, true);
            }
        } catch (Throwable t) {
            // An exception here means that the snapshot was not persisted.
            // The wal needs to be replayed so its content is restored to memstore.
            // Currently, only a server restart will do this.
            // We used to only catch IOEs but its possible that we'd get other
            // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
            // all and sundry.
            if (wal != null) {
                try {
                    FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
                            getRegionInfo(), flushOpSeqId, committedFiles);
                    WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
                            desc, sequenceId, false);
                } catch (Throwable ex) {
                    LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
                            StringUtils.stringifyException(ex));
                    // ignore this since we will be aborting the RS with DSE.
                }
                wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
            }
            DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
                    Bytes.toStringBinary(getRegionName()));
            dse.initCause(t);
            status.abort("Flush failed: " + StringUtils.stringifyException(t));
            throw dse;
        }

        // If we get to here, the HStores have been written.
        if (wal != null) {
            wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
        }

        // Record latest flush time
        for (Store store : storesToFlush) {
            this.lastStoreFlushTimeMap.put(store, startTime);
        }

        // Update the oldest unflushed sequence id for region.
        this.maxFlushedSeqId = flushedSeqId;

        // C. Finally notify anyone waiting on memstore to clear:
        // e.g. checkResources().
        synchronized (this) {
            notifyAll(); // FindBugs NN_NAKED_NOTIFY
        }

        long time = EnvironmentEdgeManager.currentTime() - startTime;
        long memstoresize = this.memstoreSize.get();
        String msg = "Finished memstore flush of ~"
                + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
                + totalFlushableSizeOfFlushableStores + ", currentsize="
                + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
                + " for region " + this + " in " + time + "ms, sequenceid="
                + flushOpSeqId + ", compaction requested=" + compactionRequested
                + ((wal == null) ? "; wal=null" : "");
        LOG.info(msg);
        status.setStatus(msg);

        return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
                FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId);
    }

  咱們就只看其中兩個方法:flush.flushcache和flush.commit。這裏的flush即StoreFlushImpl。flushcache方法是用來執行第二階段,commit用來執行第三階段。

 public void flushCache(MonitoredTask status) throws IOException {
            //返回的是snapshotflush到臨時文件後,最終須要移到的正確路徑
            tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status);
        }

  轉到store的flushcache方法

protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
                                    MonitoredTask status) throws IOException {
        // If an exception happens flushing, we let it out without clearing
        // the memstore snapshot.  The old snapshot will be returned when we say
        // 'snapshot', the next time flush comes around.
        // Retry after catching exception when flushing, otherwise server will abort
        // itself
        StoreFlusher flusher = storeEngine.getStoreFlusher();
        IOException lastException = null;
        for (int i = 0; i < flushRetriesNumber; i++) {
            try {
                //調用StoreFlusher.flushsnapshot方法將snapshotflush到temp文件
                List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status);
                Path lastPathName = null;
                try {
                    for (Path pathName : pathNames) {
                        lastPathName = pathName;
                        validateStoreFile(pathName);
                    }
                    return pathNames;
                } catch (Exception e) {
                    LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
                    if (e instanceof IOException) {
                        lastException = (IOException) e;
                    } else {
                        lastException = new IOException(e);
                    }
                }
            } catch (IOException e) {
                LOG.warn("Failed flushing store file, retrying num=" + i, e);
                lastException = e;
            }
            if (lastException != null && i < (flushRetriesNumber - 1)) {
                try {
                    Thread.sleep(pauseTime);
                } catch (InterruptedException e) {
                    IOException iie = new InterruptedIOException();
                    iie.initCause(e);
                    throw iie;
                }
            }
        }
        throw lastException;
    }

 其中標紅的部分是主要的邏輯。首先經過storeEngine.getStoreFlusher獲取flush的實例,實際包括了sync到disk的writer以及append等操做。這裏再也不展開說明。咱們重點看一下for循環中的flusher.flushSnapshot方法,涉及到一個重要的環境變量cellsCount

public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
      MonitoredTask status) throws IOException {
    ArrayList<Path> result = new ArrayList<Path>();
    //這裏會調用snapshot的getCellsCount方法,之因此這裏提了這個方法,是由於其實一個prepare階段耗時較大的過程。
    int cellsCount = snapshot.getCellsCount();
    if (cellsCount == 0) return result; // don't flush if there are no entries

    // Use a store scanner to find which rows to flush.
    long smallestReadPoint = store.getSmallestReadPoint();
    InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
    if (scanner == null) {
      return result; // NULL scanner returned from coprocessor hooks means skip normal processing
    }

    StoreFile.Writer writer;
    try {
      // TODO:  We can fail in the below block before we complete adding this flush to
      //        list of store files.  Add cleanup of anything put on filesystem if we fail.
      synchronized (flushLock) {
        status.setStatus("Flushing " + store + ": creating writer");
        // Write the map out to the disk
        //這裏傳入的cellsCount實際並無用,多是預置的變量?
        writer = store.createWriterInTmp(
            cellsCount, store.getFamily().getCompression(), false, true, true);
        writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
        IOException e = null;
        try {
          //真正的將snapshot寫入臨時文件
          performFlush(scanner, writer, smallestReadPoint);
        } catch (IOException ioe) {
          e = ioe;
          // throw the exception out
          throw ioe;
        } finally {
          if (e != null) {
            writer.close();
          } else {
            finalizeWriter(writer, cacheFlushId, status);
          }
        }
      }
    } finally {
      scanner.close();
    }
    LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
        + StringUtils.humanReadableInt(snapshot.getSize()) +
        ", hasBloomFilter=" + writer.hasGeneralBloom() +
        ", into tmp file " + writer.getPath());
    result.add(writer.getPath());
    return result;
  }

  能夠看到store.createWriterInTmp中使用了該變量,繼續跟進

public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
                                              boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
            throws IOException {
。。。。。忽略不重要邏輯。。。。。 //這裏傳入的maxkeyCount沒有用 StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, this.getFileSystem()) .withFilePath(fs.createTempName()) .withComparator(comparator) .withBloomType(family.getBloomFilterType()) .withMaxKeyCount(maxKeyCount) .withFavoredNodes(favoredNodes) .withFileContext(hFileContext) .build(); return w; }

  可見將cellscount以參數的形式傳給了writer。而後執行performFlush方法,該方法經過scanner遍歷,而後使用hfile.writer將數據羅盤。咱們看一下Writer中將cellscount用來幹啥了。在整個writer中只有這兩個地方用到了

generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
          conf, cacheConf, bloomType,
          (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
this.deleteFamilyBloomFilterWriter = BloomFilterFactory
            .createDeleteBloomAtWrite(conf, cacheConf,
                (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);  

繼續跟進這兩個

 public static BloomFilterWriter createDeleteBloomAtWrite(Configuration conf,
      CacheConfig cacheConf, int maxKeys, HFile.Writer writer) {
    if (!isDeleteFamilyBloomEnabled(conf)) {
      LOG.info("Delete Bloom filters are disabled by configuration for "
          + writer.getPath()
          + (conf == null ? " (configuration is null)" : ""));
      return null;
    }

    float err = getErrorRate(conf);

    int maxFold = getMaxFold(conf);
    // In case of compound Bloom filters we ignore the maxKeys hint.
    CompoundBloomFilterWriter bloomWriter = new CompoundBloomFilterWriter(getBloomBlockSize(conf),
        err, Hash.getHashType(conf), maxFold, cacheConf.shouldCacheBloomsOnWrite(),
        KeyValue.RAW_COMPARATOR);
    writer.addInlineBlockWriter(bloomWriter);
    return bloomWriter;
  }

  可見maxKeys沒有使用,另外一個方法同理,因此這裏的cellscount變量在flush的第二階段沒有使用。

  到如今爲止咱們判斷出在第二階段cellcount沒有使用,咱們繼續跟進第三階段:回到internalFlushAndCOmmit中的flush.commit(status)
public boolean commit(MonitoredTask status) throws IOException {
            if (this.tempFiles == null || this.tempFiles.isEmpty()) {
                return false;
            }
            List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
            for (Path storeFilePath : tempFiles) {
                try {
                    storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status));
                } catch (IOException ex) {
                    LOG.error("Failed to commit store file " + storeFilePath, ex);
                    // Try to delete the files we have committed before.
                    for (StoreFile sf : storeFiles) {
                        Path pathToDelete = sf.getPath();
                        try {
                            sf.deleteReader();
                        } catch (IOException deleteEx) {
                            LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, ex);
                            Runtime.getRuntime().halt(1);
                        }
                    }
                    throw new IOException("Failed to commit the flush", ex);
                }
            }

            for (StoreFile sf : storeFiles) {
                if (HStore.this.getCoprocessorHost() != null) {
                    HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
                }
                committedFiles.add(sf.getPath());
            }

            HStore.this.flushedCellsCount += cacheFlushCount;
            HStore.this.flushedCellsSize += cacheFlushSize;

            // Add new file to store files.  Clear snapshot too while we have the Store write lock.
            return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
        }

  第三階段比較簡單,將flush的文件移動到hdfs正確的路徑下。同時可見在這裏用到了cellscount。這裏是賦值給store的flushedCellsCount,這裏主要是用來進行metric收集flushedCellsSize的。根據經驗這個metric可忽略,未使用過。

總結

這裏之因此老是提到cellscount變量,是由於給其賦值調用ConcurrentSkipListMap.size()方法在flush的第一階段中最耗時的,同時持有hbase region 級別的updatelock,可是經過梳理並無太大的用處,能夠幹掉。不然會所以一些毛刺,pct99比較高。已有patch,可是是應用在2.+的版本的、

整個flush的流程就結束了,若有不對的地方,歡迎指正。歡迎加微信相互交流:940184856

相關文章
相關標籤/搜索