I have recently been building a unified log collection and processing platform at my company. We settled on Elasticsearch for the technology stack because it can search system logs quickly, which makes troubleshooting and tracing business call chains fast. However, some fields in our applications' logs, such as content, do not need to be stored in ES. So the idea was to use a key-value database for storage: the data lives in HBase, and each row's RowKey doubles as the ES docId. Retrieval happens in ES while the actual data is stored in HBase, which greatly reduces the storage pressure on ES.
What is an Observer
HBase 0.92 introduced coprocessors, which let developers embed their own code inside HBase. Coprocessors come in two major flavors: endpoints (Endpoint) and the subject of this post, observers (Observer).
An Observer is somewhat like a trigger in MySQL: it attaches hooks to HBase operations and runs your own business logic after an event fires.
RegionObserver: data-access operations such as Get, Put, Delete, and Scan
WALObserver: WAL operations
MasterObserver: DDL operations such as creating, deleting, and altering tables
The data synchronization here uses a RegionObserver to listen for Put and Delete events.
Every Observer ships as a jar. Start by adding the hbase-server dependency, extend one of the base classes HBase provides (such as BaseRegionObserver), and override the methods for the events you want to observe.
For the data synchronization feature, override postPut and postDelete to catch Put and Delete events.
Below is a minimal example. In these two methods, the HBase table name and RowKey map to the ES indexName and docId respectively.
public class HbaseToEsObserver extends BaseRegionObserver {

    private static Client client = null;
    private static final Log LOG = LogFactory.getLog(HbaseToEsObserver.class);
    public static final String SEARCH_INDICE_PATTERN = "idx_%s_%s";

    /**
     * Reads the arguments passed on the HBase Shell coprocessor spec.
     * @param env
     */
    private void readConfiguration(CoprocessorEnvironment env) {
        Configuration conf = env.getConfiguration();
        EsConfig.clusterName = conf.get("es_cluster");
        EsConfig.nodeHost = conf.get("es_host");
        EsConfig.nodePort = conf.getInt("es_port", 9300);
        EsConfig.indexName = conf.get("es_index");
        EsConfig.typeName = conf.get("es_type");
        LOG.info("observer -- started with config: " + EsConfig.getInfo());
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        readConfiguration(env);
        client = EsSearchManager.getInstance().getClient();
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
                        WALEdit edit, Durability durability) {
        try {
            LOG.debug("es indexing begins");
            String indexId = new String(put.getRow());
            Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
            Map<String, Object> json = new HashMap<String, Object>();
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    json.put(key, value);
                    LOG.info("key=" + key + ", value=" + value);
                }
            }
            // The ES index name follows the pattern idx_xxx_xxx
            String tableName = e.getEnvironment().getRegion().getRegionInfo()
                    .getTable().getNameAsString();
            String indexName = String.format(SEARCH_INDICE_PATTERN,
                    EsConfig.indexName, tableName).toLowerCase();
            // setDoc is needed as well: an update request carrying only an
            // upsert body fails ES request validation
            ElasticSearchUtil.addUpdateBuilderToBulk(
                    client.prepareUpdate(indexName, EsConfig.typeName, indexId)
                            .setDoc(json).setUpsert(json));
        } catch (Exception ex) {
            LOG.error(ex);
        }
    }

    @Override
    public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
                           final Delete delete, final WALEdit edit,
                           final Durability durability) throws IOException {
        try {
            String indexId = new String(delete.getRow());
            // Use the same idx_xxx_xxx naming as postPut so the delete
            // targets the index the document was written to
            String tableName = e.getEnvironment().getRegion().getRegionInfo()
                    .getTable().getNameAsString();
            String indexName = String.format(SEARCH_INDICE_PATTERN,
                    EsConfig.indexName, tableName).toLowerCase();
            ElasticSearchUtil.addDeleteBuilderToBulk(
                    client.prepareDelete(indexName, EsConfig.typeName, indexId));
            LOG.info("observer -- delete a doc: " + indexId);
        } catch (Exception ex) {
            LOG.error(ex);
        }
    }
}
public class ElasticSearchUtil {

    private static final Log LOG = LogFactory.getLog(ElasticSearchUtil.class);
    // Bulk buffer capacity
    private static final int MAX_BULK_COUNT = 10;
    // Maximum commit interval (seconds)
    private static final int MAX_COMMIT_INTERVAL = 60 * 2;
    private static Client client = null;
    private static BulkRequestBuilder bulkRequestBuilder = null;
    private static Lock commitIndexLock = new ReentrantLock();

    static {
        try {
            client = EsSearchManager.getInstance().getClient();
            bulkRequestBuilder = client.prepareBulk();
            bulkRequestBuilder.setRefresh(true);
            ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
            executor.scheduleWithFixedDelay(new CommitIndexTimer(),
                    30 * 1000, MAX_COMMIT_INTERVAL * 1000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            LOG.error(e.getMessage());
        }
    }

    // Assumed implementation of the helper the observer calls: queues the
    // request and flushes once the buffer exceeds MAX_BULK_COUNT
    public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
        commitIndexLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } finally {
            commitIndexLock.unlock();
        }
    }

    // Same assumption for deletes
    public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
        commitIndexLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } finally {
            commitIndexLock.unlock();
        }
    }

    /**
     * Commits the buffered requests in bulk once the pool exceeds the threshold.
     *
     * @param threshold
     */
    private static void bulkRequest(int threshold) {
        if (bulkRequestBuilder.numberOfActions() > threshold) {
            LOG.info("running indexer, pending actions=" + bulkRequestBuilder.numberOfActions());
            BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
            if (!bulkResponse.hasFailures()) {
                LOG.info("es bulk index succeeded");
                bulkRequestBuilder = client.prepareBulk();
            } else {
                LOG.error("es bulk index failed: " + bulkResponse.buildFailureMessage());
            }
        }
    }

    /**
     * Scheduled task: flushes even when the RegionServer sees no new writes
     * for a long time, so Elasticsearch does not fall behind HBase.
     */
    static class CommitIndexTimer implements Runnable {
        @Override
        public void run() {
            commitIndexLock.lock();
            try {
                bulkRequest(0);
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                commitIndexLock.unlock();
            }
        }
    }
}
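The classes above also depend on EsConfig and EsSearchManager, which the post does not show. Below is a minimal sketch of what they might look like, assuming the Elasticsearch 2.x TransportClient API; the wiring is illustrative, not the author's actual implementation.

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

// Holds the parameters read from the coprocessor spec (see readConfiguration()).
class EsConfig {
    public static String clusterName;
    public static String nodeHost;
    public static int nodePort;
    public static String indexName;
    public static String typeName;

    public static String getInfo() {
        return String.format("cluster=%s, host=%s:%d, index=%s, type=%s",
                clusterName, nodeHost, nodePort, indexName, typeName);
    }
}

// Lazily builds a singleton TransportClient from EsConfig, shared by all
// regions on the RegionServer.
class EsSearchManager {
    private static final EsSearchManager INSTANCE = new EsSearchManager();
    private Client client;

    public static EsSearchManager getInstance() {
        return INSTANCE;
    }

    public synchronized Client getClient() throws UnknownHostException {
        if (client == null) {
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", EsConfig.clusterName)
                    .build();
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName(EsConfig.nodeHost), EsConfig.nodePort));
        }
        return client;
    }
}

A single shared client per RegionServer is enough here, since the observer only enqueues requests and ElasticSearchUtil batches the actual network calls.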
Next, package the project as a jar and upload it to HDFS.
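The upload itself can be done with the standard HDFS shell (the local jar path here is illustrative):

hadoop fs -put target/observer.jar hdfs://hadoop26:9000/observer.jar

Then use the HBase Shell to create a table and attach the Observer to it: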
create 'businessslog','info'
disable 'businessslog'
alter 'businessslog', METHOD => 'table_att', 'coprocessor' => 'hdfs://hadoop26:9000/observer.jar|com.github.hbase.observer.HbaseToEsObserver|1001|es_cluster=myes,es_type=loginfo,es_index=test,es_port=9300,es_host=114.55.253.15'
enable 'businessslog'
describe 'businessslog'
Finally, describe 'businessslog' shows whether the coprocessor was attached successfully. The coprocessor attribute is a pipe-separated spec: the jar's HDFS path, the observer class, a priority (1001 here), and the key=value arguments that readConfiguration() reads at startup. Attaching coprocessors through shell commands is still fairly cumbersome, so I wrapped table creation and coprocessor registration into the Java method below. Instead of fiddling with shell commands, you can create the table with its coprocessor in a single call, which is much more convenient:
public void createTableWithCoprocessor(String tableName, String observerName, String path,
        Map<String, String> map, String... familyColumn) throws Exception {
    TableName table = TableName.valueOf(tableName);
    Admin admin = getConn().getAdmin();
    try {
        if (admin.tableExists(table)) {
            return;
        }
        HTableDescriptor htd = new HTableDescriptor(table);
        for (String fc : familyColumn) {
            htd.addFamily(new HColumnDescriptor(fc));
        }
        admin.createTable(htd);
        // Attach the coprocessor the same way the shell's table_att does:
        // disable the table, modify its descriptor, then re-enable it
        admin.disableTable(table);
        htd.addCoprocessor(observerName, new Path(path), Coprocessor.PRIORITY_USER, map);
        admin.modifyTable(table, htd);
        admin.enableTable(table);
    } catch (IOException e) {
        logger.error(e.getMessage());
    } finally {
        admin.close();
    }
}
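As an illustration (this call is hypothetical, with argument values mirroring the alter statement above), the same table could then be created with the observer already attached:

// Hypothetical invocation; the map carries the key=value arguments
// that readConfiguration() reads when the observer starts.
Map<String, String> args = new HashMap<String, String>();
args.put("es_cluster", "myes");
args.put("es_type", "loginfo");
args.put("es_index", "test");
args.put("es_port", "9300");
args.put("es_host", "114.55.253.15");

createTableWithCoprocessor("businessslog",
        "com.github.hbase.observer.HbaseToEsObserver",
        "hdfs://hadoop26:9000/observer.jar",
        args, "info");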
To summarize:
es: supports complex, fast queries, but is a poor fit for storing massive amounts of data (so large fields are not stored in it)
hbase: stores massive amounts of data well, but is a poor fit for complex queries
es+hbase: together they support complex, fast queries over massive data; here ES effectively acts as a secondary index on top of HBase
You also need to configure the ES mapping correctly so that the large fields are indexed but not stored; I won't go into the details here. With all of the above in place, searching still happens in ES, and when the full content is needed you go to HBase and fetch it by rowkey, which is the same value as the ES docId.
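To make the read path concrete, here is a minimal sketch under the assumptions above: query ES first, then fetch the full record from HBase by RowKey. The index and type names follow the earlier examples; the level field, the query itself, and the connection variable (an open HBase Connection) are illustrative, not from the original code.

// Search in ES; each hit's docId doubles as the HBase RowKey.
SearchResponse resp = client.prepareSearch("idx_test_businessslog")
        .setTypes("loginfo")
        .setQuery(QueryBuilders.matchQuery("level", "ERROR")) // illustrative field and query
        .setSize(20)
        .get();

Table table = connection.getTable(TableName.valueOf("businessslog"));
for (SearchHit hit : resp.getHits()) {
    // A point Get on the RowKey returns the full record, including the
    // large content field that was never stored in ES.
    Result row = table.get(new Get(Bytes.toBytes(hit.getId())));
    byte[] content = row.getValue(Bytes.toBytes("info"), Bytes.toBytes("content"));
    System.out.println(Bytes.toString(content));
}
table.close();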
The above shows only part of the code. For the full version, see the GitHub repository at https://github.com/winstonelei/BigDataTools, which also contains basic operations for several big-data components, including HBase, Hadoop, ES, and Hive.