HBase協處理器同步二級索引到Solr


1、 背景

在實際生產中,HBase每每不能知足多維度分析,咱們能想到的辦法就是經過建立HBase數據的二級索引來快速獲取rowkey,從而獲得想要的數據。目前比較流行的二級索引解決方案有Lily HBase Indexer,Phoenix自帶的二級索引,華爲Indexer,以及360的二級索引方案。上面的目前使用比較普遍的應該是Lily HBase Indexer,可是咱們有時候只想實現一些簡單的功能或者比較特殊的功能的時候,須要本身寫協處理器進行處理。學習HBase的協處理器對於瞭解HBase架構是有幫助的。java

2、 什麼是HBase的協處理器

協處理器分兩種類型,系統協處理器能夠全局導入region server上的全部數據表,表協處理器便是用戶能夠指定一張表使用協處理器。shell

Hbase的coprocessor分爲兩類,Observer和EndPoint。其中Observer至關於觸發器,EndPoint至關於存儲過程。其中Observer的代碼部署在服務端,至關於對API調用的代理。apache

另外一個是終端(endpoint),動態的終端有點像存儲過程。
Observer緩存

觀察者的設計意圖是容許用戶經過插入代碼來重載協處理器框架的upcall方法,而具體的事件觸發的callback方法由HBase的核心代碼來執行。協處理器框架處理全部的callback調用細節,協處理器自身只須要插入添加或者改變的功能。以HBase0.92版本爲例,它提供了三種觀察者接口:安全

  • RegionObserver:提供客戶端的數據操縱事件鉤子:Get、Put、Delete、Scan等。
  • WALObserver:提供WAL相關操做鉤子。
  • MasterObserver:提供DDL-類型的操做鉤子。如建立、刪除、修改數據表等。

這些接口能夠同時使用在同一個地方,按照不一樣優先級順序執行.用戶能夠任意基於協處理器實現複雜的HBase功能層。HBase有不少種事件能夠觸發觀察者方法,這些事件與方法從HBase0.92版本起,都會集成在HBase API中。不過這些API可能會因爲各類緣由有所改動,不一樣版本的接口改動比較大。服務器

3、 HBase協處理器同步數據到Solr

實時更新數據須要獲取到HBase的插入、更新和刪除操做。因爲HBase中的插入和更新都是對應RegionServer的Put操做,所以咱們須要使用RegionObserver中的"postPut"和"postDelete函數"。至於Truncate操做則須要使用MasterObserver。
咱們須要作的就是攔截put和delete操做,將裏面的內容獲取出來,寫入Solr。 對應的協處理器代碼以下:架構

 
 
 
 
 
package com.bqjr.bigdata.HBaseObserver.server;import com.bqjr.bigdata.HBaseObserver.entity.SolrServerManager;import org.apache.hadoop.hbase.CellUtil;import org.apache.hadoop.hbase.client.Delete;import org.apache.hadoop.hbase.client.Durability;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;import org.apache.hadoop.hbase.coprocessor.ObserverContext;import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;import org.apache.hadoop.hbase.regionserver.wal.WALEdit;import org.apache.hadoop.hbase.util.Bytes;import org.apache.solr.client.solrj.SolrServerException;import org.apache.solr.common.SolrInputDocument;import java.io.IOException;/** * Created by hp on 2017-02-15. */ public class HBaseIndexerToSolrObserver extends BaseRegionObserver{ String[] columns = {"test_age","test_name"}; String collection = "bqjr"; SolrServerManager solrManager = new SolrServerManager(collection); @Override public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { String rowkey= Bytes.toString(put.getRow()); SolrInputDocument doc = new SolrInputDocument(); for(String column : columns){ if(put.has(Bytes.toBytes("cf1"),Bytes.toBytes(column))){ doc.addField(column,Bytes.toString(CellUtil.cloneValue(put.get(Bytes.toBytes("cf1"),Bytes.toBytes(column)).get(0))));} } try { solrManager.addDocToCache(doc); } catch (SolrServerException e1) { e1.printStackTrace(); } } @Override public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException{ String rowkey= Bytes.toString(delete.getRow()); try { solrManager.delete(rowkey); } catch (SolrServerException e1) { e1.printStackTrace(); } }}

