[How to] 使用HBase協處理器---基本概念和regionObserver的簡單實現

1. 簡介html

    對於HBase的協處理器概念可由其官方博文了解:https://blogs.apache.org/hbase/entry/coprocessor_introductionjava

  整體來講其包含兩種協處理器:Observers和Endpoint。git

  其中Observers能夠理解問傳統數據庫的觸發器,當發生某一個特定操做的時候出發Observer。github

  1. RegionObserver:提供基於表的region上的Get, Put, Delete, Scan等操做,好比能夠在客戶端進行get操做的時候定義RegionObserver來查詢其時候具備get權限等。具體的方法(攔截點)有:
    preOpen, postOpen: Called before and after the region is reported as online to the master.
    preFlush, postFlush: Called before and after the memstore is flushed into a new store file.
    preGet, postGet: Called before and after a client makes a Get request.
    preExists, postExists: Called before and after the client tests for existence using a Get.
    prePut and postPut: Called before and after the client stores a value.
    preDelete and postDelete: Called before and after the client deletes a value.
  2. WALObserver:提供基於WAL的寫和刷新WAL文件的操做,一個regionserver上只有一個WAL的上下文。具體的方法(攔截點)有:
    preWALWrite/postWALWrite: called before and after a WALEdit written to WAL.
  3. MasterObserver:提供基於諸如ddl的的操做檢查,如create, delete, modify table等,一樣的當客戶端delete表的時候經過邏輯檢查時候具備此權限場景等。其運行於Master進程中。具體的方法(攔截點)有:
    preCreateTable/postCreateTable: Called before and after the region is reported as online to the master.
    preDeleteTable/postDeleteTable

    以上對於Observer的邏輯以RegionObserver舉例來講其時序圖以下:shell

  其中Endpoint能夠理解爲傳統數據庫的存儲過程操做,好比能夠進行某族某列值得加和。無Endpoint特性的狀況下須要全局掃描表,經過Endpoint則能夠在多臺數據庫

  分佈有對應表的regionserver上同步加和,在江加和數返回給客戶端進行全局加和操做,充分利用了集羣資源,增長性能。Endpoint基本概念以下圖:apache

 

2. 二者代碼實現細節的差別編程

  在實現兩種協處理器的時候稍有區別。不管哪一種協處理器都須要運行於Server端的環境中。其中Endpoint還須要經過protocl來定義接口實現客戶端代碼進行rpc通訊以此來進行數據的蒐集歸併。而Observer則不須要客戶端代碼,只在特定操做發生的時候出發服務端代碼的實現。服務器

 

3. Observer協處理器的實現框架

  相對來講Observer的實現來的簡單點,只須要實現服務端代碼邏輯便可。經過實現一個RegionserverObserver來加深瞭解。

  所要實現的功能:

假定某個表有A和B兩個列--------------------------------便於後續描述咱們稱之爲coprocessor_table
1. 當咱們向A列插入數據的時候經過協處理器像B列也插入數據。
2.在讀取數據的時候只容許客戶端讀取B列數據而不能讀取A列數據。換句話說A列是隻寫 B列是隻讀的。(爲了簡單起見,用戶在讀取數據的時候須要制定列名)
3. A列值必須是整數,換句話說B列值也天然都是整數

4.當刪除操做的時候不能指定刪除B列
5.當刪除A列的時候同時須要刪除B列
6.對於其餘列的刪除不作檢查

 在上述功能點肯定後,咱們就要開始實現這兩個功能。好在HBase API中有BaseRegionObserver,這個類已經幫助咱們實現了大部分的默認實現,咱們只要專一於業務上的方法重載便可。

代碼框架:  

public class 協處理器類名稱 extends BaseRegionObserver {
    private static final Log LOG = LogFactory.getLog(協處理器類名稱.class);
    private RegionCoprocessorEnvironment env = null;// 協處理器是運行於region中的,每個region都會加載協處理器
    // 這個方法會在regionserver打開region時候執行(尚未真正打開)
    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        env = (RegionCoprocessorEnvironment) e;
    }

    // 這個方法會在regionserver關閉region時候執行(尚未真正關閉)
    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
        // nothing to do here
    }

    /**
     * 出發點,好比能夠重寫prePut postPut等方法,這樣就能夠在數據插入前和插入後作邏輯控制了。
     */
    @Override

業務代碼實現 :

  根據上述需求和代碼框架,具體邏輯實現以下。

  • 在插入須要作檢查因此重寫了prePut方法
  • 在刪除前須要作檢查因此重寫了preDelete方法
public class MyRegionObserver extends BaseRegionObserver {
    private static final Log LOG = LogFactory.getLog(MyRegionObserver.class);
    
