[TOC]
測試案例需求:在原表LJK_TEST上,將mycf:name作爲二級索引。
建立一張索引表
create 'INDEX_LJK_TEST','mycf'
寫代碼
public class SecondIndexObserver extends BaseRegionObserver { private static final String INDEX_TABLE_NAME = "INDEX_LJK_TEST"; private static final byte[] COLUMN_FAMILY = Bytes.toBytes("mycf"); private static final byte[] COLUMN_NAME = Bytes.toBytes("name"); private static final byte[] COLUMN_ID = Bytes.toBytes("id"); private Configuration configuration = HBaseConfiguration.create(); @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { HTable hTable = new HTable(configuration, INDEX_TABLE_NAME); List<Cell> cells = put.get(COLUMN_FAMILY, COLUMN_NAME); for (Cell cell : cells) { Put indexPut = new Put(CellUtil.cloneValue(cell)); indexPut.addColumn(COLUMN_FAMILY, COLUMN_ID, CellUtil.cloneRow(cell)); hTable.put(indexPut); } } }
將jar包上傳到HDFS,並給表LJK_TEST加上協處理器。
alter 'LJK_TEST','coprocessor'=>'/user/LJK/hbase.server.test-1.0-SNAPSHOT.jar|com.sunsharing.SecondIndexObserver||'
測試
測試:往原表增加數據,看二級索引表是否符合預期結果。
可以看到索引表對應增加了一條數據。
hbase(main):004:0> put 'LJK_TEST','003','mycf:name','LJK3' 0 row(s) in 0.0930 seconds hbase(main):006:0> scan 'INDEX_LJK_TEST' ROW COLUMN+CELL LJK3 column=mycf:id, timestamp=1562055903019, value=003 1 row(s) in 0.0110 seconds
測試案例需求:在原表LJK_TEST上,將mycf:name作爲二級索引。
寫代碼
public class MrIndexBuilder { static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> { private String columnFamily; private String quality; private String indexTableName; @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { List<Cell> columnCells = value.getColumnCells(Bytes.toBytes(columnFamily), Bytes.toBytes(quality)); for (Cell cell : columnCells) { byte[] indexRow = CellUtil.cloneValue(cell); Put put = new Put(indexRow); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("id"), key.get()); context.write(new ImmutableBytesWritable(Bytes.toBytes(indexTableName)), put); } } @Override protected void setup(Context context) { Configuration configuration = context.getConfiguration(); this.columnFamily = configuration.get("cf"); this.quality = configuration.get("qa"); this.indexTableName = configuration.get("indexTalbeName"); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = HBaseConfiguration.create(); if (args.length < 4) { throw new RuntimeException("參數傳入錯誤,須要4個參數,原表名,二級索引表名,原表的CF,原表做爲二級索引的字段名!"); } String tableName = args[0]; String indexTalbeName = args[1]; String columnFamily = args[2]; String indexQualify = args[3]; conf.set("cf", columnFamily); conf.set("qa", indexQualify); conf.set("indexTalbeName", indexTalbeName); Job mrIndexBuilder = new Job(conf, "MrIndexBuilder"); mrIndexBuilder.setJarByClass(MrIndexBuilder.class); mrIndexBuilder.setMapperClass(MyMapper.class); mrIndexBuilder.setInputFormatClass(TableInputFormat.class); mrIndexBuilder.setOutputFormatClass(MultiTableOutputFormat.class); mrIndexBuilder.setNumReduceTasks(0); Scan scan = new Scan(); scan.setCaching(500); scan.setCacheBlocks(false); TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class, ImmutableBytesWritable.class, Put.class, mrIndexBuilder); boolean b = mrIndexBuilder.waitForCompletion(true); if 
(!b) { throw new IOException("任務報錯!"); } } }
打成jar包,放到hbase集羣環境的某一臺服務器上。執行命令
HADOOP_CLASSPATH=`hbase classpath` hadoop jar hbase.server.test-1.0-SNAPSHOT.jar com.sunsharing.MrIndexBuilder LJK_TEST INDEX_LJK_TEST mycf name
驗證結果符合預期
hbase(main):021:0> scan 'INDEX_LJK_TEST' ROW COLUMN+CELL LJK column=mycf:id, timestamp=1562657562219, value=002 LJK3 column=mycf:id, timestamp=1562657562219, value=003 LJK4 column=mycf:id, timestamp=1562657562219, value=004 LJK5 column=mycf:id, timestamp=1562657562219, value=005 LJK6 column=mycf:id, timestamp=1562657562219, value=006 LJK7 column=mycf:id, timestamp=1562657562219, value=007 LJK8 column=mycf:id, timestamp=1562657562219, value=008 7 row(s) in 0.3670 seconds
該方案最爲簡單,先創建一張映射到Phoenix的表,接着採用全局二級索引
-- Phoenix table over LJK_TEST; the family and column are double-quoted to keep them lowercase.
CREATE TABLE LJK_TEST (ID VARCHAR NOT NULL PRIMARY KEY,"mycf"."name" VARCHAR);
-- Global secondary index on the name column. It must target the table created above
-- (LJK_TEST, not LJKTEST) and reference the column with the same quoted, case-sensitive name.
CREATE INDEX COVER_LJKTEST_INDEX ON LJK_TEST("mycf"."name");
該方案基本能夠應付全部狀況,待補充。