Coprocessor簡介java
(1)實現目的
數據庫
(2)靈感來源
靈感來源於Bigtable的協處理器,包含以下特性:
apache
(3)提供接口
json
(4)應用範圍
服務器
Endpoint服務端實現
網絡
在傳統關係型數據庫裏面,能夠隨時的對某列進行求和sum,可是目前HBase目前所提供的接口,直接求和是比較困難的,因此先編寫好服務端代碼,並加載到對應的Table上,加載協處理器有幾種方法,能夠經過HTableDescriptor的addCoprocessor方法直接加載,同理也能夠經過removeCoprocessor方法卸載協處理器。
Endpoint協處理器相似傳統數據庫的存儲過程,客戶端調用Endpoint協處理器執行一段Server端代碼,並將Server端代碼的結果返回給Client進一步處理,最多見的用法就是進行聚合操做。舉個例子說明:若是沒有協處理器,當用戶須要找出一張表中的最大數據即max聚合操做,必須進行全表掃描,客戶端代碼遍歷掃描結果並執行求max操做,這樣的方法沒法利用底層集羣的併發能力,而將全部計算都集中到Client端統一執行, 效率很是低。可是使用Coprocessor,用戶將求max的代碼部署到HBase Server端,HBase將利用底層Cluster的多個節點並行執行求max的操做即在每一個Region範圍內執行求最大值邏輯,將每一個Region的最大值在Region Server端計算出,僅僅將該max值返回給客戶端。客戶端進一步將多個Region的max進一步處理而找到其中的max,這樣總體執行效率提升不少。可是必定要注意的是Coprocessor必定要寫正確,不然致使RegionServer宕機。架構
Protobuf定義併發
如前所述,客戶端和服務端之間須要進行RPC通訊,因此二者間須要肯定接口,當前版本的HBase的協處理器是經過Google Protobuf協議來實現數據交換的,因此須要經過Protobuf來定義接口。
以下所示:框架
option java_package = "com.my.hbase.protobuf.generated"; option java_outer_classname = "AggregateProtos"; option java_generic_services = true; option java_generate_equals_and_hash = true; option optimize_for = SPEED; import "Client.proto"; message AggregateRequest { required string interpreter_class_name = 1; required Scan scan = 2; optional bytes interpreter_specific_bytes = 3; } message AggregateResponse { repeated bytes first_part = 1; optional bytes second_part = 2; } service AggregateService { rpc GetMax (AggregateRequest) returns (AggregateResponse); rpc GetMin (AggregateRequest) returns (AggregateResponse); rpc GetSum (AggregateRequest) returns (AggregateResponse); rpc GetRowNum (AggregateRequest) returns (AggregateResponse); rpc GetAvg (AggregateRequest) returns (AggregateResponse); rpc GetStd (AggregateRequest) returns (AggregateResponse); rpc GetMedian (AggregateRequest) returns (AggregateResponse); }
能夠看到這裏定義7個聚合服務RPC,名字分別叫作GetMax、GetMin、GetSum等,本文經過GetSum進行舉例,其餘的聚合RPC也是相似的內部實現。RPC有一個入口參數,用消息AggregateRequest表示;RPC的返回值用消息AggregateResponse表示。Service是一個抽象概念,RPC的Server端能夠看做一個用來提供服務的Service。在HBase Coprocessor中Service就是Server端須要提供的Endpoint Coprocessor服務,主要用來給HBase的Client提供服務。AggregateService.java是由Protobuf軟件經過終端命令「protoc filename.proto--java_out=OUT_DIR」自動生成的,其做用是將.proto文件定義的消息結構以及服務轉換成對應接口的RPC實現,其中包括如何構建request消息和response響應以及消息包含的內容的處理方式,而且將AggregateService包裝成一個抽象類,具體的服務以類的方法的形式提供。AggregateService.java定義Client端與Server端通訊的協議,代碼中包含請求信息結構AggregateRequest、響應信息結構AggregateResponse、提供的服務種類AggregateService,其中AggregateRequest中的interpreter_class_name指的是column interpreter的類名,此類的做用在於將數據格式從存儲類型解析成所需類型。ide
服務端的架構
首先,Endpoint Coprocessor是一個Protobuf Service的實現,所以須要它必須繼承某個ProtobufService。咱們在前面已經經過proto文件定義Service,命名爲AggregateService,所以Server端代碼須要重載該類,其次做爲HBase的協處理器,Endpoint 還必須實現HBase定義的協處理器協議,用Java的接口來定義。具體來講就是CoprocessorService和Coprocessor,這些HBase接口負責將協處理器和HBase 的RegionServer等實例聯繫起來以便協同工做。Coprocessor接口定義兩個接口函數:start和stop。
加載Coprocessor以後Region打開的時候被RegionServer自動加載,並會調用器start 接口完成初始化工做。通常狀況該接口函數僅僅須要將協處理器的運行上下文環境變量CoprocessorEnvironment保存到本地便可。
CoprocessorEnvironment保存協處理器的運行環境,每一個協處理器都是在一個RegionServer進程內運行並隸屬於某個Region。經過該變量獲取Region的實例等 HBase運行時環境對象。
Coprocessor接口還定義stop()接口函數,該函數在Region被關閉時調用,用來進行協處理器的清理工做。本文裏咱們沒有進行任何清理工做,所以該函數什麼也不幹。
咱們的協處理器還須要實現CoprocessorService接口。該接口僅僅定義一個接口函數 getService()。咱們僅須要將本實例返回便可。HBase的Region Server在接收到客戶端的調用請求時,將調用該接口獲取實現RPCService的實例,所以本函數通常狀況下就是返回自身實例便可。
完成以上三個接口函數以後,Endpoint的框架代碼就已完成。每一個Endpoint協處理器都必須實現這些框架代碼並且寫法雷同。
Server端的代碼就是一個Protobuf RPC的Service實現,即經過Protobuf提供的某種服務。其開發內容主要包括:
Endpoint 協處理的基本框架
Endpoint 是一個Server端Service的具體實現,其實現有一些框架代碼,這些框架代碼與具體的業務需求邏輯無關。僅僅是爲了和HBase運行時環境協同工做而必須遵循和完成的一些粘合代碼。所以多數狀況下僅僅須要從一個例子程序拷貝過來並進行命名修改便可。不過咱們仍是完整地對這些粘合代碼進行粗略的講解以便更好地理解代碼。
public Service getService() { return this; } public void start(CoprocessorEnvironment env) throws IOException { if(env instanceof RegionCoprocessorEnvironment) { this.env = (RegionCoprocessorEnvironment)env; } else { throw new CoprocessorException("Must be loaded on a table region!"); } } public void stop(CoprocessorEnvironment env) throws IOException { }
Endpoint協處理器真正的業務代碼都在每個RPC函數的具體實現中。
在本文中,咱們的Endpoint協處理器僅提供一個RPC函數即getSUM。我將分別介紹編寫該函數的幾個主要工做:瞭解函數的定義,參數列表;處理入口參數;實現業務邏輯;設置返回參數。
public void getSum(RpcController controller, AggregateRequest request, RpcCallbackdone) { AggregateResponse response = null; RegionScanner scanner = null; long sum = 0L; try { ColumnInterpreter ignored = this.constructColumnInterpreterFromRequest(request); Object sumVal = null; Scan scan = ProtobufUtil.toScan(request.getScan()); scanner = this.env.getRegion().getScanner(scan); byte[] colFamily = scan.getFamilies()[0]; NavigableSet qualifiers = (NavigableSet) scan.getFamilyMap().get(colFamily); byte[] qualifier = null; if (qualifiers != null && !qualifiers.isEmpty()) { qualifier = (byte[]) qualifiers.pollFirst(); } ArrayList results = new ArrayList(); boolean hasMoreRows = false; do { hasMoreRows = scanner.next(results); int listSize = results.size(); for (int i = 0; i < listSize; ++i) { //取出列值 Object temp = ignored.getValue(colFamily, qualifier, (Cell) results.get(i)); if (temp != null) { sumVal = ignored.add(sumVal, ignored.castToReturnType(temp)); } } results.clear(); } while (hasMoreRows); if (sumVal != null) { response = AggregateResponse.newBuilder().addFirstPart( ignored.getProtoForPromotedType(sumVal).toByteString()).build(); } } catch (IOException var27) { ResponseConverter.setControllerException(controller, var27); } finally { if (scanner != null) { try { scanner.close(); } catch (IOException var26) { ; } } } log.debug("Sum from this region is " + this.env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + sum); done.run(response); }
Endpoint類比於數據庫的存儲過程,其觸發服務端的基於Region的同步運行再將各個結果在客戶端蒐集後歸併計算。特色相似於傳統的MapReduce框架,服務端Map客戶端Reduce。
Endpoint客戶端實現
HBase提供客戶端Java包org.apache.hadoop.hbase.client.HTable,提供如下三種方法來調用協處理器提供的服務:
該方法採用rowkey指定Region。這是由於HBase客戶端不多會直接操做Region,通常不須要知道Region的名字;何況在HBase中Region名會隨時改變,因此用rowkey來指定Region是最合理的方式。使用rowkey能夠指定惟一的一個Region,若是給定的Rowkey並不存在,只要在某個Region的rowkey範圍內依然用來指定該Region。好比Region 1處理[row1, row100]這個區間內的數據,則rowkey=row1就由Region 1來負責處理,換句話說咱們能夠用row1來指定Region 1,不管rowkey等於」row1」的記錄是否存在。CoprocessorService方法返回類型爲CoprocessorRpcChannel的對象,該 RPC通道鏈接到由rowkey指定的Region上面,經過此通道能夠調用該Region上面部署的協處理器RPC。
有時候客戶端須要調用多個 Region上的同一個協處理器,好比須要統計整個Table的sum,在這種狀況下,須要全部的Region都參與進來,分別統計自身Region內部的sum並返回客戶端,最終客戶端將全部Region的返回結果彙總,就能夠獲得整張表的sum。
這意味着該客戶端同時和多個Region進行批處理交互。一個可行的方法是,收集每一個 Region的startkey,而後循環調用第一種coprocessorService方法:用每個Region的startkey 做爲入口參數,得到RPC通道建立stub對象,進而逐一調用每一個Region上的協處理器RPC。這種作法須要寫不少的代碼,爲此HBase提供兩種更加簡單的 coprocessorService方法來處理多個Region的協處理器調用。先來看第一種方法 coprocessorService(Class, byte[],byte[],Batch.Call)
該方法有 4 個入口參數。第一個參數是實現RPC的Service 類,即前文中的AggregateService類。經過它,HBase就能夠找到相應的部署在Region上的協處理器,一個Region上能夠部署多個協處理器,客戶端必須經過指定Service類來區分究竟須要調用哪一個協處理器提供的服務。
要調用哪些Region上的服務則由startkey和endkey來肯定,經過 rowkey範圍便可肯定多個 Region。爲此,coprocessorService方法的第二個和第三個參數分別是 startkey和endkey,凡是落在[startkey,endkey]區間內的Region都會參與本次調用。
第四個參數是接口類Batch.Call。它定義瞭如何調用協處理器,用戶經過重載該接口的call()方法來實現客戶端的邏輯。在call()方法內,能夠調用RPC,並對返回值進行任意處理。即前文代碼清單1中所作的事情。coprocessorService將負責對每一個 Region調用這個call()方法。
coprocessorService方法的返回值是一個Map類型的集合。該集合的key是Region名字,value是Batch.Call.call方法的返回值。該集合能夠看做是全部Region的協處理器 RPC 返回的結果集。客戶端代碼能夠遍歷該集合對全部的結果進行彙總處理。
這種coprocessorService方法的大致工做流程以下。首先它分析startkey和 endkey,找到該區間內的全部Region,假設存放在regionList中。而後,遍歷regionList,爲每個Region調用Batch.Call,在該接口內,用戶定義具體的RPC調用邏輯。最後coprocessorService將全部Batch.Call.call()的返回值加入結果集合並返回。
coprocessorService的第三種方法比第二個方法多了一個參數callback。coprocessorService第二個方法內部使用HBase自帶的缺省callback,該缺省 callback將每一個Region的返回結果都添加到一個Map類型的結果集中,並將該集合做爲coprocessorService方法的返回值。
HBase 提供第三種coprocessorService方法容許用戶定義callback行爲,coprocessorService 會爲每個RPC返回結果調用該callback,用戶能夠在callback 中執行須要的邏輯,好比執行sum累加。用第二種方法的狀況下,每一個Region協處理器RPC的返回結果先放入一個列表,全部的 Region 都返回後,用戶代碼再從該列表中取出每個結果進行累加;用第三種方法,直接在callback中進行累加,省掉了建立結果集合和遍歷該集合的開銷,效率會更高一些。
所以咱們只須要額外定義一個callback便可,callback是一個Batch.Callback接口類,用戶須要重載其update方法。
public S sum(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,final Scan scan)throws Throwable { final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); class SumCallBack implements Batch.Callback { S sumVal = null; public S getSumResult() { return sumVal; } @Override public synchronized void update(byte[] region, byte[] row, S result) { sumVal = ci.add(sumVal, result); }} SumCallBack sumCallBack = new SumCallBack(); table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), new Batch.Call<AggregateService, S>() { @Override public S call(AggregateService instance) throws IOException { ServerRpcController controller = new ServerRpcController(); BlockingRpcCallback<AggregateResponse> rpcCallback = new BlockingRpcCallback<AggregateResponse>(); //RPC 調用 instance.getSum(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); if (controller.failedOnException()) { throw controller.getFailedOn(); } if (response.getFirstPartCount() == 0) { return null; } ByteString b = response.getFirstPart(0); T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b); S s = ci.getPromotedValueFromProto(t); return s; } }, sumCallBack); return sumCallBack.getSumResult();
Observer實現二級索引
Observer相似於傳統數據庫中的觸發器,當發生某些事件的時候這類協處理器會被 Server 端調用。Observer Coprocessor是一些散佈在HBase Server端代碼的 hook鉤子, 在固定的事件發生時被調用。好比:put操做以前有鉤子函數prePut,該函數在pu 操做執 行前會被Region Server調用;在put操做以後則有postPut 鉤子函數。
RegionObserver工做原理
RegionObserver提供客戶端的數據操縱事件鉤子,Get、Put、Delete、Scan,使用此功能可以解決主表以及多個索引表之間數據一致性的問題
如上圖所示,HBase能夠根據rowkey很快的檢索到數據,可是若是根據column檢索數據,首先要根據rowkey減少範圍,再經過列過濾器去過濾出數據,若是使用二級索引,能夠先查基於column的索引表,獲取到rowkey後再快速的檢索到數據。
如圖所示首先繼承BaseRegionObserver類,重寫postPut,postDelete方法,在postPut方法體內中寫Put索引表數據的代碼,在postDelete方法裏面寫Delete索引表數據,這樣能夠保持數據的一致性。
在Scan表的時候首先判斷是否先查索引表,若是不查索引直接scan主表,若是走索引表經過索引表獲取主表的rowkey再去查主表。
使用Elastic Search創建二級索引也是同樣。
咱們在同一個主機集羣上同時創建了HBase集羣和Elastic Search集羣,存儲到HBase的數據必須實時地同步到Elastic Search。而剛好HBase和Elastic Search都沒有更新的概念,咱們的需求能夠簡化爲兩步:
協處理的主要應用場景