    private RegionCoprocessorEnvironment env = null;
    // 設定只有F族下的列才能被操做,且A列只寫,B列只讀。的語言
    private static final String FAMAILLY_NAME = "F";
    private static final String ONLY_PUT_COL = "A";
    private static final String ONLY_READ_COL = "B";

    // 協處理器是運行於region中的,每個region都會加載協處理器
    // 這個方法會在regionserver打開region時候執行(尚未真正打開)
    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        env = (RegionCoprocessorEnvironment) e;
    }

    // 這個方法會在regionserver關閉region時候執行(尚未真正關閉)
    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
        // nothing to do here
    }

    /**
     * 需求 1.不容許插入B列 2.只能插入A列 3.插入的數據必須爲整數 4.插入A列的時候自動插入B列
     */
    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
            final Put put, final WALEdit edit, final Durability durability)
            throws IOException {

        // 首先查看單個put中是否有對只讀列有寫操做
        List<Cell> cells = put.get(Bytes.toBytes(FAMAILLY_NAME),
                Bytes.toBytes(ONLY_READ_COL));
        if (cells != null && cells.size() != 0) {
            LOG.warn("User is not allowed to write read_only col.");
            throw new IOException("User is not allowed to write read_only col.");
        }

        // 檢查A列
        cells = put.get(Bytes.toBytes(FAMAILLY_NAME),
                Bytes.toBytes(ONLY_PUT_COL));
        if (cells == null || cells.size() == 0) {
            // 當不存在對於A列的操做的時候則不作任何的處理,直接放行便可
            LOG.info("No A col operation, just do it.");
            return;
        }

        // 當A列存在的狀況下在進行值得檢查,查看是否插入了整數
        byte[] aValue = null;
        for (Cell cell : cells) {
            try {
                aValue = CellUtil.cloneValue(cell);
                LOG.warn("aValue = " + Bytes.toString(aValue));
                Integer.valueOf(Bytes.toString(aValue));
            } catch (Exception e1) {
                LOG.warn("Can not put un number value to A col.");
                throw new IOException("Can not put un number value to A col.");
            }
        }

        // 當一切都ok的時候再去構建B列的值,由於按照需求,插入A列的時候須要同時插入B列
        LOG.info("B col also been put value!");
        put.addColumn(Bytes.toBytes(FAMAILLY_NAME),
                Bytes.toBytes(ONLY_READ_COL), aValue);
    }

    /**
     * 需求 1.不能刪除B列 2.只能刪除A列 3.刪除A列的時候須要一併刪除B列
     */
    @Override
    public void preDelete(
            final ObserverContext<RegionCoprocessorEnvironment> e,
            final Delete delete, final WALEdit edit, final Durability durability)
            throws IOException {

        // 首先查看是否對於B列進行了指定刪除
        List<Cell> cells = delete.getFamilyCellMap().get(
                Bytes.toBytes(FAMAILLY_NAME));
        if (cells == null || cells.size() == 0) {
            // 若是客戶端沒有針對於FAMAILLY_NAME列族的操做則不用關心,讓其繼續操做便可。
            LOG.info("NO F famally operation ,just do it.");
            return;
        }

        // 開始檢查F列族內的操做狀況
        byte[] qualifierName = null;
        boolean aDeleteFlg = false;
        for (Cell cell : cells) {
            qualifierName = CellUtil.cloneQualifier(cell);

            // 檢查是否對B列進行了刪除,這個是不容許的
            if (Arrays.equals(qualifierName, Bytes.toBytes(ONLY_READ_COL))) {
                LOG.info("Can not delete read only B col.");
                throw new IOException("Can not delete read only B col.");
            }

            // 檢查是否存在對於A隊列的刪除
            if (Arrays.equals(qualifierName, Bytes.toBytes(ONLY_PUT_COL))) {
                LOG.info("there is A col in delete operation!");
                aDeleteFlg = true;
            }
        }

        // 若是對於A列有刪除,則須要對B列也要刪除
        if (aDeleteFlg)
        {
            LOG.info("B col also been deleted!");
            delete.addColumn(Bytes.toBytes(FAMAILLY_NAME), Bytes.toBytes(ONLY_READ_COL));
        }
    }

}

4. Observer協處理器上傳加載 

  完成實現後須要將協處理器類打包成jar文件,對於協處理器的加載一般有三種方法:

  1.配置文件加載:即經過hbase-site.xml文件配置加載,通常這樣的協處理器是系統級別的,全局的協處理器,如權限控制等檢查。

  2.shell加載:能夠經過alter命令來對錶進行scheme進行修改來加載協處理器。

  3.經過API代碼實現:即經過API的方式來加載協處理器。

