Elasticsearch學習總結六 使用Observer實現HBase到Elasticsearch的數據同步

  •     最近在公司作統一日誌收集處理平臺,技術選型確定要選擇elasticsearch,由於能夠快速檢索系統日誌,日誌問題排查及功業務鏈調用能夠被快速檢索,公司各個應用的日誌有些字段好比說content是不須要在es中做爲存儲的,當時考慮使用一種keyValue形式的數據庫做存儲,而後使用hbase的Rowkey做爲es的docId,實現數據檢索在es中,存儲在hbase中,這樣能夠大大減輕es的存儲壓力。node

  • 什麼是 Observergit

HBase 0.92 版本引入了協處理器(Coprocessor),可使開發者將本身的代碼嵌入到 HBase 中,其中協處理器分爲兩大塊,一個是終端(Endpoint),另外一個是本文將要介紹的觀察者(Observer)。github

Observer 有些相似於 MySQL 中的觸發器(Trigger),它能夠爲 HBase 中的操做添加鉤子,並在事件發生後實現本身的的業務邏輯。數據庫

  • Observer 主要分爲三種:

RegionObserver:增刪改查相關,例如 Get、Put、Delete、Scan 等 WALObserver:WAL 操做相關 MasterObserver:DDL-類型相關,例如建立、刪除、修改數據表等json

數據同步將會使用 RegionObserver 監聽 Put 和 Delete 事件。緩存

  • 如何實現自定義的的 Observer

每個 Observer 都是一個 Jar 包。首先須要引入hbase-server包,並實現如BaseRegionObserver等 HBase 提供的相關接口,重寫須要監聽對應事件的方法。服務器

實現數據同步功能能夠重寫postPut和putDelete方法監聽 Put 和 Delete 事件。app

下面就是一個最簡單的例子,在這兩個方法中分別獲得 hbsae表名和 RowKey 分別對應着es中的indexName和docIdelasticsearch

public class HbaseToEsObserver extends BaseRegionObserver {
    private static Client client = null;
    private static final Log LOG = LogFactory.getLog(HbaseToEsObserver.class);
    public static final String SEARCH_INDICE_PATTERN = "idx_%s_%s";
    /**
     * 讀取HBase Shell的指令參數
     * @param env
     */
    private void readConfiguration(CoprocessorEnvironment env) {
        Configuration conf = env.getConfiguration();
        EsConfig.clusterName = conf.get("es_cluster");
        EsConfig.nodeHost = conf.get("es_host");
        EsConfig.nodePort = conf.getInt("es_port", 9300);
        EsConfig.indexName = conf.get("es_index");
        EsConfig.typeName = conf.get("es_type");
        LOG.info("observer -- started with config: " + EsConfig.getInfo());
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        readConfiguration(env);
        client = EsSearchManager.getInstance().getClient();
    }
    
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,                       Durability durability) {
       try {
            LOG.debug("es 索引開始 begin");
            String indexId = new String(put.getRow());
            Map<byte[], List<Cell>> familyMap =  put.getFamilyCellMap();
            Map<String, Object> json = new HashMap<String, Object>();
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    json.put(key, value);
                    LOG.info("key="+key+"value="+value);
                }
            }
           //es中索引表的名稱是idx_xxx_xxx
           String tableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
           String indexName = String.format(SEARCH_INDICE_PATTERN, EsConfig.indexName,tableName).toLowerCase();
           ElasticSearchUtil.addUpdateBuilderToBulk(client.prepareUpdate(indexName, EsConfig.typeName, indexId).setUpsert(json));
        } catch (Exception ex) {
            LOG.error(ex);
        }
    }

public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
        try {
            String indexId = new String(delete.getRow());
            ElasticSearchUtil.addDeleteBuilderToBulk(client.prepareDelete(EsConfig.indexName, EsConfig.typeName, indexId));
            LOG.info("observer -- delete a doc: " + indexId);
        } catch (Exception ex) {
            LOG.error(ex);
        }
    }
  • 當日志hbase中一條條插入到hbase中的時候就會觸發協處理器動做,爲了減輕es服務器操做的壓力咱們批量操做es中的數據,先將索引數據存儲到BulkRequestBuilder,當緩衝池中的索引數據爲10條或者當提交間隔達到最大提交間隔的時候批量將索引數據發送到es服務器中。下面看下ElasticSearchUtil中的代碼
