HBase Secondary Index Approaches


Using an HBase Coprocessor

Test case requirement: on the source table LJK_TEST, use mycf:name as the secondary index.

Step 1

Create an index table.

create 'INDEX_LJK_TEST','mycf'

Step 2

Write the code.

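package com.sunsharing;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

// Written against the HBase 1.x coprocessor API (BaseRegionObserver).
public class SecondIndexObserver extends BaseRegionObserver {

    private static final TableName INDEX_TABLE_NAME = TableName.valueOf("INDEX_LJK_TEST");

    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("mycf");
    private static final byte[] COLUMN_NAME = Bytes.toBytes("name");
    private static final byte[] COLUMN_ID = Bytes.toBytes("id");

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        // Cells of mycf:name carried by this Put; empty if the Put does not touch that column.
        List<Cell> cells = put.get(COLUMN_FAMILY, COLUMN_NAME);
        if (cells.isEmpty()) {
            return;
        }

        // Get the index table from the coprocessor environment and close it when done,
        // instead of constructing (and never closing) a new HTable on every Put.
        try (Table indexTable = e.getEnvironment().getTable(INDEX_TABLE_NAME)) {
            for (Cell cell : cells) {
                // Index row key = indexed value; mycf:id = row key of the source row.
                Put indexPut = new Put(CellUtil.cloneValue(cell));
                indexPut.addColumn(COLUMN_FAMILY, COLUMN_ID, CellUtil.cloneRow(cell));
                indexTable.put(indexPut);
            }
        }
    }
}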
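One thing to keep in mind with this layout: the index row key is the column value alone, so two source rows with the same mycf:name land on the same index row, and a default read of mycf:id only returns the row key written last. A common way around this is to append the source row key to the index row key and look entries up with a prefix scan. The sketch below models that idea with an in-memory sorted map standing in for the index table. The class name, the separator, and the method names are all assumptions for illustration, not part of the original code, and the separator is assumed never to occur inside an indexed value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** In-memory stand-in for an index table: a sorted map from index row key to source row key. */
final class CompositeIndexSketch {
    // Hypothetical separator; assumed never to appear inside an indexed value.
    static final char SEPARATOR = '\u0000';

    // TreeMap keeps keys sorted, loosely mirroring how HBase orders rows by row key.
    private final TreeMap<String, String> indexTable = new TreeMap<>();

    /** Index row key = indexed value + separator + source row key. */
    static String indexRowKey(String value, String sourceRowKey) {
        return value + SEPARATOR + sourceRowKey;
    }

    /** Records that the source row identified by sourceRowKey has the given name. */
    void put(String sourceRowKey, String name) {
        indexTable.put(indexRowKey(name, sourceRowKey), sourceRowKey);
    }

    /** Prefix scan: returns the source row keys of every entry whose key starts with name + separator. */
    List<String> lookup(String name) {
        String start = name + SEPARATOR;
        String stop = name + (char) (SEPARATOR + 1);
        return new ArrayList<>(indexTable.subMap(start, true, stop, false).values());
    }
}
```

With this key layout, putting rows 003 and 009 with the same name LJK3 keeps both entries, and lookup("LJK3") returns both row keys, while lookup("LJK") does not pick up LJK3 by accident because the separator bounds the prefix.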

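Step 3

Upload the jar to HDFS and attach the coprocessor to the table LJK_TEST.

alter 'LJK_TEST','coprocessor'=>'/user/LJK/hbase.server.test-1.0-SNAPSHOT.jar|com.sunsharing.SecondIndexObserver||'

Step 4

Test it: add data to the source table and check whether the secondary index table matches the expected result.

As shown below, a corresponding row was added to the index table.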

hbase(main):004:0> put 'LJK_TEST','003','mycf:name','LJK3'
0 row(s) in 0.0930 seconds

hbase(main):006:0> scan 'INDEX_LJK_TEST'
ROW                       COLUMN+CELL                                                           
 LJK3                     column=mycf:id, timestamp=1562055903019, value=003                    
1 row(s) in 0.0110 seconds
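To make the purpose of the index table concrete, here is a minimal sketch of the read path it enables: look up the name in the index table to get the source row key, then fetch the source row by that key. It uses plain in-memory maps as stand-ins for the two HBase tables, so it only illustrates the two-step lookup and is not HBase client code. The class and method names, and the mycf:age column used in the usage note, are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** In-memory model of a source table plus a value-to-row-key index table. */
final class IndexLookupSketch {
    // Index table stand-in: mycf:name value -> source row key.
    private final Map<String, String> indexTable = new HashMap<>();
    // Source table stand-in: row key -> (column -> value).
    private final Map<String, Map<String, String>> sourceTable = new HashMap<>();

    /** Writes one cell; like the observer, it also writes an index entry when the column is mycf:name. */
    void put(String rowKey, String column, String value) {
        if ("mycf:name".equals(column)) {
            indexTable.put(value, rowKey);
        }
        sourceTable.computeIfAbsent(rowKey, k -> new HashMap<>()).put(column, value);
    }

    /** Two-step read: index lookup by name, then a point read of the source row. */
    Optional<Map<String, String>> findByName(String name) {
        String rowKey = indexTable.get(name);           // step 1: read the index table
        if (rowKey == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(sourceTable.get(rowKey)); // step 2: read the source table
    }
}
```

For example, after put("003", "mycf:name", "LJK3") and put("003", "mycf:age", "30"), findByName("LJK3") returns the whole row 003 without scanning the source table.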

Building a Secondary Index with Hadoop MapReduce

Test case requirement: on the source table LJK_TEST, use mycf:name as the secondary index.

Step 1

Write the code.

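package com.sunsharing;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class MrIndexBuilder {

    static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
        private byte[] columnFamily;
        private byte[] qualifier;
        private ImmutableBytesWritable indexTableName;

        @Override
        protected void setup(Context context) {
            Configuration configuration = context.getConfiguration();

            this.columnFamily = Bytes.toBytes(configuration.get("cf"));
            this.qualifier = Bytes.toBytes(configuration.get("qa"));
            // MultiTableOutputFormat takes the output key as the name of the target table.
            this.indexTableName = new ImmutableBytesWritable(Bytes.toBytes(configuration.get("indexTableName")));
        }

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            List<Cell> columnCells = value.getColumnCells(columnFamily, qualifier);
            for (Cell cell : columnCells) {
                // Index row key = indexed value; <cf>:id = row key of the source row.
                Put put = new Put(CellUtil.cloneValue(cell));
                put.addColumn(columnFamily, Bytes.toBytes("id"), value.getRow());
                context.write(indexTableName, put);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();

        if (args.length < 4) {
            throw new IllegalArgumentException("Invalid arguments: 4 are required: source table name, "
                + "index table name, column family of the source table, and qualifier of the column to index.");
        }

        String tableName = args[0];
        String indexTableName = args[1];
        String columnFamily = args[2];
        String indexQualifier = args[3];

        // Pass the job parameters to the mappers through the job configuration.
        conf.set("cf", columnFamily);
        conf.set("qa", indexQualifier);
        conf.set("indexTableName", indexTableName);

        Job mrIndexBuilder = Job.getInstance(conf, "MrIndexBuilder");
        mrIndexBuilder.setJarByClass(MrIndexBuilder.class);
        mrIndexBuilder.setMapperClass(MyMapper.class);
        mrIndexBuilder.setInputFormatClass(TableInputFormat.class);
        mrIndexBuilder.setOutputFormatClass(MultiTableOutputFormat.class);
        // Map-only job: the mappers write their Puts straight to the index table.
        mrIndexBuilder.setNumReduceTasks(0);

        Scan scan = new Scan();
        scan.setCaching(500);        // rows fetched per scanner RPC
        scan.setCacheBlocks(false);  // keep a one-off full scan out of the block cache

        TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class, ImmutableBytesWritable.class,
            Put.class, mrIndexBuilder);
        if (!mrIndexBuilder.waitForCompletion(true)) {
            throw new IOException("Job failed!");
        }
    }
}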

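Step 2

Package the code as a jar and copy it to one of the servers in the HBase cluster. The index table INDEX_LJK_TEST must already exist. Run the command:

HADOOP_CLASSPATH=`hbase classpath` hadoop jar hbase.server.test-1.0-SNAPSHOT.jar com.sunsharing.MrIndexBuilder LJK_TEST INDEX_LJK_TEST mycf name

Step 3

Verify that the result matches expectations.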

hbase(main):021:0> scan 'INDEX_LJK_TEST'
ROW                               COLUMN+CELL                                                                                    
 LJK                              column=mycf:id, timestamp=1562657562219, value=002                                             
 LJK3                             column=mycf:id, timestamp=1562657562219, value=003                                             
 LJK4                             column=mycf:id, timestamp=1562657562219, value=004                                             
 LJK5                             column=mycf:id, timestamp=1562657562219, value=005                                             
 LJK6                             column=mycf:id, timestamp=1562657562219, value=006                                             
 LJK7                             column=mycf:id, timestamp=1562657562219, value=007                                             
 LJK8                             column=mycf:id, timestamp=1562657562219, value=008                                             
7 row(s) in 0.3670 seconds
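Eyeballing the scan output works for a handful of rows; for a larger table the same check can be automated. The sketch below is one possible way to do it, under the assumption that the source rows and the index rows have already been read into plain maps (how they are read is left out). It reports source rows that are missing from the index, index entries that point at the wrong row, and stale index entries. The class name, method name, and message formats are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Compares a source table snapshot with an index table snapshot, both held in memory. */
final class IndexVerifierSketch {

    /**
     * @param nameBySourceRow source table snapshot: row key -> mycf:name value
     * @param sourceRowByName index table snapshot: index row key (the name) -> mycf:id value
     * @return a list of problems; an empty list means the index matches the source
     */
    static List<String> verify(Map<String, String> nameBySourceRow, Map<String, String> sourceRowByName) {
        List<String> problems = new ArrayList<>();

        // Every source row must be reachable through the index.
        for (Map.Entry<String, String> source : new TreeMap<>(nameBySourceRow).entrySet()) {
            String rowKey = source.getKey();
            String name = source.getValue();
            String indexedRow = sourceRowByName.get(name);
            if (indexedRow == null) {
                problems.add("missing: " + name + " -> " + rowKey);
            } else if (!indexedRow.equals(rowKey)) {
                problems.add("mismatch: " + name + " -> " + indexedRow + ", expected " + rowKey);
            }
        }

        // Every index entry must point at a source row that still carries that name.
        for (Map.Entry<String, String> index : new TreeMap<>(sourceRowByName).entrySet()) {
            if (!index.getKey().equals(nameBySourceRow.get(index.getValue()))) {
                problems.add("stale: " + index.getKey() + " -> " + index.getValue());
            }
        }
        return problems;
    }
}
```

Note that with the value-only index row key used here, two source rows sharing the same name are reported as a mismatch, since the index can only point at one of them.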

Phoenix Secondary Index Approach

This is the simplest approach: first create a table mapped into Phoenix, then create a global secondary index on it.

CREATE TABLE LJK_TEST (ID VARCHAR NOT NULL PRIMARY KEY, "mycf"."name" VARCHAR);

CREATE INDEX COVER_LJKTEST_INDEX ON LJK_TEST ("mycf"."name");

Elasticsearch (ES) Secondary Index Approach

This approach can handle essentially every case. To be completed.

Appendix

Lily HBase secondary index
