1. 簡介html
對於HBase的協處理器概念可由其官方博文了解:https://blogs.apache.org/hbase/entry/coprocessor_introductionjava
整體來講其包含兩種協處理器:Observers和Endpoint。git
其中Observers能夠理解問傳統數據庫的觸發器,當發生某一個特定操做的時候出發Observer。github
preOpen, postOpen: Called before and after the region is reported as online to the master. preFlush, postFlush: Called before and after the memstore is flushed into a new store file. preGet, postGet: Called before and after a client makes a Get request. preExists, postExists: Called before and after the client tests for existence using a Get. prePut and postPut: Called before and after the client stores a value. preDelete and postDelete: Called before and after the client deletes a value.
preWALWrite/postWALWrite: called before and after a WALEdit written to WAL.
preCreateTable/postCreateTable: Called before and after the region is reported as online to the master. preDeleteTable/postDeleteTable
以上對於Observer的邏輯以RegionObserver舉例來講其時序圖以下:shell
其中Endpoint能夠理解爲傳統數據庫的存儲過程操做,好比能夠進行某族某列值得加和。無Endpoint特性的狀況下須要全局掃描表,經過Endpoint則能夠在多臺數據庫
分佈有對應表的regionserver上同步加和,在江加和數返回給客戶端進行全局加和操做,充分利用了集羣資源,增長性能。Endpoint基本概念以下圖:apache
2. 二者代碼實現細節的差別編程
在實現兩種協處理器的時候稍有區別。不管哪一種協處理器都須要運行於Server端的環境中。其中Endpoint還須要經過protocl來定義接口實現客戶端代碼進行rpc通訊以此來進行數據的蒐集歸併。而Observer則不須要客戶端代碼,只在特定操做發生的時候出發服務端代碼的實現。服務器
3. Observer協處理器的實現框架
相對來講Observer的實現來的簡單點,只須要實現服務端代碼邏輯便可。經過實現一個RegionserverObserver來加深瞭解。
所要實現的功能:
假定某個表有A和B兩個列--------------------------------便於後續描述咱們稱之爲coprocessor_table 1. 當咱們向A列插入數據的時候經過協處理器像B列也插入數據。 2.在讀取數據的時候只容許客戶端讀取B列數據而不能讀取A列數據。換句話說A列是隻寫 B列是隻讀的。(爲了簡單起見,用戶在讀取數據的時候須要制定列名) 3. A列值必須是整數,換句話說B列值也天然都是整數 4.當刪除操做的時候不能指定刪除B列 5.當刪除A列的時候同時須要刪除B列 6.對於其餘列的刪除不作檢查
在上述功能點肯定後,咱們就要開始實現這兩個功能。好在HBase API中有BaseRegionObserver,這個類已經幫助咱們實現了大部分的默認實現,咱們只要專一於業務上的方法重載便可。
代碼框架:
public class 協處理器類名稱 extends BaseRegionObserver { private static final Log LOG = LogFactory.getLog(協處理器類名稱.class); private RegionCoprocessorEnvironment env = null;// 協處理器是運行於region中的,每個region都會加載協處理器 // 這個方法會在regionserver打開region時候執行(尚未真正打開) @Override public void start(CoprocessorEnvironment e) throws IOException { env = (RegionCoprocessorEnvironment) e; } // 這個方法會在regionserver關閉region時候執行(尚未真正關閉) @Override public void stop(CoprocessorEnvironment e) throws IOException { // nothing to do here } /** * 出發點,好比能夠重寫prePut postPut等方法,這樣就能夠在數據插入前和插入後作邏輯控制了。 */ @Override
業務代碼實現 :
根據上述需求和代碼框架,具體邏輯實現以下。
public class MyRegionObserver extends BaseRegionObserver { private static final Log LOG = LogFactory.getLog(MyRegionObserver.class); private RegionCoprocessorEnvironment env = null; // 設定只有F族下的列才能被操做,且A列只寫,B列只讀。的語言 private static final String FAMAILLY_NAME = "F"; private static final String ONLY_PUT_COL = "A"; private static final String ONLY_READ_COL = "B"; // 協處理器是運行於region中的,每個region都會加載協處理器 // 這個方法會在regionserver打開region時候執行(尚未真正打開) @Override public void start(CoprocessorEnvironment e) throws IOException { env = (RegionCoprocessorEnvironment) e; } // 這個方法會在regionserver關閉region時候執行(尚未真正關閉) @Override public void stop(CoprocessorEnvironment e) throws IOException { // nothing to do here } /** * 需求 1.不容許插入B列 2.只能插入A列 3.插入的數據必須爲整數 4.插入A列的時候自動插入B列 */ @Override public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, final WALEdit edit, final Durability durability) throws IOException { // 首先查看單個put中是否有對只讀列有寫操做 List<Cell> cells = put.get(Bytes.toBytes(FAMAILLY_NAME), Bytes.toBytes(ONLY_READ_COL)); if (cells != null && cells.size() != 0) { LOG.warn("User is not allowed to write read_only col."); throw new IOException("User is not allowed to write read_only col."); } // 檢查A列 cells = put.get(Bytes.toBytes(FAMAILLY_NAME), Bytes.toBytes(ONLY_PUT_COL)); if (cells == null || cells.size() == 0) { // 當不存在對於A列的操做的時候則不作任何的處理,直接放行便可 LOG.info("No A col operation, just do it."); return; } // 當A列存在的狀況下在進行值得檢查,查看是否插入了整數 byte[] aValue = null; for (Cell cell : cells) { try { aValue = CellUtil.cloneValue(cell); LOG.warn("aValue = " + Bytes.toString(aValue)); Integer.valueOf(Bytes.toString(aValue)); } catch (Exception e1) { LOG.warn("Can not put un number value to A col."); throw new IOException("Can not put un number value to A col."); } } // 當一切都ok的時候再去構建B列的值,由於按照需求,插入A列的時候須要同時插入B列 LOG.info("B col also been put value!"); put.addColumn(Bytes.toBytes(FAMAILLY_NAME), Bytes.toBytes(ONLY_READ_COL), aValue); } /** * 需求 1.不能刪除B列 2.只能刪除A列 3.刪除A列的時候須要一併刪除B列 */ @Override public void preDelete( final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete, final WALEdit edit, final Durability durability) throws IOException { // 首先查看是否對於B列進行了指定刪除 List<Cell> cells = delete.getFamilyCellMap().get( Bytes.toBytes(FAMAILLY_NAME)); if (cells == null || cells.size() == 0) { // 若是客戶端沒有針對於FAMAILLY_NAME列族的操做則不用關心,讓其繼續操做便可。 LOG.info("NO F famally operation ,just do it."); return; } // 開始檢查F列族內的操做狀況 byte[] qualifierName = null; boolean aDeleteFlg = false; for (Cell cell : cells) { qualifierName = CellUtil.cloneQualifier(cell); // 檢查是否對B列進行了刪除,這個是不容許的 if (Arrays.equals(qualifierName, Bytes.toBytes(ONLY_READ_COL))) { LOG.info("Can not delete read only B col."); throw new IOException("Can not delete read only B col."); } // 檢查是否存在對於A隊列的刪除 if (Arrays.equals(qualifierName, Bytes.toBytes(ONLY_PUT_COL))) { LOG.info("there is A col in delete operation!"); aDeleteFlg = true; } } // 若是對於A列有刪除,則須要對B列也要刪除 if (aDeleteFlg) { LOG.info("B col also been deleted!"); delete.addColumn(Bytes.toBytes(FAMAILLY_NAME), Bytes.toBytes(ONLY_READ_COL)); } } }
4. Observer協處理器上傳加載
完成實現後須要將協處理器類打包成jar文件,對於協處理器的加載一般有三種方法:
1.配置文件加載:即經過hbase-site.xml文件配置加載,通常這樣的協處理器是系統級別的,全局的協處理器,如權限控制等檢查。
2.shell加載:能夠經過alter命令來對錶進行scheme進行修改來加載協處理器。
3.經過API代碼實現:即經過API的方式來加載協處理器。
上述加載方法中,1,3都須要將協處理器jar文件放到集羣的hbase的classpath中。而2方法只須要將jar文件上傳至集羣環境的hdfs便可。
下面咱們只介紹如何經過2方法進行加載。
步驟1:經過以下方法建立表
hbase(main):001:0> create 'coprocessor_table','F'
0 row(s) in 2.7570 seconds
=> Hbase::Table - coprocessor_table
步驟2:經過alter命令將協處理器加載到表中
alter 'coprocessor_table' , METHOD =>'table_att','coprocessor'=>'hdfs://ns1/testdata/Test-HBase-Observer.jar|cn.com.newbee.feng.MyRegionObserver|1001'
其中:'coprocessor'=>'jar文件在hdfs上的絕對路徑|協處理器主類|優先級|協處理器參數
上述協處理器並無參數,因此未給出參數,對於協處理器的優先級不在此作討論。
步驟3:檢查協處理器的加載
hbase(main):021:0> describe 'coprocessor_table' Table coprocessor_table is ENABLED coprocessor_table, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://ns1/testdata/T est-HBase-Observer.jar|cn.com.newbee.feng.MyRegionObserver|1001'} COLUMN FAMILIES DESCRIPTION {NAME => 'F', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_S COPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'f alse', BLOCKCACHE => 'true'} 1 row(s) in 0.0300 seconds
能夠看到協處理器被表成功加載,其實內部是將update全部此表的region去加載協處理器的。
5. Observer協處理器測試
通過上述代碼完成和加載完成後咱們進行簡單的協處理器測試,因爲Observer並不須要咱們去定製客戶端代碼,因此咱們能夠直接經過shell命令環境來測試。
用例1: 正常插入A列
hbase(main):024:0> scan 'coprocessor_table' ROW COLUMN+CELL 0 row(s) in 0.0100 seconds hbase(main):025:0> put 'coprocessor_table','row1','F:A',123 0 row(s) in 0.0210 seconds hbase(main):026:0> scan 'coprocessor_table' ROW COLUMN+CELL row1 column=F:A, timestamp=1469838240645, value=123 row1 column=F:B, timestamp=1469838240645, value=123 1 row(s) in 0.0180 seconds
結果:B列也被插入,OK
用例2:插入A列,可是值不爲整數
hbase(main):027:0> put 'coprocessor_table','row1','F:A','cc'
ERROR: Failed 1 action: IOException: 1 time,
結果:插入失敗,服務端報以下錯誤,OK
2016-07-29 20:25:45,406 WARN [B.defaultRpcServer.handler=3,queue=0,port=60020] feng.MyRegionObserver: Can not put un number value to A col.
用例3:插入B列
hbase(main):028:0> put 'coprocessor_table','row1','F:B',123
ERROR: Failed 1 action: IOException: 1 time,
結果:插入失敗,服務器報以下錯誤,OK
2016-07-29 20:27:13,342 WARN [B.defaultRpcServer.handler=20,queue=2,port=60020] feng.MyRegionObserver: User is not allowed to write read_only col.
用例4:刪除B列
hbase(main):029:0> delete 'coprocessor_table','row1','F:B'
ERROR: java.io.IOException: Can not delete read only B col.
結果:刪除失敗,OK
用例4:刪除A列
hbase(main):030:0> scan 'coprocessor_table' ROW COLUMN+CELL row1 column=F:A, timestamp=1469838240645, value=123 row1 column=F:B, timestamp=1469838240645, value=123 1 row(s) in 0.0230 seconds hbase(main):031:0> delete 'coprocessor_table','row1','F:A' 0 row(s) in 0.0060 seconds hbase(main):032:0> scan 'coprocessor_table' ROW COLUMN+CELL 0 row(s) in 0.0070 seconds
結果:A列和B列同時被刪除了。
6. Observer協處理器總結
Observer協處理器也能夠看做爲服務端的攔截器,用戶能夠根據需求肯定攔截點,在去重寫這些攔截點對應的方法便可,整個過程當中不須要重啓集羣,在不修改HBase內部代碼的狀況下對HBase擴展更加方便。如擴展權限,擴展索引等都很是有用。
參考:
HBase 協處理器編程詳解第一部分:Server 端代碼編寫
代碼下載: