Hbase 學習(三)Coprocessors

Coprocessors

以前咱們的filter都是在客戶端定義,而後傳到服務端去執行的,這個Coprocessors是在服務端定義,在客戶端調用,而後在服務端執行,他有點兒想咱們熟悉的存儲過程,傳一些參數進去,而後進行咱們事先定義好的操做,咱們經常用它來作一些好比二次索引啊,統計函數什麼的,它也和自定義filter同樣,須要事先定好,而後在hbase-env.sh中的HBASE_CLASSPATH中指明,就像個人上一篇中的寫的那樣。java

Coprocessors分兩種,observer和endpoint。數據庫

(1)observer就像觸發器同樣,當某個事件發生的時候,它就出發。ide

已經有一些內置的接口讓咱們去實現,RegionObserver、MasterObserver、WALObserver,看名字就大概知道他們是幹嗎的。函數

(2)endpoint能夠認爲是自定義函數,能夠把這個理解爲關係數據庫的存儲過程。spa

全部的Coprocessor都是實現自Coprocessor 接口,它分SYSTEM和USER,前者的優先級比後者的優先級高,先執行。3d

它有兩個方法,start和stop方法,兩個方法都有一個相同的上下文對象CoprocessorEnvironment。code

void start(CoprocessorEnvironment env) throws IOException; 
void stop(CoprocessorEnvironment env) throws IOException;orm

這是CoprocessorEnvironment的方法。server

image

Working with Tablesxml

對錶進行操做的時候,必須先調用getTable方法活得HTable,不能夠本身定義一個HTable,目前貌似沒有禁止,可是未來會禁止。

而且在對錶操做的時候,不能對行加鎖。

Coprocessor Loading

Coprocessor加載須要在配置文件裏面全局加載,好比在hbase-site.xml中設置。

複製代碼

<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>coprocessor.RegionObserverExample,coprocessor.AnotherCoprocessor</value>
</property>
<property>
    <name>hbase.coprocessor.master.classes</name>
    <value>coprocessor.MasterObserverExample</value>
</property>
<property>
    <name>hbase.coprocessor.wal.classes</name>
    <value>coprocessor.WALObserverExample,bar.foo.MyWALObserver</value>
</property>

複製代碼

咱們自定義的時間能夠註冊到三個配置項上,分別是hbase.coprocessor.region.classes,hbase.coprocessor.master.classes,

hbase.coprocessor.wal.classes上,他們分別負責region,master,wal,註冊到region的要特別注意當心,由於它是針對全部表的。

<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>coprocessor.RegionObserverExample</value></property>

註冊到這三個觸發器上,能夠監控到幾乎全部咱們的操做上面,很是恐怖。。能夠說是想要什麼就有什麼,詳細的代碼你們本身去摸索。

EndPoint的能夠用來定義聚合函數,咱們能夠調用CoprocessorProtocol中的方法來實現咱們的需求。

調用coprocessorProxy() 傳一個單獨的row key,這是在單獨一個region上操做的。

要在全部region上面操做,咱們要調用coprocessorExec()方法 傳一個開始row key 和結束row key。

Demo

說了那麼多廢話,我都很差意思再說了,來個例子吧,統計行數的。

複製代碼

public interface RowCountProtocol extends CoprocessorProtocol {    long getRowCount() throws IOException;    long getRowCount(Filter filter) throws IOException;    long getKeyValueCount() throws IOException;
}

複製代碼

複製代碼

