HBase 手動 flush 機制梳理

對應 HBase 版本0.94.1,對照了開源的版本和工做使用的某發行版shell

問題:在 HBase shell 裏面輸入 flush 'table_or_region_name'以後,發生了什麼?具體的實現是怎麼樣的?對於現有的某個表,我如何在作操做以前估算 flush 執行的時間?apache

1. HBase shell 入口

HBase shell 使用 ruby 實現,在 putty 敲hbase shell,調用的是${HBASE_HOME}/bin/hbase這個 bash 腳本,根據shell這個參數,觸發調用 ruby 代碼,相關的部分以下:ruby

if [ "$COMMAND" = "shell" ] ; then
if [ "$JRUBY_HOME" != "" ] ; then
CLASSPATH="$JRUBY_HOME/lib/jruby.jar:$CLASSPATH"
HBASE_OPTS="$HBASE_OPTS -Djruby.home=$JRUBY_HOME -Djruby.lib=$JRUBY_HOME/lib"
fi
CLASS="org.jruby.Main -X+O ${JRUBY_OPTS} ${HBASE_HOME}/bin/hirb.rb"

在 hirb.rb 裏面,引入相關的包(${HBASE_HOME}/lib/ruby目錄下),而後啓動一個運行的 CLI 環境。bash

進入正題了。數據結構

在 hbase shell 裏面,全部執行的命令,都在${HBASE_HOME}/lib/ruby/shell/commands目錄下,有對應的${COMMAND}.rb的對應文件。併發

找到 flush.rb,核心代碼以下:oop

def command(table_or_region_name)
format_simple_command do
admin.flush(table_or_region_name)
end
end

這裏調用了 admin.rb 這個文件裏面的方法:this

@admin = org.apache.hadoop.hbase.client.HBaseAdmin.new(configuration)
def flush(table_or_region_name)
@admin.flush(table_or_region_name)
end

到這裏,就找到了 Java 程序的入口,調用了 HBaseAdmin.flush(table_or_region_name)這個方法。
後續幾部分的類圖以下:spa

2. HBaseAdmin 包裝

HBaseAdmin 類下面包含了三個 flush 方法:code

public void flush(String tableNameOrRegionName) throws IOException, InterruptedException {}
public void flush(byte[] tableNameOrRegionName) throws IOException, InterruptedException {}
private void flush(ServerName sn, HRegionInfo hri) throws IOException {}


  • 第一個,做爲入口,將 String 參數轉化爲 byte[],交給第二個
  • 第二個,主要的工做方法,按輸入參數是 region 名、分區表、不分區表,分別進行處理
  • 第三個,單獨針對 region 進行 flush

第一個略過。

第二個,邏輯清晰:

  • 若是是參數爲 Region,就調用第三個 flush 處理
  • 若是不是分區表,就獲取該表包含的全部 Region,挨個調用第三個 flush 處理,
  • 若是地分區表,處理方式與其餘的不一樣,調用了一個分區表公共處理方法 execPartitionTableAction 訂製實現了匿名類 PartitionTableActionCallableFactory,進行單獨處理。

注意

  • 對於沒有預分區的表,簡單地在一個 for 循環裏面,串行處理
  • 對於分區表,execPartitionTableAction中使用了併發數據結構 Future,對分區是並行處理

第三個,對每一個 Region 進行 flush,其實是第二個 flush 中全部 case 最終的歸宿。

在第三個 flush 中,實現代碼以下:

HRegionInterface rs = this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
rs.flushRegion(hri);


HRegionInterface 是一個抽象接口,flushRegion 是一個抽象方法。在0.94.1這個版本下,只有 HRegionServer 實現了 HRegionInterface 接口,因此要在 HRegionServer 裏面找到具體的代碼實現。

3. HRegionServer 包裝

在 HRegionServer 類裏面,包含了三個 flush 的實現:

public void flushRegion(byte[] regionName) throws IllegalArgumentException, IOException {}
public void flushRegion(byte[] regionName, long ifOlderThanTS) throws IllegalArgumentException, IOException {}
@QosPriority (priority=100)
public void flushRegion(HRegionInfo regionInfo) throws NotServingRegionException, IOException {}

 

  • 第一個,簡單地傳入 regionName,肯定 Region 在線,而後調用region.flushcache()
  • 第二個,傳入 regionName 和 超時時間戳 ifOlderThanTS ,肯定 Region 在線,且未超時的狀況下,將數據 flush 出去
  • 第三個,@QosPriority (priority=100)標記,使用了自定義聲明,給該方法賦值 rpc 調用的優先級;方法體checkOpen()檢查 RegionServer 在線後,調用region.flushcache()

接下來,查看看下 HRegion 類下面flushcache()的實現。

4. HRegion 實現

flushcache只是個入口方法,會作一些 flush 以前的準備工做,包括:創建任務狀態監控、判斷 Coprocessor、處理未 WAL 的 put 、寫加鎖等。以後,調用內部方法internalFlushcache開始flush。

在 internalFlushcache 方法實現中,作了 MVCC 的一些工做,最終,調用了StoreFlusher的flushCache方法實現。

internalFlushcache 爲了保證數據一致性作了不少的檢查、校驗、加鎖,目前功力不夠,先標記下,進入下一層。

看下 StoreFlusher 的實現。

5. StoreFlusher 實現

StoreFlusher 是個接口,在0.94.1這個版本里面,只有 Store.StoreFlusherImpl 一個實現類。

在 StoreFlusher 接口裏面能夠看到,flush 操做執行的過程當中包含3個部分:

  1. prepare,這是個短操做,建立 snapshot,這個過程當中會暫停寫操做
  2. flushCache,flush 執行的過程當中,是不會阻塞該 store 上的任何操做(讀寫)
  3. commit,將 flush 出的文件添加到 store 目錄下,清除 memstore 快照,短操做,會足暫停 scan

6. StoreFlusherImpl 實現

StoreFlusherImpl 是 Store 類的內部私有類,前面提到的 StoreFlusher 的3個方法,由 StoreFlusherImpl實現後,prepare 是本身實現,flushCache和 commit 都是調用外部 Store類的方法來完成。

6.1 prepare

public void prepare() {
memstore.snapshot();
this.snapshot = memstore.getSnapshot();
this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
}


調用了 MemStore 的方法,作快照。

6.2 flushCache

從 StoreFlusherImpl 調用 Store 類的flushCache方法,包裝了internalFlushCache方法來實現。
邏輯比較清晰:

  • 啓動一個 StoreScanner,根據時間戳和ScanType 參數,找出須要被 Flush 的行
  • 啓動一個StoreFile Writer,把讀出來的數據,寫入到一個 StoreFile 中,並將該 StoreFile 的路徑返回,供後續 commit 階段使用

6.3 commit

StoreFlusherImpl 類的 commit 方法首先調用外部 Store類的commitFile方法,主要作的事情有兩件:

  • 將 flushCache 生成的 StoreFile 移動到 Store所在目錄下
  • 更新 Store 的相關統計參數

而後會調用外部 Store類的updateStorefiles更新 Store 類的 storefile,更新文件後,須要調用needsCompaction(),查看下是否由於本次 flush 執行形成的文件變化會觸發 Compaction。若是觸發 Compaction,會啓動 Compaction 相關的一套機制繼續執行,後續再單獨介紹。

至此,手動 flush 操做背後的實現,初步梳理完畢。前面只是一個調用路徑的梳理,後面繼續豐富和補充。

相關文章
相關標籤/搜索