使用Hbase協做器(Coprocessor)同步數據到ElasticSearch

使用Hbase協做器(Coprocessor)同步數據到ElasticSearch
最近項目中須要將Hbase中的數據同步到ElasticSearch中,需求就是隻要往Hbase裏面put或者delete數據,那麼ES集羣中,相應的索引下,也須要更新或者刪除這條數據。本人使用了hbase-rirver插件,發現並無那麼好用,因而到網上找了一些資料,本身整理研究了一下,就本身寫了一個同步數據的組件,基於Hbase的協做器,效果還不錯,如今共享給你們,若是你們發現什麼須要優化或者改正的地方,能夠在個人csdn博客:個人csdn博客地址上面私信我給我留言,代碼託管在碼雲上Hbase-Observer-ElasticSearch。同時要感謝Gavin Zhang 2shou,我雖然不認識Gavin Zhang 2shou,(2shou的同步數據博文)可是我是看了他寫的代碼以及博客以後,(2shou的同步組件代碼)在他的基礎之上對代碼作了部分優化以及調整,來知足我自己的需求,因此在此表示感謝,但願我把個人代碼開源出來,其餘人看到以後也能激發大家的靈感,來寫出更多更好更加實用的東西:java

Hbase協做器(Coprocessor)
編寫組件
部署組件
驗證組件
總結
Hbase協做器(Coprocessor)
HBase 0.92版本後推出了Coprocessor — 協處理器,一個工做在Master/RegionServer中的框架,能運行用戶的代碼,從而靈活地完成分佈式數據處理的任務。node

HBase 支持兩種類型的協處理器,Endpoint 和 Observer。Endpoint 協處理器相似傳統數據庫中的存儲過程,客戶端能夠調用這些 Endpoint 協處理器執行一段 Server 端代碼,並將 Server 端代碼的結果返回給客戶端進一步處理,最多見的用法就是進行彙集操做。若是沒有協處理器,當用戶須要找出一張表中的最大數據,即 max 聚合操做,就必須進行全表掃描,在客戶端代碼內遍歷掃描結果,並執行求最大值的操做。這樣的方法沒法利用底層集羣的併發能力,而將全部計算都集中到 Client 端統一執行,勢必效率低下。利用 Coprocessor,用戶能夠將求最大值的代碼部署到 HBase Server 端,HBase 將利用底層 cluster 的多個節點併發執行求最大值的操做。即在每一個 Region 範圍內執行求最大值的代碼,將每一個 Region 的最大值在 Region Server 端計算出,僅僅將該 max 值返回給客戶端。在客戶端進一步將多個 Region 的最大值進一步處理而找到其中的最大值。這樣總體的執行效率就會提升不少。
另一種協處理器叫作 Observer Coprocessor,這種協處理器相似於傳統數據庫中的觸發器,當發生某些事件的時候這類協處理器會被 Server 端調用。Observer Coprocessor 就是一些散佈在 HBase Server 端代碼中的 hook 鉤子,在固定的事件發生時被調用。好比:put 操做以前有鉤子函數 prePut,該函數在 put 操做執行前會被 Region Server 調用;在 put 操做以後則有 postPut 鉤子函數。
在實際的應用場景中,第二種Observer Coprocessor應用起來會比較多一點,由於第二種方式比較靈活,能夠針對某張表進行綁定,假如hbase有十張表,我只想綁定其中的5張表,另外五張不須要處理,就不綁定便可,下面我要介紹的也是第二種方式。
編寫組件
首先編寫一個ESClient客戶端,用於連接訪問的ES集羣代碼。shell

package org.eminem.hbase.observer;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.lang3.StringUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

/**
 * ES Cleint class
 */
public class ESClient {

    // ElasticSearch的集羣名稱
    public static String clusterName;
    // ElasticSearch的host
    public static String nodeHost;
    // ElasticSearch的端口(Java API用的是Transport端口,也就是TCP)
    public static int nodePort;
    // ElasticSearch的索引名稱
    public static String indexName;
    // ElasticSearch的類型名稱
    public static String typeName;
    // ElasticSearch Client
    public static Client client;

    /**
     * get Es config
     *
     * @return
     */
    public static String getInfo() {
        List<String> fields = new ArrayList<String>();
        try {
            for (Field f : ESClient.class.getDeclaredFields()) {
                fields.add(f.getName() + "=" + f.get(null));
            }
        } catch (IllegalAccessException ex) {
            ex.printStackTrace();
        }
        return StringUtils.join(fields, ", ");
    }

    /**
     * init ES client
     */
    public static void initEsClient() {
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", ESClient.clusterName).build();
        client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(
                        ESClient.nodeHost, ESClient.nodePort));
    }

    /**
     * Close ES client
     */
    public static void closeEsClient() {
        client.close();
    }
}