上述加載方法中,1,3都須要將協處理器jar文件放到集羣的hbase的classpath中。而2方法只須要將jar文件上傳至集羣環境的hdfs便可。

下面咱們只介紹如何經過2方法進行加載。

  步驟1:經過以下方法建立表

hbase(main):001:0> create 'coprocessor_table','F'
0 row(s) in 2.7570 seconds

=> Hbase::Table - coprocessor_table

 

   步驟2:經過alter命令將協處理器加載到表中

alter 'coprocessor_table' , METHOD =>'table_att','coprocessor'=>'hdfs://ns1/testdata/Test-HBase-Observer.jar|cn.com.newbee.feng.MyRegionObserver|1001'

  其中:'coprocessor'=>'jar文件在hdfs上的絕對路徑|協處理器主類|優先級|協處理器參數

  上述協處理器並無參數,因此未給出參數,對於協處理器的優先級不在此作討論。

  步驟3:檢查協處理器的加載

hbase(main):021:0> describe 'coprocessor_table'
Table coprocessor_table is ENABLED                                              
coprocessor_table, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://ns1/testdata/T
est-HBase-Observer.jar|cn.com.newbee.feng.MyRegionObserver|1001'}               
COLUMN FAMILIES DESCRIPTION                                                     
{NAME => 'F', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_S
COPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL =>
 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'f
alse', BLOCKCACHE => 'true'}                                                    
1 row(s) in 0.0300 seconds

   能夠看到協處理器被表成功加載,其實內部是將update全部此表的region去加載協處理器的。

 

5. Observer協處理器測試

  通過上述代碼完成和加載完成後咱們進行簡單的協處理器測試,因爲Observer並不須要咱們去定製客戶端代碼,因此咱們能夠直接經過shell命令環境來測試。

  用例1: 正常插入A列

hbase(main):024:0> scan 'coprocessor_table'
ROW                   COLUMN+CELL                                               
0 row(s) in 0.0100 seconds

hbase(main):025:0> put 'coprocessor_table','row1','F:A',123
0 row(s) in 0.0210 seconds

hbase(main):026:0> scan 'coprocessor_table'
ROW                   COLUMN+CELL                                               
 row1                 column=F:A, timestamp=1469838240645, value=123            
 row1                 column=F:B, timestamp=1469838240645, value=123            
1 row(s) in 0.0180 seconds

 

  結果:B列也被插入,OK

  

  用例2:插入A列,可是值不爲整數

hbase(main):027:0> put 'coprocessor_table','row1','F:A','cc'

ERROR: Failed 1 action: IOException: 1 time, 

 

   結果:插入失敗,服務端報以下錯誤,OK

2016-07-29 20:25:45,406 WARN  [B.defaultRpcServer.handler=3,queue=0,port=60020] feng.MyRegionObserver: Can not put un number value to A col.

 

 

  用例3:插入B列

hbase(main):028:0> put 'coprocessor_table','row1','F:B',123

ERROR: Failed 1 action: IOException: 1 time,

 

  結果:插入失敗,服務器報以下錯誤,OK

2016-07-29 20:27:13,342 WARN  [B.defaultRpcServer.handler=20,queue=2,port=60020] feng.MyRegionObserver: User is not allowed to write read_only col.

 

 

  用例4:刪除B列

hbase(main):029:0> delete 'coprocessor_table','row1','F:B'

ERROR: java.io.IOException: Can not delete read only B col.

 

   結果:刪除失敗,OK

  

  用例4:刪除A列

hbase(main):030:0> scan 'coprocessor_table'
ROW                   COLUMN+CELL                                                
 row1                 column=F:A, timestamp=1469838240645, value=123             
 row1                 column=F:B, timestamp=1469838240645, value=123             
1 row(s) in 0.0230 seconds

hbase(main):031:0> delete 'coprocessor_table','row1','F:A'
0 row(s) in 0.0060 seconds

hbase(main):032:0> scan 'coprocessor_table'
ROW                   COLUMN+CELL                                                
0 row(s) in 0.0070 seconds

  結果:A列和B列同時被刪除了。

 

6. Observer協處理器總結

  Observer協處理器也能夠看做爲服務端的攔截器,用戶能夠根據需求肯定攔截點,在去重寫這些攔截點對應的方法便可,整個過程當中不須要重啓集羣,在不修改HBase內部代碼的狀況下對HBase擴展更加方便。如擴展權限,擴展索引等都很是有用。

 

參考:

  HBase 協處理器編程詳解第一部分:Server 端代碼編寫

  Coprocessor Introduction

 

代碼下載:

  https://github.com/xufeng79x/Test-HBase-Observer

相關文章
相關標籤/搜索