public class RowCountEndpoint extends BaseEndpointCoprocessor implements
        RowCountProtocol {    private long getCount(Filter filter, boolean countKeyValues)
            throws IOException {
        Scan scan = new Scan();
        scan.setMaxVersions(1);        if (filter != null) {
            scan.setFilter(filter);
        }
        RegionCoprocessorEnvironment environment = (RegionCoprocessorEnvironment) getEnvironment();        // use an internal scanner to perform scanning.
        InternalScanner scanner = environment.getRegion().getScanner(scan);        int result = 0;        try {
            List<KeyValue> curVals = new ArrayList<KeyValue>();
            boolean done = false;            do {
                curVals.clear();
                done = scanner.next(curVals);
                result += countKeyValues ? curVals.size() : 1;
            } while (done);
        } finally {
            scanner.close();
        }        return result;
    }

    @Override    public long getRowCount() throws IOException {        return getRowCount(new FirstKeyOnlyFilter());
    }

    @Override    public long getRowCount(Filter filter) throws IOException {        return getCount(filter, false);
    }

    @Override    public long getKeyValueCount() throws IOException {        return getCount(null, true);
    }
}

複製代碼

寫完以後,註冊一下吧。

<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>coprocessor.RowCountEndpoint</value></property>

JAVA 客戶端調用

在服務端定義以後,咱們怎麼在客戶端用java代碼調用呢,看下面的例子你就明白啦!

複製代碼

public class EndPointExample {    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testtable");        try {
            Map<byte[], Long> results = table.coprocessorExec(
                    RowCountProtocol.class, null, null,                    new Batch.Call<RowCountProtocol, Long>() {
                        @Override                        public Long call(RowCountProtocol counter)                                throws IOException {                            return counter.getRowCount();
                        }
                    });            long total = 0;            for (Map.Entry<byte[], Long> entry : results.entrySet()) {
                total += entry.getValue().longValue();
                System.out.println("Region: " + Bytes.toString(entry.getKey())                        + ", Count: " + entry.getValue());
            }
            System.out.println("Total Count: " + total);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
    }

}

複製代碼

經過table的coprocessorExec方法調用,而後調用RowCountProtocol接口的getRowCount()方法。

而後遍歷每一個Region返回的結果,合起來就是最終的結果,打印結果以下。

複製代碼

Region:
testtable,,1303417572005.51f9e2251c29ccb2...cbcb0c66858f.,
Count: 2Region:
testtable,row3,1303417572005.7f3df4dcba3f...dbc99fce5d87.,
Count: 3Total Count: 5

複製代碼

在上面的例子當中,咱們是用Batch.Call()方法來調用接口當中的方法,咱們能夠用另一個方法來簡化上述代碼,來看例子。

Batch.Call call =Batch.forMethod(RowCountProtocol.class,"getKeyValueCount");
Map<byte[], Long> results = table.coprocessorExec(RowCountProtocol.class, null, null, call);

採用Batch.Call方法調用同時調用多個方法

複製代碼

Map<byte[], Pair<Long, Long>> results =table.coprocessorExec(
RowCountProtocol.class,null, null,new Batch.Call<RowCountProtocol, Pair<Long, Long>>()
{    public Pair<Long, Long> call(RowCountProtocol counter) throws IOException {        return new Pair(counter.getRowCount(),counter.getKeyValueCount());
    }
});long totalRows = 0;long totalKeyValues = 0;for (Map.Entry<byte[], Pair<Long, Long>> entry :results.entrySet()) {
    totalRows +=
    entry.getValue().getFirst().longValue();
    totalKeyValues +=entry.getValue().getSecond().longValue();
    System.out.println("Region: " +Bytes.toString(entry.getKey()) +", Count: " + entry.getValue());
}
System.out.println("Total Row Count: " + totalRows);
System.out.println("Total KeyValue Count: " +totalKeyValues);

複製代碼

調用coprocessorProxy()在單個region上執行

RowCountProtocol protocol = table.coprocessorProxy(RowCountProtocol.class, Bytes.toBytes("row4"));long rowsInRegion = protocol.getRowCount();
System.out.println("Region Row Count: " +rowsInRegion);
上面這個例子是查找row4行所在region的數據條數,這個能夠幫助咱們統計每一個region上面的數據分佈。
相關文章
相關標籤/搜索