Coprocessors
以前咱們的filter都是在客戶端定義,而後傳到服務端去執行的,這個Coprocessors是在服務端定義,在客戶端調用,而後在服務端執行,他有點兒想咱們熟悉的存儲過程,傳一些參數進去,而後進行咱們事先定義好的操做,咱們經常用它來作一些好比二次索引啊,統計函數什麼的,它也和自定義filter同樣,須要事先定好,而後在hbase-env.sh中的HBASE_CLASSPATH中指明,就像個人上一篇中的寫的那樣。java
Coprocessors分兩種,observer和endpoint。數據庫
(1)observer就像觸發器同樣,當某個事件發生的時候,它就出發。ide
已經有一些內置的接口讓咱們去實現,RegionObserver、MasterObserver、WALObserver,看名字就大概知道他們是幹嗎的。函數
(2)endpoint能夠認爲是自定義函數,能夠把這個理解爲關係數據庫的存儲過程。spa
全部的Coprocessor都是實現自Coprocessor 接口,它分SYSTEM和USER,前者的優先級比後者的優先級高,先執行。3d
它有兩個方法,start和stop方法,兩個方法都有一個相同的上下文對象CoprocessorEnvironment。code
void start(CoprocessorEnvironment env) throws IOException;
void stop(CoprocessorEnvironment env) throws IOException;orm這是CoprocessorEnvironment的方法。server
Working with Tablesxml
對錶進行操做的時候,必須先調用getTable方法活得HTable,不能夠本身定義一個HTable,目前貌似沒有禁止,可是未來會禁止。
而且在對錶操做的時候,不能對行加鎖。
Coprocessor Loading
Coprocessor加載須要在配置文件裏面全局加載,好比在hbase-site.xml中設置。
<property> <name>hbase.coprocessor.region.classes</name> <value>coprocessor.RegionObserverExample,coprocessor.AnotherCoprocessor</value> </property> <property> <name>hbase.coprocessor.master.classes</name> <value>coprocessor.MasterObserverExample</value> </property> <property> <name>hbase.coprocessor.wal.classes</name> <value>coprocessor.WALObserverExample,bar.foo.MyWALObserver</value> </property>咱們自定義的時間能夠註冊到三個配置項上,分別是hbase.coprocessor.region.classes,hbase.coprocessor.master.classes,
hbase.coprocessor.wal.classes上,他們分別負責region,master,wal,註冊到region的要特別注意當心,由於它是針對全部表的。
<property> <name>hbase.coprocessor.region.classes</name> <value>coprocessor.RegionObserverExample</value></property>註冊到這三個觸發器上,能夠監控到幾乎全部咱們的操做上面,很是恐怖。。能夠說是想要什麼就有什麼,詳細的代碼你們本身去摸索。
EndPoint的能夠用來定義聚合函數,咱們能夠調用CoprocessorProtocol中的方法來實現咱們的需求。
調用coprocessorProxy() 傳一個單獨的row key,這是在單獨一個region上操做的。
要在全部region上面操做,咱們要調用coprocessorExec()方法 傳一個開始row key 和結束row key。
Demo
說了那麼多廢話,我都很差意思再說了,來個例子吧,統計行數的。
public interface RowCountProtocol extends CoprocessorProtocol { long getRowCount() throws IOException; long getRowCount(Filter filter) throws IOException; long getKeyValueCount() throws IOException; }public class RowCountEndpoint extends BaseEndpointCoprocessor implements RowCountProtocol { private long getCount(Filter filter, boolean countKeyValues) throws IOException { Scan scan = new Scan(); scan.setMaxVersions(1); if (filter != null) { scan.setFilter(filter); } RegionCoprocessorEnvironment environment = (RegionCoprocessorEnvironment) getEnvironment(); // use an internal scanner to perform scanning. InternalScanner scanner = environment.getRegion().getScanner(scan); int result = 0; try { List<KeyValue> curVals = new ArrayList<KeyValue>(); boolean done = false; do { curVals.clear(); done = scanner.next(curVals); result += countKeyValues ? curVals.size() : 1; } while (done); } finally { scanner.close(); } return result; } @Override public long getRowCount() throws IOException { return getRowCount(new FirstKeyOnlyFilter()); } @Override public long getRowCount(Filter filter) throws IOException { return getCount(filter, false); } @Override public long getKeyValueCount() throws IOException { return getCount(null, true); } }
寫完以後,註冊一下吧。
<property> <name>hbase.coprocessor.region.classes</name> <value>coprocessor.RowCountEndpoint</value></property>
JAVA 客戶端調用
在服務端定義以後,咱們怎麼在客戶端用java代碼調用呢,看下面的例子你就明白啦!
public class EndPointExample { public static void main(String[] args) throws IOException { Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "testtable"); try { Map<byte[], Long> results = table.coprocessorExec( RowCountProtocol.class, null, null, new Batch.Call<RowCountProtocol, Long>() { @Override public Long call(RowCountProtocol counter) throws IOException { return counter.getRowCount(); } }); long total = 0; for (Map.Entry<byte[], Long> entry : results.entrySet()) { total += entry.getValue().longValue(); System.out.println("Region: " + Bytes.toString(entry.getKey()) + ", Count: " + entry.getValue()); } System.out.println("Total Count: " + total); } catch (Throwable throwable) { throwable.printStackTrace(); } } }經過table的coprocessorExec方法調用,而後調用RowCountProtocol接口的getRowCount()方法。
而後遍歷每一個Region返回的結果,合起來就是最終的結果,打印結果以下。
Region: testtable,,1303417572005.51f9e2251c29ccb2...cbcb0c66858f., Count: 2Region: testtable,row3,1303417572005.7f3df4dcba3f...dbc99fce5d87., Count: 3Total Count: 5
在上面的例子當中,咱們是用Batch.Call()方法來調用接口當中的方法,咱們能夠用另一個方法來簡化上述代碼,來看例子。
Batch.Call call =Batch.forMethod(RowCountProtocol.class,"getKeyValueCount"); Map<byte[], Long> results = table.coprocessorExec(RowCountProtocol.class, null, null, call);
採用Batch.Call方法調用同時調用多個方法
Map<byte[], Pair<Long, Long>> results =table.coprocessorExec( RowCountProtocol.class,null, null,new Batch.Call<RowCountProtocol, Pair<Long, Long>>() { public Pair<Long, Long> call(RowCountProtocol counter) throws IOException { return new Pair(counter.getRowCount(),counter.getKeyValueCount()); } });long totalRows = 0;long totalKeyValues = 0;for (Map.Entry<byte[], Pair<Long, Long>> entry :results.entrySet()) { totalRows += entry.getValue().getFirst().longValue(); totalKeyValues +=entry.getValue().getSecond().longValue(); System.out.println("Region: " +Bytes.toString(entry.getKey()) +", Count: " + entry.getValue()); } System.out.println("Total Row Count: " + totalRows); System.out.println("Total KeyValue Count: " +totalKeyValues);
調用coprocessorProxy()在單個region上執行
RowCountProtocol protocol = table.coprocessorProxy(RowCountProtocol.class, Bytes.toBytes("row4"));long rowsInRegion = protocol.getRowCount(); System.out.println("Region Row Count: " +rowsInRegion);上面這個例子是查找row4行所在region的數據條數,這個能夠幫助咱們統計每一個region上面的數據分佈。