而後編寫一個Class類,繼承BaseRegionObserver,並複寫其中的start()、stop()、postPut()、postDelete()、四個方法。這四個方法其實很好理解,分別表示協做器開始、協做器結束、put事件觸發並將數據存入hbase以後咱們能夠作一些事情,delete事件觸發並將數據從hbase刪除以後咱們能夠作一些事情。咱們只要將初始化ES客戶端的代碼寫在start中,在stop中關閉ES客戶端以及定義好的Scheduled對象便可。兩個觸發事件分別bulk hbase中的數據到ES,就輕輕鬆鬆的搞定了。數據庫

package org.eminem.hbase.observer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

/**
 * Hbase Sync data to Es Class
 */
public class HbaseDataSyncEsObserver extends BaseRegionObserver {

    private static final Log LOG = LogFactory.getLog(HbaseDataSyncEsObserver.class);


    /**
     * read es config from params
     * @param env
     */
    private static void readConfiguration(CoprocessorEnvironment env) {
        Configuration conf = env.getConfiguration();
        ESClient.clusterName = conf.get("es_cluster");
        ESClient.nodeHost = conf.get("es_host");
        ESClient.nodePort = conf.getInt("es_port", -1);
        ESClient.indexName = conf.get("es_index");
        ESClient.typeName = conf.get("es_type");
    }

    /**
     *  start
     * @param e
     * @throws IOException
     */
    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        // read config
         readConfiguration(e);
         // init ES client
         ESClient.initEsClient();
        LOG.error("------observer init EsClient ------"+ESClient.getInfo());
    }

    /**
     * stop
     * @param e
     * @throws IOException
     */
    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
        // close es client
       ESClient.closeEsClient();
       // shutdown time task
       ElasticSearchBulkOperator.shutdownScheduEx();
    }

    /**
     * Called after the client stores a value
     * after data put to hbase then prepare update builder to bulk  ES
     *
     * @param e
     * @param put
     * @param edit
     * @param durability
     * @throws IOException
     */
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        String indexId = new String(put.getRow());
        try {
            NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
            Map<String, Object> infoJson = new HashMap<String, Object>();
            Map<String, Object> json = new HashMap<String, Object>();
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    json.put(key, value);
                }
            }
            // set hbase family to es
            infoJson.put("info", json);
            ElasticSearchBulkOperator.addUpdateBuilderToBulk(ESClient.client.prepareUpdate(ESClient.indexName, ESClient.typeName, indexId).setDocAsUpsert(true).setDoc(infoJson));
        } catch (Exception ex) {
            LOG.error("observer put  a doc, index [ " + ESClient.indexName + " ]" + "indexId [" + indexId + "] error : " + ex.getMessage());
        }
    }


    /**
     * Called after the client deletes a value.
     * after data delete from hbase then prepare delete builder to bulk  ES
     * @param e
     * @param delete
     * @param edit
     * @param durability
     * @throws IOException
     */
    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        String indexId = new String(delete.getRow());
        try {
            ElasticSearchBulkOperator.addDeleteBuilderToBulk(ESClient.client.prepareDelete(ESClient.indexName, ESClient.typeName, indexId));
        } catch (Exception ex) {
            LOG.error(ex);
            LOG.error("observer delete  a doc, index [ " + ESClient.indexName + " ]" + "indexId [" + indexId + "] error : " + ex.getMessage());

        }
    }
}

這段代碼中info節點是根據我這邊自身的需求加的,你們能夠結合自身需求,去掉這個info節點,直接將hbase中的字段寫入到ES中去。咱們的需求須要把hbase的Family也要插入到ES中。apache

最後就是比較關鍵的bulk ES代碼,結合2shou的代碼,我本身寫的這部分代碼,沒有使用Timer,而是使用了ScheduledExecutorService,至於爲何不使用Timer,你們能夠去百度上面搜索下這兩個東東的區別,我在這裏就不作過多的介紹了。在ElasticSearchBulkOperator這個類中,我使用ScheduledExecutorService週期性的執行一個任務,去判斷緩衝池中,是否有須要bulk的數據,閥值是10000.每30秒執行一次,若是達到閥值,那麼就會當即將緩衝池中的數據bulk到ES中,並清空緩衝池中的數據,等待下一次定時任務的執行。固然,初始化定時任務須要一個beeper響鈴的線程,delay時間10秒。還有一個很重要的就是須要對bulk的過程進行加鎖操做。json

package org.eminem.hbase.observer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Bulk hbase data to ElasticSearch Class
 */
public class ElasticSearchBulkOperator {

    private static final Log LOG = LogFactory.getLog(ElasticSearchBulkOperator.class);

    private static final int MAX_BULK_COUNT = 10000;

    private static BulkRequestBuilder bulkRequestBuilder = null;

    private static final Lock commitLock = new ReentrantLock();

    private static ScheduledExecutorService scheduledExecutorService = null;

