Implementing ES Data Synchronization with HBase 2.0 Coprocessors

Tags: HBase 2.0, Elasticsearch, Coprocessor

Before getting into the implementation itself, I think it is worth explaining the motivation. Initially, our team persisted all data on an HBase+Phoenix architecture. As the business grew more complex, some tables faced higher demands on query efficiency and on the variety of query conditions, and with HBase+Phoenix this led to index abuse: indexes had to be changed very frequently, and for tables holding a considerable amount of data the cost of changing an index is very high.

Elasticsearch performs very well when querying massive data sets, so would HBase+ES be a better solution? This raises a question worth thinking about: how does Phoenix implement its secondary indexes? The answer is HBase coprocessors (Coprocessor).

My own implementation path was rather bumpy, and I will point out the pitfalls below so you can avoid them. Along the way I also tried another scheme: keeping two copies of the data, one in HBase and one in ES. That scheme has to solve one problem, data consistency, and a coprocessor can solve it. During this process an improper operation took the HBase service down: the region servers could not start, and the only fix was to hard-delete the data.

Unwilling to give up, after reinstalling HBase I was itching to try again. One thing to state up front: our team's environment is HDP 3.0 with HBase 2.0, while many tutorials online are based on 1.X, and 2.X differs from 1.X quite a lot. For example, RegionObserver moved from class inheritance to an interface-oriented programming model.

Coprocessors

Without coprocessors, implementing RDBMS-style SQL queries against HBase means encoding a large number of Filters on the client side, which bloats the code and greatly reduces maintainability. Would it be better to do this work on the server side? Coprocessors make that possible: since the logic runs on the server, queries can be optimized centrally, reducing request bandwidth and improving query efficiency. Of course, this also puts some extra load on HBase itself.

Types

  • Observer
  • Endpoint

Observer

An Observer coprocessor is similar to a trigger in an RDBMS: it is invoked by the server when the corresponding event fires.

Endpoint

An Endpoint coprocessor is similar to a stored procedure in a traditional database and is typically used for aggregation-style operations.

Implementation

A Basic Attempt

To avoid bugs introduced by ES connection handling and extra code complexity, I first verified the coprocessor mechanism by doing nothing but writing log messages.

Code overview

HbaseDataSyncEsObserver.java

package com.tairanchina.csp.dmp.examples;

import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Optional;

public class HbaseDataSyncEsObserver implements RegionObserver, RegionCoprocessor {

    private static final Logger LOG = Logger.getLogger(HbaseDataSyncEsObserver.class);

    // HBase 2.x: the coprocessor exposes its RegionObserver through this hook
    @Override
    public Optional<RegionObserver> getRegionObserver() {
        return Optional.of(this);
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        LOG.info("====Test Start====");
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        LOG.info("====Test End====");
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        LOG.info("====Test postPut====");
    }

    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        LOG.info("====Test postDelete====");
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.tairanchina.csp.dmp</groupId>
    <artifactId>hbase-observer-simple-example</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>HBase Observer Simple Example</name>

    <properties>
        <hbase.version>2.0.0</hbase.version>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.deploy.skip>true</maven.deploy.skip>
        <maven.install.skip>true</maven.install.skip>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jetty-servlet</artifactId>
                    <groupId>org.eclipse.jetty</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs-client</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>javax.servlet.jsp</artifactId>
                    <groupId>org.glassfish.web</groupId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

Packaging and upload

Packaging

mvn clean assembly:assembly -Dmaven.test.skip=true
The jar produced by this step must bundle all of its dependencies; otherwise you will get "class not found" style errors when the coprocessor loads.

The jar has to be uploaded to HDFS, and the hbase user must be granted access to it; in my tests I therefore uploaded it to /apps/hbase (HDP environment). Because the jar name was long, I also renamed it.
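
For reference, the upload can look roughly like the following; the HDFS path, jar name, and owner/group are simply the ones from my environment, so adjust them to yours:

# copy the assembled jar to HDFS under a shorter name (paths and names are illustrative)
hdfs dfs -mkdir -p /apps/hbase
hdfs dfs -put -f target/hbase-observer-simple-example-1.0.0-SNAPSHOT-jar-with-dependencies.jar /apps/hbase/hbase-observer-simple-example.jar
# make sure the hbase user can read the jar
hdfs dfs -chown hbase:hdfs /apps/hbase/hbase-observer-simple-example.jar
hdfs dfs -chmod 644 /apps/hbase/hbase-observer-simple-example.jar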


Loading the coprocessor

# create a test table
create 'gejx_test','cf'
# disable the test table
disable 'gejx_test'
# attach the coprocessor to the table (jar path | observer class | priority)
alter 'gejx_test' , METHOD =>'table_att','coprocessor'=>'hdfs://dev-dmp2.fengdai.org:8020/apps/hbase/hbase-observer-simple-example.jar|com.tairanchina.csp.dmp.examples.HbaseDataSyncEsObserver|1073741823'
# enable the table
enable 'gejx_test'
# inspect the table definition
desc 'gejx_test'


Testing

put 'gejx_test', '2','cf:name','gjx1'
delete 'gejx_test', '2','cf:name'


To check the logs, first use the HBase Master UI to determine which node hosts the region the data was written to, then look at the logs under /var/log/hbase on that node:

tail -100f hbase-hbase-regionserver-test.example.org.out

Unloading the coprocessor

disable 'gejx_test'
alter 'gejx_test', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
enable 'gejx_test'


That completes the most basic coprocessor implementation. Next, I will describe one way to integrate ES.

HBase+ES

To prove the approach quickly, the code below uses hard-coded values; please bear with it.

Code overview

ElasticSearchBulkOperator.java

package com.tairanchina.csp.dmp.examples;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created on 2019/1/11.
 *
 * @author 跡_Jason
 */
public class ElasticSearchBulkOperator {

    private static final Log LOG = LogFactory.getLog(ElasticSearchBulkOperator.class);

    private static final int MAX_BULK_COUNT = 10000;

    private static BulkRequestBuilder bulkRequestBuilder = null;

    private static final Lock commitLock = new ReentrantLock();

    private static ScheduledExecutorService scheduledExecutorService = null;
    static {
        // init es bulkRequestBuilder
        bulkRequestBuilder = ESClient.client.prepareBulk();
        bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

        // init thread pool and set size 1
        scheduledExecutorService = Executors.newScheduledThreadPool(1);

        // create beeper thread (it will sync pending data to the ES cluster)
        // use commitLock to protect the bulk request, keeping it thread-safe
        final Runnable beeper = () -> {
            commitLock.lock();
            try {
                bulkRequest(0);
            } catch (Exception ex) {
                System.out.println(ex.getMessage());
            } finally {
                commitLock.unlock();
            }
        };

        // set time bulk task
        // set beeper thread(10 second to delay first execution , 30 second period between successive executions)
        scheduledExecutorService.scheduleAtFixedRate(beeper, 10, 30, TimeUnit.SECONDS);

    }
    public static void shutdownScheduEx() {
        if (null != scheduledExecutorService && !scheduledExecutorService.isShutdown()) {
            scheduledExecutorService.shutdown();
        }
    }
    private static void bulkRequest(int threshold) {
        if (bulkRequestBuilder.numberOfActions() > threshold) {
            BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet();
            if (!bulkItemResponse.hasFailures()) {
                bulkRequestBuilder = ESClient.client.prepareBulk();
            }
        }
    }

    /**
     * add update builder to bulk
     * use commitLock to protect the bulk request (thread-safe)
     * @param builder
     */
    public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            LOG.error(" update Bulk " + "gejx_test" + " index error : " + ex.getMessage());
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * add delete builder to bulk
     * use commitLock to protect the bulk request (thread-safe)
     *
     * @param builder
     */
    public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            LOG.error(" delete Bulk " + "gejx_test" + " index error : " + ex.getMessage());
        } finally {
            commitLock.unlock();
        }
    }
}

ESClient.java

package com.tairanchina.csp.dmp.examples;

/**
 * Created on 2019/1/10.
 *
 * @author 跡_Jason
 */

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * ES Client class
 */
public class ESClient {
    
    public static Client client;
    
    /**
     * init ES client
     */
    public static void initEsClient() throws UnknownHostException {
        System.setProperty("es.set.netty.runtime.available.processors", "false");
        Settings esSettings = Settings.builder().put("cluster.name", "elasticsearch").build(); // set the ES cluster name; it must match the target cluster
        client = new PreBuiltTransportClient(esSettings).addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));

    }

    /**
     * Close ES client
     */
    public static void closeEsClient() {
        client.close();
    }
}

HbaseDataSyncEsObserver.java

package com.tairanchina.csp.dmp.examples;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.*;

/**
 * Created on 2019/1/10.
 *
 * @author 跡_Jason
 */
public class HbaseDataSyncEsObserver implements RegionObserver , RegionCoprocessor {

    private static final Logger LOG = Logger.getLogger(HbaseDataSyncEsObserver.class);

    @Override
    public Optional<RegionObserver> getRegionObserver() {
        return Optional.of(this);
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        // init ES client
        ESClient.initEsClient();
        LOG.info("****init start*****");
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        ESClient.closeEsClient();
        // shutdown time task
        ElasticSearchBulkOperator.shutdownScheduEx();
        LOG.info("****end*****");
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        String indexId = new String(put.getRow());
        try {
            NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
            Map<String, Object> infoJson = new HashMap<>();
            Map<String, Object> json = new HashMap<>();
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    json.put(key, value);
                }
            }
            // set hbase family to es
            infoJson.put("info", json);
            LOG.info(json.toString());
            ElasticSearchBulkOperator.addUpdateBuilderToBulk(ESClient.client.prepareUpdate("gejx_test","dmp_ods", indexId).setDocAsUpsert(true).setDoc(json));
            LOG.info("**** postPut success*****");
        } catch (Exception ex) {
            LOG.error("observer put  a doc, index [ " + "gejx_test" + " ]" + "indexId [" + indexId + "] error : " + ex.getMessage());
        }
    }
    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        String indexId = new String(delete.getRow());
        try {
            ElasticSearchBulkOperator.addDeleteBuilderToBulk(ESClient.client.prepareDelete("gejx_test", "dmp_ods", indexId));
            LOG.info("**** postDelete success*****");
        } catch (Exception ex) {
            LOG.error(ex);
            LOG.error("observer delete  a doc, index [ " + "gejx_test" + " ]" + "indexId [" + indexId + "] error : " + ex.getMessage());

        }
    }
}

The remaining steps (packaging, loading, testing) are the same as above, so I will not repeat them here; just check the results in Kibana.
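
If you prefer the command line over Kibana, a plain HTTP query against ES also confirms that documents are arriving; the index/type names follow the hard-coded gejx_test/dmp_ods values in the observer, and the host and port assume a local ES node listening on 9200:

# list the synced documents (index/type match the hard-coded values above)
curl -s 'http://localhost:9200/gejx_test/dmp_ods/_search?pretty'
# fetch a single document by its HBase RowKey
curl -s 'http://localhost:9200/gejx_test/dmp_ods/2?pretty'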

Closing thoughts

In the HBase+ES approach above, HBase and ES each hold a full copy of the data, and the coprocessor keeps the two consistent. The drawback is data redundancy: a large amount of storage has to be provisioned on the ES side.

Another popular approach is to use ES as a secondary index: the coprocessor stores only the queryable fields together with the RowKey in ES. At query time, you first query ES with the filter conditions to obtain the matching RowKeys, then fetch the rows from HBase by those RowKeys, which improves query efficiency. A sketch of this read path is shown below.
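
The following is only a sketch of that read path, under the assumptions that the ES documents use the HBase RowKey as their _id and that the index/type names follow the hard-coded values used earlier; the class name, queried field and page size are illustrative:

package com.tairanchina.csp.dmp.examples;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Sketch only: read path for the "ES as secondary index" approach.
 * Assumes ESClient.initEsClient() has been called and that the ES documents
 * use the HBase RowKey as their _id.
 */
public class SecondaryIndexQueryExample {

    public static List<Result> queryByName(String name) throws IOException {
        // 1. query ES with the filter condition to obtain the matching RowKeys
        SearchResponse searchResponse = ESClient.client.prepareSearch("gejx_test")
                .setTypes("dmp_ods")
                .setQuery(QueryBuilders.termQuery("name", name)) // queried field is illustrative
                .setSize(100)                                    // page size is illustrative
                .get();

        List<Get> gets = new ArrayList<>();
        for (SearchHit hit : searchResponse.getHits().getHits()) {
            gets.add(new Get(Bytes.toBytes(hit.getId())));       // ES _id == HBase RowKey
        }

        // 2. fetch the full rows from HBase by RowKey
        try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = connection.getTable(TableName.valueOf("gejx_test"))) {
            return new ArrayList<>(Arrays.asList(table.get(gets)));
        }
    }
}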

Anyway, both approaches still have to solve the historical-data (backfill) problem, and care is needed around data update operations.

Q&A

  • Getting the error "None of the configured nodes are available"?

    Check whether the ES cluster.name setting is wrong.

  • Why doesn't the Observer take effect on HBase 2.0?

    The observer interfaces changed in HBase 2.0. You need to implement the RegionCoprocessor interface and override its getRegionObserver method.

  • The jar has already been updated, but the coprocessor is still running the old code?

    Change the jar file name whenever you update it; otherwise you may see what looks like a caching problem. A rough update cycle is sketched below.
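
In practice a jar update cycle therefore looks roughly like this; the new jar name is illustrative, and the alter syntax is the same as in the loading section above:

# in a terminal: upload the new build under a new file name
hdfs dfs -put -f hbase-observer-simple-example-v2.jar /apps/hbase/
# in the hbase shell: detach the old coprocessor, then attach the new jar
disable 'gejx_test'
alter 'gejx_test', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
alter 'gejx_test' , METHOD =>'table_att','coprocessor'=>'hdfs://dev-dmp2.fengdai.org:8020/apps/hbase/hbase-observer-simple-example-v2.jar|com.tairanchina.csp.dmp.examples.HbaseDataSyncEsObserver|1073741823'
enable 'gejx_test'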

To be confirmed

  • [ ] Updating the jar without disabling the table (tested: the update works as long as the table is not being accessed)
  • [ ] Testing multiple tables sharing the same jar

References

使用Hbase協做器(Coprocessor)同步數據到ElasticSearch

面向高穩定,高性能之-Hbase數據實時同步到ElasticSearch(之二)

使用HBase Coprocessor

HBase 源碼

For more content, follow my WeChat public account or visit the AppZone website.

