在使用 HBase 時,若是你的數據量達到了數十億行或數百萬列,此時可否在查詢中返回大量數據將受制於網絡的帶寬,即使網絡情況容許,可是客戶端的計算處理也未必可以知足要求。在這種狀況下,協處理器(Coprocessors)應運而生。它容許你將業務計算代碼放入在 RegionServer 的協處理器中,將處理好的數據再返回給客戶端,這能夠極大地下降須要傳輸的數據量,從而得到性能上的提高。同時協處理器也容許用戶擴展實現 HBase 目前所不具有的功能,如權限校驗、二級索引、完整性約束等。html
Observer 協處理器相似於關係型數據庫中的觸發器,當發生某些事件的時候這類協處理器會被 Server 端調用。一般能夠用來實現下面功能:java
Get
或 Put
操做以前,您可使用 preGet
或 prePut
方法檢查權限;當前 Observer 協處理器有如下四種類型:node
以上四種類型的 Observer 協處理器均繼承自 Coprocessor
接口,這四個接口中分別定義了全部可用的鉤子方法,以便在對應方法先後執行特定的操做。一般狀況下,咱們並不會直接實現上面接口,而是繼承其 Base 實現類,Base 實現類只是簡單空實現了接口中的方法,這樣咱們在實現自定義的協處理器時,就沒必要實現全部方法,只須要重寫必要方法便可。git
這裏以 RegionObservers
爲例,其接口類中定義了全部可用的鉤子方法,下面截取了部分方法的定義,多數方法都是成對出現的,有 pre
就有 post
:github
prePut()
攔截,該請求繼續送到 region,而後進行處理postPut()
postPut()
攔截該響應,最終結果被返回給客戶端若是你們瞭解 Spring,能夠將這種執行方式類比於其 AOP 的執行原理便可,官方文檔當中也是這樣類比的:redis
If you are familiar with Aspect Oriented Programming (AOP), you can think of a coprocessor as applying advice by intercepting a request and then running some custom code,before passing the request on to its final destination (or even changing the destination).shell
若是您熟悉面向切面編程(AOP),您能夠將協處理器視爲經過攔截請求而後運行一些自定義代碼來使用 Advice,而後將請求傳遞到其最終目標(或者更改目標)。數據庫
Endpoint 協處理器相似於關係型數據庫中的存儲過程。客戶端能夠調用 Endpoint 協處理器在服務端對數據進行處理,而後再返回。apache
以彙集操做爲例,若是沒有協處理器,當用戶須要找出一張表中的最大數據,即 max 聚合操做,就必須進行全表掃描,而後在客戶端上遍歷掃描結果,這必然會加劇了客戶端處理數據的壓力。利用 Coprocessor,用戶能夠將求最大值的代碼部署到 HBase Server 端,HBase 將利用底層 cluster 的多個節點併發執行求最大值的操做。即在每一個 Region 範圍內執行求最大值的代碼,將每一個 Region 的最大值在 Region Server 端計算出來,僅僅將該 max 值返回給客戶端。以後客戶端只須要將每一個 Region 的最大值進行比較而找到其中最大的值便可。編程
要使用咱們本身開發的協處理器,必須經過靜態(使用 HBase 配置)或動態(使用 HBase Shell 或 Java API)加載它。
其加載和卸載方式分別介紹以下。
靜態加載分如下三步:
hbase-site.xml
定義須要加載的協處理器。<property> <name>hbase.coprocessor.region.classes</name> <value>org.myname.hbase.coprocessor.endpoint.SumEndPoint</value> </property>
<name>
標籤的值必須是下面其中之一:
hbase.coprocessor.region.classes
hbase.coprocessor.wal.classes
hbase.coprocessor.master.classes
<value>
必須是協處理器實現類的全限定類名。若是爲加載指定了多個類,則類名必須以逗號分隔。
將 jar(包含代碼和全部依賴項) 放入 HBase 安裝目錄中的 lib
目錄下;
重啓 HBase。
從 hbase-site.xml 中刪除配置的協處理器的<property>元素及其子元素;
從類路徑或 HBase 的 lib 目錄中刪除協處理器的 JAR 文件(可選);
重啓 HBase。
使用動態加載協處理器,不須要從新啓動 HBase。但動態加載的協處理器是基於每一個表加載的,只能用於所指定的表。
此外,在使用動態加載必須使表脫機(disable)以加載協處理器。動態加載一般有兩種方式:Shell 和 Java API 。
如下示例基於兩個前提:
- coprocessor.jar 包含協處理器實現及其全部依賴項。
- JAR 包存放在 HDFS 上的路徑爲:hdfs:// <namenode>:<port> / user / <hadoop-user> /coprocessor.jar
hbase > disable 'tableName'
hbase > alter 'tableName', METHOD => 'table_att', 'Coprocessor'=>'hdfs://<namenode>:<port>/ user/<hadoop-user>/coprocessor.jar| org.myname.hbase.Coprocessor.RegionObserverExample|1073741823| arg1=1,arg2=2'
Coprocessor
包含由管道(|)字符分隔的四個參數,按順序解釋以下:
容許使用通配符,例如:hdfs://<namenode>:<port>/user/<hadoop-user>/*.jar
來添加指定的 JAR 包;
可使指定目錄,例如:hdfs://<namenode>:<port>/user/<hadoop-user>/
,這會添加目錄中的全部 JAR 包,但不會搜索子目錄中的 JAR 包。
可選參數 :傳遞的協處理器的可選參數。
hbase > enable 'tableName'
hbase > describe 'tableName'
協處理器出如今 TABLE_ATTRIBUTES
屬性中則表明加載成功。
hbase> disable 'tableName'
hbase> alter 'tableName', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
hbase> enable 'tableName'
TableName tableName = TableName.valueOf("users"); String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar"; Configuration conf = HBaseConfiguration.create(); Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); admin.disableTable(tableName); HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet"); columnFamily1.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily1); HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet"); columnFamily2.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily2); hTableDescriptor.setValue("COPROCESSOR$1", path + "|" + RegionObserverExample.class.getCanonicalName() + "|" + Coprocessor.PRIORITY_USER); admin.modifyTable(tableName, hTableDescriptor); admin.enableTable(tableName);
在 HBase 0.96 及其之後版本中,HTableDescriptor 的 addCoprocessor() 方法提供了一種更爲簡便的加載方法。
TableName tableName = TableName.valueOf("users"); Path path = new Path("hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar"); Configuration conf = HBaseConfiguration.create(); Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); admin.disableTable(tableName); HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet"); columnFamily1.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily1); HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet"); columnFamily2.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily2); hTableDescriptor.addCoprocessor(RegionObserverExample.class.getCanonicalName(), path, Coprocessor.PRIORITY_USER, null); admin.modifyTable(tableName, hTableDescriptor); admin.enableTable(tableName);
卸載其實就是從新定義表但不設置協處理器。這會刪除全部表上的協處理器。
TableName tableName = TableName.valueOf("users"); String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar"; Configuration conf = HBaseConfiguration.create(); Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); admin.disableTable(tableName); HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet"); columnFamily1.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily1); HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet"); columnFamily2.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily2); admin.modifyTable(tableName, hTableDescriptor); admin.enableTable(tableName);
這裏給出一個簡單的案例,實現一個相似於 Redis 中 append
命令的協處理器,當咱們對已有列執行 put 操做時候,HBase 默認執行的是 update 操做,這裏咱們修改成執行 append 操做。
# redis append 命令示例 redis> EXISTS mykey (integer) 0 redis> APPEND mykey "Hello" (integer) 5 redis> APPEND mykey " World" (integer) 11 redis> GET mykey "Hello World"
# 建立一張雜誌表 有文章和圖片兩個列族 hbase > create 'magazine','article','picture'
完整代碼可見本倉庫:hbase-observer-coprocessor
新建 Maven 工程,導入下面依賴:
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.2.0</version> </dependency>
繼承 BaseRegionObserver
實現咱們自定義的 RegionObserver
,對相同的 article:content
執行 put 命令時,將新插入的內容添加到原有內容的末尾,代碼以下:
public class AppendRegionObserver extends BaseRegionObserver { private byte[] columnFamily = Bytes.toBytes("article"); private byte[] qualifier = Bytes.toBytes("content"); @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { if (put.has(columnFamily, qualifier)) { // 遍歷查詢結果,獲取指定列的原值 Result rs = e.getEnvironment().getRegion().get(new Get(put.getRow())); String oldValue = ""; for (Cell cell : rs.rawCells()) if (CellUtil.matchingColumn(cell, columnFamily, qualifier)) { oldValue = Bytes.toString(CellUtil.cloneValue(cell)); } // 獲取指定列新插入的值 List<Cell> cells = put.get(columnFamily, qualifier); String newValue = ""; for (Cell cell : cells) { if (CellUtil.matchingColumn(cell, columnFamily, qualifier)) { newValue = Bytes.toString(CellUtil.cloneValue(cell)); } } // Append 操做 put.addColumn(columnFamily, qualifier, Bytes.toBytes(oldValue + newValue)); } } }
使用 maven 命令進行打包,打包後的文件名爲 hbase-observer-coprocessor-1.0-SNAPSHOT.jar
# mvn clean package
# 上傳項目到HDFS上的hbase目錄 hadoop fs -put /usr/app/hbase-observer-coprocessor-1.0-SNAPSHOT.jar /hbase # 查看上傳是否成功 hadoop fs -ls /hbase
hbase > disable 'magazine'
hbase > alter 'magazine', METHOD => 'table_att', 'Coprocessor'=>'hdfs://hadoop001:8020/hbase/hbase-observer-coprocessor-1.0-SNAPSHOT.jar|com.heibaiying.AppendRegionObserver|1001|'
hbase > enable 'magazine'
hbase > desc 'magazine'
協處理器出如今 TABLE_ATTRIBUTES
屬性中則表明加載成功,以下圖:
插入一組測試數據:
hbase > put 'magazine', 'rowkey1','article:content','Hello' hbase > get 'magazine','rowkey1','article:content' hbase > put 'magazine', 'rowkey1','article:content','World' hbase > get 'magazine','rowkey1','article:content'
能夠看到對於指定列的值已經執行了 append 操做:
插入一組對照數據:
hbase > put 'magazine', 'rowkey1','article:author','zhangsan' hbase > get 'magazine','rowkey1','article:author' hbase > put 'magazine', 'rowkey1','article:author','lisi' hbase > get 'magazine','rowkey1','article:author'
能夠看到對於正常的列仍是執行 update 操做:
hbase > disable 'magazine'
hbase > alter 'magazine', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
hbase > enable 'magazine'
hbase > desc 'magazine'
依次執行下面命令能夠測試卸載是否成功
hbase > get 'magazine','rowkey1','article:content' hbase > put 'magazine', 'rowkey1','article:content','Hello' hbase > get 'magazine','rowkey1','article:content'
更多大數據系列文章能夠參見 GitHub 開源項目: 大數據入門指南