    static {
        // init es bulkRequestBuilder
        bulkRequestBuilder = ESClient.client.prepareBulk();
        bulkRequestBuilder.setRefresh(true);

        // init thread pool and set size 1
        scheduledExecutorService = Executors.newScheduledThreadPool(1);

        // create beeper thread( it will be sync data to ES cluster)
        // use a commitLock to protected bulk es as thread-save
        final Runnable beeper = new Runnable() {
            public void run() {
                commitLock.lock();
                try {
                    bulkRequest(0);
                } catch (Exception ex) {
                    System.out.println(ex.getMessage());
                    LOG.error("Time Bulk " + ESClient.indexName + " index error : " + ex.getMessage());
                } finally {
                    commitLock.unlock();
                }
            }
        };

        // set time bulk task
        // set beeper thread(10 second to delay first execution , 30 second period between successive executions)
        scheduledExecutorService.scheduleAtFixedRate(beeper, 10, 30, TimeUnit.SECONDS);

    }

    /**
     * shutdown time task immediately
     */
    public static void shutdownScheduEx() {
        if (null != scheduledExecutorService && !scheduledExecutorService.isShutdown()) {
            scheduledExecutorService.shutdown();
        }
    }

    /**
     * bulk request when number of builders is grate then threshold
     *
     * @param threshold
     */
    private static void bulkRequest(int threshold) {
        if (bulkRequestBuilder.numberOfActions() > threshold) {
            BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet();
            if (!bulkItemResponse.hasFailures()) {
                bulkRequestBuilder = ESClient.client.prepareBulk();
            }
        }
    }

    /**
     * add update builder to bulk
     * use commitLock to protected bulk as thread-save
     * @param builder
     */
    public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            LOG.error(" update Bulk " + ESClient.indexName + " index error : " + ex.getMessage());
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * add delete builder to bulk
     * use commitLock to protected bulk as thread-save
     *
     * @param builder
     */
    public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            LOG.error(" delete Bulk " + ESClient.indexName + " index error : " + ex.getMessage());
        } finally {
            commitLock.unlock();
        }
    }
}


至此,代碼已經所有完成了,接下來只須要咱們打包部署便可。併發

部署組件
使用maven打包框架

mvn clean package


使用shell命令上傳到hdfselasticsearch

hadoop fs -put hbase-observer-elasticsearch-1.0-SNAPSHOT-zcestestrecord.jar /hbase_es 
hadoop fs -chmod -R 777 /hbase_es 

驗證組件
hbase shellmaven

create 'test_record','info'

disable 'test_record'

alter 'test_record', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase_es/hbase-observer-elasticsearch-1.0-SNAPSHOT-zcestestrecord.jar|org.eminem.hbase.observer.HbaseDataSyncEsObserver|1001|es_cluster=zcits,es_type=zcestestrecord,es_index=zcestestrecord,es_port=9100,es_host=master'

enable 'test_record'

put 'test_record','test1','info:c1','value1'
deleteall 'test_record','test1'


綁定操做以前須要,在ES集羣中創建好相應的索引如下是對綁定代碼的解釋: 
把Java項目打包爲jar包,上傳到HDFS的特定路徑 
進入HBase Shell,disable你但願加載的表 
經過alert 命令激活Observer 
coprocessor對應的格式以|分隔,依次爲: 
- jar包的HDFS路徑 
- Observer的主類 
- 優先級(通常不用改) 
- 參數(通常不用改) 
- 新安裝的coprocessor會自動生成名稱:coprocessor + $ + 序號(經過describe table_name可查看)

之後對jar包內容作了調整,須要從新打包並綁定新jar包,再綁定以前須要作目標表作解綁操做,加入目標表以前綁定了同步組件的話,如下是解綁的命令

hbase shell

disable 'test_record'
alter 'test_record', METHOD => 'table_att_unset',NAME => 'coprocessor$1'
enable 'test_record'
desc 'test_record'


總結
綁定以後若是在執行的過程當中有報錯或者同步不過去,能夠到hbase的從節點上的logs目錄下,查看hbase-roor-regionserver-slave*.log文件。由於協做器是部署在regionserver上的,因此要到從節點上面去看日誌,而不是master節點。

hbase-river插件以前下載了源代碼看了下,hbase-river插件是週期性的scan整張表進行bulk操做,而咱們這裏本身寫的這個組件呢,是基於hbase的觸發事件來進行的,二者的效果和性能不言而喻,一個是全量的,一個是增量的,咱們在實際的開發中,確定是但願若是有數據更新了或者刪除了,咱們只要對着部分數據進行同步就好了,沒有修改或者刪除的數據,咱們能夠不用去理會。

Timer和 ScheduledExecutorService,在這裏我選擇了ScheduledExecutorService,2shou以前提到過部署插件有個坑,修改Java代碼後,上傳到HDFS的jar包文件必須和以前不同,不然就算卸載掉原有的coprocessor再從新安裝也不能生效,這個坑我也碰到了,就是由於沒有複寫stop方法,將定時任務停掉,線程一直會掛在那裏,並且一旦報錯將會致使hbase沒法啓動,必需要kill掉相應的線程。這個坑,坑了我一段時間,你們千萬要注意,必定記得要複寫stop方法,關閉以前打開的線程或者客戶端,這樣纔是最好的方式。

相關文章
相關標籤/搜索