原文檔地址:http://hbase.apache.org/1.2/book.html#cphtml
HBase Coprocessors協處理器是在Google BigTable的協處理器實現以後才建模的 (http://research.google.com/people/jeff/SOCC2010-keynote-slides.pdf pages 41-42.)。java
協處理器框架提供了在管理你數據的RegionServer上直接運行定製代碼的機制 目前正在努力消除HBase的實現和BigTable的架構之間的差距。獲取更多信息,參考 HBASE-4047。node
本章中的信息主要來源是如下資源,並從如下資源中使用了大量重用:shell
Mingjie Lai’s blog post Coprocessor Introduction.apache
Gaurav Bhardwaj’s blog post The How To Of HBase Coprocessors.編程
Use Coprocessors At Your Own Riskapi 協處理器是HBase的一個高級特性,而且它只是由系統開發者使用的。由於協處理器代碼直接運行在RegionServer上,而且能夠直接訪問你的數據,因此它們引入了數據損壞的風險、中間人攻擊或其餘的數據惡意訪問。當前,沒有機制來防止協處理器形成的數據損壞,雖然這個工做在進行中 HBASE-4047。安全 此外,沒有資源隔離,所以一個善意但行爲不當的協處理器會嚴重下降集羣的性能和穩定性。網絡 |
在HBase中,你使用Get或者Scan fetch數據,而在RDBMS中你使用SQL查詢。爲了只fetch相關的數據,你使用HBase Filter過濾數據,而在RDMBS中你使用 WHERE 謂詞。架構
在fetching數據以後,你在數據上執行運算。這種模式適用於幾千行和幾列的「小數據」。可是,當您擴展到數十億行和數百萬列時,在您的網絡中移動大量的數據將會在網絡層形成瓶頸,而且客戶端須要足夠強大,而且有足夠的內存來處理大量的數據和計算。此外,客戶端代碼能夠變得更大、更復雜。
在這樣的場景中,協處理器多是有意義的。您能夠將業務計算代碼放入一個在RegionServer上運行的coprocessor,在與數據相同的位置,並將結果返回給客戶端。
這是使用協處理器能帶來好處的惟一情景。下面是一些類比,能夠幫助解釋協處理器的一些好處。
觸發器和存儲過程 Triggers and Stored Procedure
一個觀察者協處理器相似於一個RDBMS中的觸發器,它在一個特定的事件(例如Get或Put)發生以前或以後來執行你的代碼。一個終端協處理器相似於RDBMS中的一個存儲過程,由於它容許你在當前RegionServer存有的數據上執行自定義運算,而不是在客戶端執行運算。
MapReduce
MapReduce以把計算移動到數據所在的位置的原則進行運行。協處理器以相同的原則運行。
面向切片的編程設計 AOP
若是你熟悉 面向切片的編程設計 Aspect Oriented Programming (AOP), 你能夠認爲協處理器能夠您能夠在把請求傳遞到最終目的地以前(或者甚至改變目的地),經過截取請求,而後運行一些自定義代碼,而後將請求傳遞到最終目的地。
你的類應該繼承自Coprocessor classes的一種, 如 BaseRegionObserver, 或者你的類要實現 Coprocessor 或者CoprocessorService 接口。
加載這個coprocessor, 能夠靜態加載(經過配置文件)或動態加載(使用HBase Shell命令)。 更多內容參考 Loading Coprocessors.
從你的客戶端代碼中調用coprocessor。HBase對協處理器的處理是透明的。
框架API 在 coprocessor 包裏。
Observer coprocessors 在一個特定的事件發生以前或以後被觸發。在一個事件以前發生的觀察者使用的方法以一個 pre
前綴開頭, 例如 prePut
。在一個事件以後發生的觀察者覆蓋的方法以一個 post
前綴開頭, 例如 postPut
.
安全 Security
在執行一個Get
或 Put
操做以前,你可使用 preGet
或 prePut
方法檢查權限。
參照完整性 Referential Integrity
HBase不直接支持參照完整性(外鍵)這一RDBMS中的概念。你可使用一個協處理器以強制這樣的完整性。例如,若是你有一個業務規則是每一次向users表裏插入的數據必須參照user_daily_attendance表中相應的條目,你能夠實現一個協處理器,使用在user表上的prePut方法向user_daily_attendance表插入一條記錄。
二級索引 Secondary Indexes
你可使用一個協處理器維護多個二級索引。獲取更多信息,參考 SecondaryIndexing。
RegionObserver
一個RegionObserver coprocessor 容許你觀察一個Region上的事件,如 Get
和 Put
操做。參考 RegionObserver。能夠考慮覆蓋類 BaseRegionObserver, 這個類實現了 RegionObserver
接口。
RegionServerObserver
一個 RegionServerObserver 容許你觀察與RegionServer的操做相關的事件, 如 啓動、中止 或 執行合併、提交或者回滾。參考RegionServerObserver。考慮覆蓋類 BaseMasterAndRegionObserver,這個類實現了 MasterObserver
和 RegionServerObserver
接口。
MasterOvserver
一個 MasterObserver 容許你觀察和HBase Master相關的事件, 如表建立、表刪除或者表結構修改。參考 MasterObserver。考慮覆蓋類 BaseMasterAndRegionObserver, 這個類實現了 MasterObserver
和 RegionServerObserver
接口。
WalObserver
一個WalObserver 容許你觀察和向 Write-Ahead Log (WAL)寫入相關的事件。參考 WALObserver。考慮覆蓋類 BaseWALObserver, 這個類實現了 WalObserver
接口。
Examples provides working examples of observer coprocessors.
終端協處理器容許你在數據存放的位置執行運算。參考 Coprocessor Analogy。例如,須要計算一個橫跨數百個Region的完整表的運行平均值或總和。
與自定義代碼透明地執行的觀察者協處理器相比,終端協處理器必須用Table, HTableInterface, 或 HTable 的CoprocessorService() 方法顯示調用。
從HBase 0.96開始,終端協處理器用Google Protocol Buffers (protobuf)實現。獲取更多關於 protobuf的細節,參考Google’s Protocol Buffer Guide. HBase 0.94中編寫的Endpoints Coprocessor 和 0.96 及之後的版本並不兼容。參考 HBASE-5448。要把你的HBase集羣從0.94或更早的版本升級到0.96及之後的版本,你須要從新實現你的協處理器。
Examples provides working examples of endpoint coprocessors.
要使你的協處理器在HBase可用,它必須被加載loaded,可使靜態加載 (經過配置HBase) 或者 動態加載 (使用 HBase Shell 或 Java API).
下列步驟用來靜態加載你的協處理器。請記住,必須從新啓動HBase以卸載一個已被靜態加載的coprocessor。
在 hbase-site.xml 定義一個協處理器,使用 <property> 元素和 <name> <value> 子元素。<name> 應該是下列之一:
hbase.coprocessor.region.classes
for RegionObservers and Endpoints.
hbase.coprocessor.wal.classes
for WALObservers.
hbase.coprocessor.master.classes
for MasterObservers.
<value> 必須包含你的協處理器實現類的完整類名。
例如,要加載一個 Coprocessor (實現類是 SumEndPoint.java) 你必須在 RegionServer’s 'hbase-site.xml' 文件中 (通常在 'conf' 目錄下)建立如下條目:
<property> <name>hbase.coprocessor.region.classes</name> <value>org.myname.hbase.coprocessor.endpoint.SumEndPoint</value> </property>
若是多個類被指定要加載,類名間要用逗號分隔。框架會試圖使用默認類加載器加載全部配置的類。所以,這個jar文件必須位於server端的HBase classpath中。
經過這種方式加載的Coprocessors將對全部表的全部Region都是激活狀態的。這些也被稱做 系統Coprocessor。首先列出的Coprocessors 將被賦予 Coprocessor.Priority.SYSTEM這樣的優先級。列表中每個後續的
coprocessor 將會把它的優先級加一(這是在減少它的優先級,由於優先級是按照整數的天然順序排列的)。
當調用註冊的觀察者時,框架會按照它們的優先級順序執行它們的回調方法。
關係被打破。
把你的代碼放在HBase的 classpath。作這件事的一個簡單的方法是把 jar (包含你的代碼和全部依賴)放到HBase安裝路徑的 lib/
目錄下。
重啓 HBase。
在hbase-site.xml中刪除協處理器的 <property> 元素,包括子元素。
重啓 HBase.
可選的操做是,從HBase的 lib/ 目錄下或者從classpath中移除這個協處理器的 JAR 文件。
你能夠在不重啓HBase的狀況下,動態加載協處理器。這看起來比靜態加載更好,可是動態加載協處理器的是被加載到一張表上,而且只在加載它們的表上可用。由此,動態加載的協處理器有時被稱爲表協處理器Table Coprocessor。
另外,動態加載一個協處理器至關於改變表的結構,表必須下線以加載協處理器。
有三種動態加載 Coprocessor 的方法。
假定 下面的說明有如下的假設:
|
在HBase Shell中禁用表:
hbase> disable 'users'
使用以下命令加載這個 Coprocessor:
hbase> alter 'users', METHOD => 'table_att', 'Coprocessor'=>'hdfs://<namenode>:<port>/ user/<hadoop-user>/coprocessor.jar| org.myname.hbase.Coprocessor.RegionObserverExample|1073741823| arg1=1,arg2=2'
協處理器框架將試着從協處理器表的屬性值中讀取類的信息。這個值包含由管道符號(|)分割成的4片信息。
文件路徑: 包含 Coprocessor 定義的這個jar文件必須在全部的RegionServer能夠讀取到的位置上。
你能夠拷貝這個文件到每個RegionServer的本地磁盤上,可是推薦把它存放在HDFS上。
類名: Coprocessor的完整類名.
優先級: 一個整數。該框架將肯定在同一個鉤子上使用優先級註冊的全部配置的觀察器的執行順序,並使用優先級。這片信息能夠留空不填,這樣的話,框架將會賦予一個默認的優先級值。
參數 (可選): 這片信息會傳入協處理器的實現類中。這片信息是可選的。
啓用這個表.
hbase(main):003:0> enable 'users'
驗證協處理器是否被加載:
hbase(main):04:0> describe 'users'
協處理器應該在TABLE_ATTRIBUTES
列出。
如下Java代碼展現瞭如何使用 HTableDescriptor的setValue()
方法在users表上加載一個協處理器。
TableName tableName = TableName.valueOf("users"); String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar"; Configuration conf = HBaseConfiguration.create(); Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); admin.disableTable(tableName); HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet"); columnFamily1.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily1); HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet"); columnFamily2.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily2); hTableDescriptor.setValue("COPROCESSOR$1", path + "|" + RegionObserverExample.class.getCanonicalName() + "|" + Coprocessor.PRIORITY_USER); admin.modifyTable(tableName, hTableDescriptor); admin.enableTable(tableName);
在 HBase 0.96及其後的版本, HTableDescriptor的addCoprocessor()
方法提供了一個簡單的方式去動態加載一個coprocessor 。
TableName tableName = TableName.valueOf("users"); String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar"; Configuration conf = HBaseConfiguration.create(); HBaseAdmin admin = new HBaseAdmin(conf); admin.disableTable(tableName); HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet"); columnFamily1.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily1); HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet"); columnFamily2.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily2); hTableDescriptor.addCoprocessor(RegionObserverExample.class.getCanonicalName(), path, Coprocessor.PRIORITY_USER, null); admin.modifyTable(tableName, hTableDescriptor); admin.enableTable(tableName);
框架不保證將給定的Coprocessor成功加載。例如,shell命令不保證一個jar文件存在於某個位置,也不驗證給定的類是否實際存在於這個Jar文件中。 |
禁用表
hbase> disable 'users'
修改表以移除coprocessor.
hbase> alter 'users', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
啓用表
hbase> enable 'users'
經過使用 setValue()
或 addCoprocessor()
方法在不設置coprocessor的狀況下從新加載表定義。這將移除任意的附在這個表上的coprocessor。
TableName tableName = TableName.valueOf("users"); String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar"; Configuration conf = HBaseConfiguration.create(); Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); admin.disableTable(tableName); HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet"); columnFamily1.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily1); HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet"); columnFamily2.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily2); admin.modifyTable(tableName, hTableDescriptor); admin.enableTable(tableName);
在 HBase 0.96 及之後的版本中,你可使用 HTableDescriptor類的removeCoprocessor()
方法。
HBase 帶有Observer Coprocessor的例子,在 ZooKeeperScanPolicyObserver 中,也帶有Endpoint Coprocessor的例子,在RowCountEndpoint
一個更詳細的例子以下。
這些例子假定一個表名爲 users
, 有兩個列族 personalDet
和 salaryDet
, 包含我的和收入的詳細信息。下面是 users
表的圖形化展現。
personalDet | salaryDet | |||||
---|---|---|---|---|---|---|
jverne |
Jules |
Verne |
02/08/1828 |
12000 |
9000 |
3000 |
rowkey |
name |
lastname |
dob |
gross |
net |
allowances |
admin |
Admin |
Admin |
||||
cdickens |
Charles |
Dickens |
02/07/1812 |
10000 |
8000 |
2000 |
下面的 Observer coprocessor 防止 user admin
的詳細信息在對 users
表的Get和Scan操做中返回。
寫一個繼承 BaseRegionObserver 的類。
覆蓋preGetOp()
方法 (preGet()
方法已被棄用了) 用以檢查客戶端是否已查詢了rowkey爲 admin的內容。
若是是,返回一個空的結果。不然,正常處理這個請求。
把你的代碼和依賴放到一個JAR 文件中。
把這個JAR放到 HBase能夠找到它的HDFS中。
加載這個 Coprocessor.
寫一個簡單的程序測試它。
如下是上面步驟的實現:
public class RegionObserverExample extends BaseRegionObserver { private static final byte[] ADMIN = Bytes.toBytes("admin"); private static final byte[] COLUMN_FAMILY = Bytes.toBytes("details"); private static final byte[] COLUMN = Bytes.toBytes("Admin_det"); private static final byte[] VALUE = Bytes.toBytes("You can't see Admin details"); @Override public void preGetOp(final ObserverContext e, final Get get, final List results) throws IOException { if (Bytes.equals(get.getRow(),ADMIN)) { Cell c = CellUtil.createCell(get.getRow(),COLUMN _FAMILY, COLUMN, System.currentTimeMillis(), (byte)4, VALUE); results.add(c); e.bypass(); } List kvs = new ArrayList(results.size()); for (Cell c : results) { kvs.add(KeyValueUtil.ensureKeyValue(c)); } preGet(e, get, kvs); results.clear(); results.addAll(kvs); } }
覆蓋preGetOp()
將只針對Get操做有效。你還須要覆蓋 preScannerOpen()
方法從scan的結果中過濾 admin
行。
@Override public RegionScanner preScannerOpen(final ObserverContext e, final Scan scan, final RegionScanner s) throws IOException { Filter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(ADMIN)); scan.setFilter(filter); return s; }
這個方法會有反作用。若是客戶端在它的scan中使用了一個過濾器,則客戶端的過濾器會被 preScannerOpen()方法中的過濾器替代。這種狀況下,覆蓋preScannerOpen()的替代方法是,顯示地從掃描結果中移除任何 admin
的結果:
@Override public boolean postScannerNext(final ObserverContext e, final InternalScanner s, final List results, final int limit, final boolean hasMore) throws IOException { Result result = null; Iterator iterator = results.iterator(); while (iterator.hasNext()) { result = iterator.next(); if (Bytes.equals(result.getRow(), ROWKEY)) { iterator.remove(); break; } } return hasMore; }
仍然使用 users
表, 這個例子實現了一個協處理器去計算全部員工收入的總和,使用了一個Endpoint Coprocessor.
建立一個 '.proto' 文件定義你的服務。
option java_package = "org.myname.hbase.coprocessor.autogenerated"; option java_outer_classname = "Sum"; option java_generic_services = true; option java_generate_equals_and_hash = true; option optimize_for = SPEED; message SumRequest { required string family = 1; required string column = 2; } message SumResponse { required int64 sum = 1 [default = 0]; } service SumService { rpc getSum(SumRequest) returns (SumResponse); }
執行 protoc
命令從以上的.proto中生成Java代碼。
$ mkdir src $ protoc --java_out=src ./sum.proto
這將生成一個java類 Sum.java
.
寫一個類去繼承這個生成的service類,實現 Coprocessor
和 CoprocessorService
類, 而且覆蓋service()方法。
若是你從 hbase-site.xml 加載一個協處理器,而後使用HBase Shell加載一個同名的協處理器,它會被第二次加載。相同的類會存在兩次,第二個實例將會有一個更高的ID (所以優先級更低). 其結果是,複製的coprocessor實際上被忽略了。 |
public class SumEndPoint extends SumService implements Coprocessor, CoprocessorService { private RegionCoprocessorEnvironment env; @Override public Service getService() { return this; } @Override public void start(CoprocessorEnvironment env) throws IOException { if (env instanceof RegionCoprocessorEnvironment) { this.env = (RegionCoprocessorEnvironment)env; } else { throw new CoprocessorException("Must be loaded on a table region!"); } } @Override public void stop(CoprocessorEnvironment env) throws IOException { // do mothing } @Override public void getSum(RpcController controller, SumRequest request, RpcCallback done) { Scan scan = new Scan(); scan.addFamily(Bytes.toBytes(request.getFamily())); scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn())); SumResponse response = null; InternalScanner scanner = null; try { scanner = env.getRegion().getScanner(scan); List results = new ArrayList(); boolean hasMore = false; long sum = 0L; do { hasMore = scanner.next(results); for (Cell cell : results) { sum = sum + Bytes.toLong(CellUtil.cloneValue(cell)); } results.clear(); } while (hasMore); response = SumResponse.newBuilder().setSum(sum).build(); } catch (IOException ioe) { ResponseConverter.setControllerException(controller, ioe); } finally { if (scanner != null) { try { scanner.close(); } catch (IOException ignored) {} } } done.run(response); } } Configuration conf = HBaseConfiguration.create(); // Use below code for HBase version 1.x.x or above. Connection connection = ConnectionFactory.createConnection(conf); TableName tableName = TableName.valueOf("users"); Table table = connection.getTable(tableName); //Use below code HBase version 0.98.xx or below. //HConnection connection = HConnectionManager.createConnection(conf); //HTableInterface table = connection.getTable("users"); final SumRequest request = SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross") .build(); try { Map<byte[], Long> results = table.CoprocessorService (SumService.class, null, null, new Batch.Call<SumService, Long>() { @Override public Long call(SumService aggregate) throws IOException { BlockingRpcCallback rpcCallback = new BlockingRpcCallback(); aggregate.getSum(null, request, rpcCallback); SumResponse response = rpcCallback.get(); return response.hasSum() ? response.getSum() : 0L; } }); for (Long sum : results.values()) { System.out.println("Sum = " + sum); } } catch (ServiceException e) { e.printStackTrace(); } catch (Throwable e) { e.printStackTrace(); }
加載這個 Coprocessor.
寫一個客戶端代碼去調用這個 Coprocessor.
捆綁協處理器 Bundling Coprocessors
你能夠將一個coprocessor的全部類捆綁到RegionServer的classpath中的單個JAR中,以便進行簡單的部署。不然,將全部依賴項放在RegionServer的classpath中,以便在RegionServer啓動時加載它們。一個RegionServer的classpath在RegionServer的 hbase-env.sh
文件中被設置。
自動部署 Automating Deployment
你可使用工具如 Puppet, Chef, 或 Ansible 把協處理器的JAR傳到你的RegionServer的文件系統中所需的位置,重啓每個RegionServer,以使協處理器自動部署。此類設置的細節超出了本文的範圍。
更新一個協處理器 Updating a Coprocessor
部署一個給定的coprocessor的新版本並不像禁用它、替換JAR並從新啓用協處理器那樣簡單。這是由於你不能在JVM中從新加載一個類,除非你刪除了全部的對它當前的引用。既然當前JVM引用了這個已經存在的協處理器,你必須經過重啓RegionServer來重啓這個JVM,以替代這個協處理器。這種行爲是不可能改變的。
協處理器日誌 Coprocessor Logging
Coprocessor框架不提供超出標準Java日誌記錄的API。
協處理器配置 Coprocessor Configuration
若是你不想從HBase Shell里加載 coprocessors,你能夠把它們的配置屬性加到 hbase-site.xml中
。在 Using HBase Shell中,兩個參數被設置爲: arg1=1,arg2=2
. 這些參數按照如下方式添加到 hbase-site.xml
中:
<property> <name>arg1</name> <value>1</value> </property> <property> <name>arg2</name> <value>2</value> </property>
而後你可使用以下代碼讀取這個配置信息:
Configuration conf = HBaseConfiguration.create(); // Use below code for HBase version 1.x.x or above. Connection connection = ConnectionFactory.createConnection(conf); TableName tableName = TableName.valueOf("users"); Table table = connection.getTable(tableName); //Use below code HBase version 0.98.xx or below. //HConnection connection = HConnectionManager.createConnection(conf); //HTableInterface table = connection.getTable("users"); Get get = new Get(Bytes.toBytes("admin")); Result result = table.get(get); for (Cell c : result.rawCells()) { System.out.println(Bytes.toString(CellUtil.cloneRow(c)) + "==> " + Bytes.toString(CellUtil.cloneFamily(c)) + "{" + Bytes.toString(CellUtil.cloneQualifier(c)) + ":" + Bytes.toLong(CellUtil.cloneValue(c)) + "}"); } Scan scan = new Scan(); ResultScanner scanner = table.getScanner(scan); for (Result res : scanner) { for (Cell c : res.rawCells()) { System.out.println(Bytes.toString(CellUtil.cloneRow(c)) + " ==> " + Bytes.toString(CellUtil.cloneFamily(c)) + " {" + Bytes.toString(CellUtil.cloneQualifier(c)) + ":" + Bytes.toLong(CellUtil.cloneValue(c)) + "}"); } }
HBase 0.98.5 能夠監控一些和執行一個給定的Coprocessor所花費時間量有關的統計數據。你能夠在HBase Metrics框架(參考HBase Metrics)或者 Web UI中給定的RegionServer的Coprocessor Metrics tab中看到這些統計數據。這些統計數據對於在集羣中對給定的Coprocessor的性能影響進行調試和基準測試是頗有價值的。跟蹤的統計數據包括min、max、average(平均)、第90、第9五、第99百分位。全部的時間以毫秒顯示。統計數據是根據 在報告間隔期間記錄的Coprocessor執行樣本 進行計算的,這個報告間隔期間默認狀況下是10秒。度量採樣率在HBase Metrics中描述。
Figure 4. Coprocessor Metrics UI