大致的寫入流程咱們已經完成了,接下來就是Solr的寫入實現了。因爲Solr須要使用Zookeeper等信息,咱們能夠直接經過HBase的conf中獲取Zookeeper相關信息來構造所須要的SolrCloudServer。
另外一方面,咱們不能來了一條數據就立刻寫入,這樣很是消耗資源。所以咱們須要作一個緩存,將這些Solr數據暫時保存在裏面,定時 + 定量的發送。代碼以下框架

 
 
 
 
 
package com.bqjr.bigdata.HBaseObserver.entity;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.solr.client.solrj.SolrServerException;import org.apache.solr.client.solrj.impl.CloudSolrServer;import org.apache.solr.client.solrj.response.UpdateResponse;import org.apache.solr.common.SolrInputDocument;import java.io.IOException;import java.util.*;/** * Created by hp on 2017-02-15. */public class SolrServerManager { public static String ZKHost = ""; public static String ZKPort = ""; int zkClientTimeout = 1800000;// 心跳 int zkConnectTimeout = 1800000;// 鏈接時間 CloudSolrServer solrServer; private static String defaultCollection; int maxCache = 10000; public static List<SolrInputDocument> cache = new LinkedList<SolrInputDocument>(); private static int maxCommitTime = 60; //最大提交時間,s public SolrServerManager(String collection) { defaultCollection = collection; Configuration conf = HBaseConfiguration.create(); ZKHost = conf.get("hbase.zookeeper.quorum", "bqdpm1,bqdpm2,bqdps2"); ZKPort = conf.get("hbase.zookeeper.property.clientPort", "2181"); String SolrUrl = ZKHost + ":" + ZKPort + "/" + "solr"; solrServer = new CloudSolrServer(SolrUrl); solrServer.setDefaultCollection(defaultCollection); solrServer.setZkClientTimeout(zkClientTimeout); solrServer.setZkConnectTimeout(zkConnectTimeout); //啓動定時任務,第一次延遲10執行,以後每隔指定時間執行一次 Timer timer = new Timer(); timer.schedule(new CommitTimer(), 10 * 1000L, maxCommitTime * 1000L); } public UpdateResponse put(SolrInputDocument doc) throws IOException, SolrServerException { solrServer.add(doc); return solrServer.commit(false, false); } public UpdateResponse put(List<SolrInputDocument> docs) throws IOException, SolrServerException { solrServer.add(docs); return solrServer.commit(false, false); } public void addDocToCache(SolrInputDocument doc) throws IOException, SolrServerException { synchronized (cache) { cache.add(doc); if (cache.size() >= maxCache) { this.put(cache); cache.clear(); } } } public UpdateResponse delete(String rowkey) throws IOException, SolrServerException { solrServer.deleteById(rowkey); return solrServer.commit(false, false); } /** * 提交定時器 */ static class CommitTimer extends TimerTask { @Override public void run() { synchronized (cache) { try { new SolrServerManager(defaultCollection).put(cache); cache.clear(); } catch (IOException e) { e.printStackTrace(); } catch (SolrServerException e) { e.printStackTrace(); } cache.clear(); } } }}

4、 添加協處理器

 
 
 
 
 
#先禁用這張表disable 'HBASE_OBSERVER_TEST'#爲這張表添加協處理器,設置的參數具體爲: jar文件路徑|類名|優先級(SYSTEM或者USER)alter 'HBASE_OBSERVER_TEST','coprocessor'=>'hdfs://bqdpm1:8020/ext_lib/HBaseObserver-1.0.0.jar|com.bqjr.bigdata.HBaseObserver.server.HBaseIndexerToSolrObserver||'#啓用這張表enable 'HBASE_OBSERVER_TEST'#刪除某個協處理器,"$<bumber>"後面跟的ID號與desc裏面的ID號相同alter 'HBASE_OBSERVER_TEST',METHOD=>'table_att_unset',NAME => 'coprocessor$1'

5、 測試

嘗試插入一條數據put 'HBASE_OBSERVER_TEST','001','cf1:test_age','18'
結果Solr中一條數據都沒有
ide

而後查看了regionserver的日誌發現,沒有找到SolrJ的類
函數

而後咱們將全部的依賴加到Jar包裏面以後,再次運行。就能夠看到數據了。

測試Delete功能

測試進行到這裏就完了嗎?固然不是
咱們嘗試再插入一條put 'HBASE_OBSERVER_TEST','001','cf1:test_name','Bob'

理論上咱們須要在Solr中看到 test_age = 18,test_name = Bob。
可是在Solr中只有一條數據

因而咱們須要使用到Solr的原子更新功能。將postPut改爲下面這樣的代碼便可

 
 
 
 
 
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { String rowkey= Bytes.toString(put.getRow()); Long version = 1L; SolrInputDocument doc = new SolrInputDocument(); for(String column : columns){ if(put.has(Bytes.toBytes("cf1"),Bytes.toBytes(column))){ Cell cell = put.get(Bytes.toBytes("cf1"),Bytes.toBytes(column)).get(0); Map<String, String > operation = new HashMap<String,String>(); operation.put("set",Bytes.toString(CellUtil.cloneValue(cell))); doc.setField(column,operation); } } doc.addField("id",rowkey);// doc.addField("_version_",version); try { solrManager.addDocToCache(doc); } catch (SolrServerException e1) { e1.printStackTrace(); } }

再次插入數據

查看Solr結果

6、 協處理器動態加載

hbase的官方文檔指出動態級別的協處理器,能夠作到不重啓hbase,更新協處理,作法就是
禁用表,卸載協處理器,從新指定協處理器, 激活表,便可,但實際測試發現
動態加載無效,是hbase的一個bug,看這個連接:
https://issues.apache.org/jira/browse/HBASE-8445
由於協處理器,已經被JVM加載,即便刪除jar也不能從新load的jar,由於cache裏面的hdfs的jar路徑,沒有變化,因此動態更新無效
,除非重啓JVM,那樣就意味着,須要重啓RegionServer,
裏面的小夥伴們指出了兩種辦法,使協處理器加載生效:
(1)滾動重啓regionserver,避免停掉全部的節點
(2)改變協處理器的jar的類名字或者hdfs加載路徑,以方便有新的ClassLoad去加載它

但整體來看,第2種方法,比較安全,第一種風險太大,通常狀況下沒有人會隨便滾動重啓線上的服務器的,這隻在hbase升級的時候使用

相關文章
相關標籤/搜索