public class ElasticSearchUtil {
    private static final Log LOG = LogFactory.getLog(ElasticSearchUtil.class);
    // 緩衝池容量
    private static final int MAX_BULK_COUNT = 10;
    // 最大提交間隔(秒)
    private static final int MAX_COMMIT_INTERVAL = 60 * 2;
    private static Client client = null;
    private static BulkRequestBuilder bulkRequestBuilder = null;
    private static Lock commitIndexLock= new ReentrantLock();

    static {
        try {
           client = EsSearchManager.getInstance().getClient();
           bulkRequestBuilder = client.prepareBulk();
           bulkRequestBuilder.setRefresh(true);
           ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
           executor.scheduleWithFixedDelay(
                   new CommitIndexTimer(),
                   30 * 1000,
                   MAX_COMMIT_INTERVAL * 1000,
                   TimeUnit.MILLISECONDS);
        }catch(Exception e){
            LOG.error(e.getMessage());
         }
    }

    /**
     * 判斷緩存池是否已滿,批量提交
     *
     * @param threshold
     */
    private static void bulkRequest(int threshold) {
        if (bulkRequestBuilder.numberOfActions() > threshold) {
            LOG.info("執行索引程序,當前池中待索引數量="+bulkRequestBuilder.numberOfActions());
            BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
            if (!bulkResponse.hasFailures()) {
                LOG.info("es索引程序成功!");
                bulkRequestBuilder = client.prepareBulk();
            }
            if (bulkResponse.hasFailures()) {
                LOG.error("es索引異常:"+bulkResponse.buildFailureMessage());
            }
        }
    }

    /**
     * 定時任務,避免RegionServer遲遲無數據更新,致使ElasticSearch沒有與HBase同步
     * 定時執行
     */
    static class CommitIndexTimer implements Runnable {
        @Override
        public void run() {
            commitIndexLock.lock();
            try {
                bulkRequest(0);
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                commitIndexLock.unlock();
            }
        }
    }
}

而後將項目打成jar包,提交到hdfs中,而後使用 HBase Shell 建立一個表,將這個 Observer 掛到該表中:ide

create 'businessslog','info'
disable 'businessslog'

alter 'businessslog',METHOD =>'table_att','coprocessor' => 'hdfs://hadoop26:9000/observer.jar|com.github.hbase.observer.HbaseToEsObserver|1001|es_cluster=myes,es_type=loginfo,es_index=test,es_port=9300,es_host=114.55.253.15'

enable 'businessslog'		
describe 'businessslog'

最後使用 describe 'businessslog' 命令就能夠查看協處理器是否掛載成功,使用命令掛載協處理器仍是有點麻煩,爲此 封裝了hbase建立表的時候自動創建協處理器的代碼以下,不用在使用麻煩的命令創建協處理器了,直接調用Java 方法建立,方便了許多

public void createTableWithCoprocessor(String tableName,String oberverName,String path,Map<String,String> map, String...familyColumn) throws Exception {
        TableName table = TableName.valueOf(tableName);
        Admin admin = getConn().getAdmin();
        boolean isExists = admin.tableExists(table);
        if(isExists){
            return ;
        }else{
            try {
                HTableDescriptor htd = new HTableDescriptor(table);
                for (String fc : familyColumn) {
                    HColumnDescriptor hcd = new HColumnDescriptor(fc);
                    htd.addFamily(hcd);
                }
                admin.createTable(htd);
                admin.disableTable(table);
                HTableDescriptor hTableDescriptor = new HTableDescriptor(table);
                for (String fc : familyColumn) {
                    HColumnDescriptor hcd = new HColumnDescriptor(fc);
                    hTableDescriptor.addFamily(hcd);
                }
                hTableDescriptor.addCoprocessor(oberverName, new Path(path), Coprocessor.PRIORITY_USER, map);
                admin.modifyTable(table, hTableDescriptor);
                admin.enableTable(table);
                admin.close();
            } catch (IOException e) {
                logger.error(e.getMessage());
            }
        }
    }

總結: es:能夠實現複雜快速查詢,可是不適合存儲海量數據(針對一些大字段,不存儲) hbase:能夠實現海量數據存儲,可是不適合進行復雜查詢 es+hbase能夠實現海量數據的複雜快速查詢,在這裏es能夠認爲是hbase的二級索引

es中還須要將mapping映射配置正確,確保某些大字段創建索引 不存儲,這裏就在贅述,如上就能夠實現當檢索的時候仍是在es中查詢,當查詢具體能容的時候再去hbase根據rowkey也就是es中的docId定位具體日誌內容。

以上總結了部分代碼,詳細的代碼請查看github地址 https://github.com/winstonelei/BigDataTools ,包括了一些大數據組件的基本操做,包含了hbase,hadoop,es,hive等

相關文章
相關標籤/搜索