1. Overview of HBase Read/Write Methods
There are four main approaches:
- reading and writing HBase with the plain Java API;
- reading and writing HBase from Spark;
- reading and writing HBase from Flink;
- reading and writing HBase through Phoenix.
The first is the fairly raw but efficient access path that HBase itself provides; the second and third are Spark's and Flink's HBase integrations; and the last is the JDBC route offered by the third-party layer Phoenix, whose JDBC access can in turn be invoked from Spark and Flink. A minimal JDBC sketch of that last route follows.
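As a taste of the Phoenix route, here is a minimal sketch of querying HBase through Phoenix's JDBC driver. It assumes the Phoenix client jar is on the classpath; the ZooKeeper quorum matches the configuration used below, and the TEST table and query are purely illustrative:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PhoenixJdbcSketch {
    public static void main(String[] args) throws Exception {
        // Phoenix exposes HBase behind a standard JDBC URL: jdbc:phoenix:<zookeeper quorum>:<port>
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:hadoop01,hadoop02,hadoop03:2181");
             Statement stmt = conn.createStatement();
             // The TEST table and the query are illustrative only
             ResultSet rs = stmt.executeQuery("SELECT * FROM TEST LIMIT 10")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}
```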
Note:
We use HBase 2.1.2 here; all of the code below was developed against that version.
2. Reading and Writing HBase with the Plain Java API
2.1 Connecting to HBase
We connect to HBase from a static initializer. Unlike releases before 2.1.2, there is no need to build an HBase thread pool yourself; the HBase 2.1.2 client already encapsulates that, so creating the connection and calling it is enough:
```java
/**
 * Declare the configuration and connection statically
 */
static Configuration conf = null;
static Connection conn = null;
static {
    conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    try {
        conn = ConnectionFactory.createConnection(conf);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
```
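Connection is heavyweight and thread-safe, which is why a single shared instance like the one above is the usual pattern, while Table handles are cheap and fetched per operation. One detail the snippet leaves out is releasing that shared connection; a minimal sketch, assuming the same static conn field, that closes it when the JVM exits:

```java
// Close the shared connection when the JVM exits. Connection is heavyweight
// and thread-safe, so one instance per application is enough.
static {
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        try {
            if (conn != null) {
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }));
}
```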
2.2 Creating an HBase Table
HBase tables are created through Admin; the table and its column families are built with TableDescriptorBuilder and ColumnFamilyDescriptorBuilder respectively.
```java
/**
 * Create a table with a single column family
 * @throws Exception
 */
public static void createTable() throws Exception {
    Admin admin = conn.getAdmin();
    if (!admin.tableExists(TableName.valueOf("test"))) {
        TableName tableName = TableName.valueOf("test");
        // Table descriptor builder
        TableDescriptorBuilder tdb = TableDescriptorBuilder.newBuilder(tableName);
        // Column family descriptor builder
        ColumnFamilyDescriptorBuilder cdb = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("user"));
        // Build the column family descriptor
        ColumnFamilyDescriptor cfd = cdb.build();
        // Add the column family
        tdb.setColumnFamily(cfd);
        // Build the table descriptor
        TableDescriptor td = tdb.build();
        // Create the table
        admin.createTable(td);
    } else {
        System.out.println("Table already exists");
    }
    // Close the admin handle
    admin.close();
}
```
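When the row key distribution is known in advance, the same Admin API can also pre-split the table at creation time to avoid writing into a single hot region; createTable has an overload that takes split keys. A sketch under that assumption (the split points are illustrative):

```java
/**
 * Sketch: create the same table pre-split into three regions.
 * The split points are illustrative; rows sort lexicographically by row key.
 */
public static void createTablePreSplit() throws Exception {
    Admin admin = conn.getAdmin();
    TableName tableName = TableName.valueOf("test");
    if (!admin.tableExists(tableName)) {
        TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
                .setColumnFamily(ColumnFamilyDescriptorBuilder.of("user"))
                .build();
        // Region boundaries: (-inf, rowKey2), [rowKey2, rowKey4), [rowKey4, +inf)
        byte[][] splitKeys = {Bytes.toBytes("rowKey2"), Bytes.toBytes("rowKey4")};
        admin.createTable(td, splitKeys);
    }
    admin.close();
}
```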
2.3 Inserting Data into an HBase Table
Data is inserted through the Put API.
```java
/**
 * Insert data (multiple row keys, multiple columns)
 * @throws Exception
 */
public static void insertMany() throws Exception {
    Table table = conn.getTable(TableName.valueOf("test"));
    List<Put> puts = new ArrayList<Put>();
    Put put1 = new Put(Bytes.toBytes("rowKey1"));
    put1.addColumn(Bytes.toBytes("user"), Bytes.toBytes("name"), Bytes.toBytes("wd"));
    Put put2 = new Put(Bytes.toBytes("rowKey2"));
    put2.addColumn(Bytes.toBytes("user"), Bytes.toBytes("age"), Bytes.toBytes("25"));
    Put put3 = new Put(Bytes.toBytes("rowKey3"));
    put3.addColumn(Bytes.toBytes("user"), Bytes.toBytes("weight"), Bytes.toBytes("60kg"));
    Put put4 = new Put(Bytes.toBytes("rowKey4"));
    put4.addColumn(Bytes.toBytes("user"), Bytes.toBytes("sex"), Bytes.toBytes("男"));
    puts.add(put1);
    puts.add(put2);
    puts.add(put3);
    puts.add(put4);
    table.put(puts);
    table.close();
}
```
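Table.put(List&lt;Put&gt;) already batches the four rows into one client call. For a sustained write stream, the client also provides BufferedMutator, which buffers mutations client-side and flushes them in batches. A minimal sketch (the extra row is illustrative):

```java
/**
 * Sketch: writing through BufferedMutator, which buffers Puts client-side
 * and flushes them in batches (when the buffer fills, on flush(), or on close()).
 */
public static void insertBuffered() throws Exception {
    try (BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("test"))) {
        Put put = new Put(Bytes.toBytes("rowKey5"));
        put.addColumn(Bytes.toBytes("user"), Bytes.toBytes("name"), Bytes.toBytes("hbase"));
        mutator.mutate(put);
    } // close() flushes anything still buffered
}
```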
2.4 Deleting an HBase Row, Column Family, or Column
```java
/**
 * Delete a whole row by rowKey, one column family of a row,
 * or one column of one column family of a row
 * @param tableName
 * @param rowKey
 * @param columnFamily
 * @param columnName
 * @throws Exception
 */
public static void deleteData(TableName tableName, String rowKey, String columnFamily, String columnName) throws Exception {
    Table table = conn.getTable(tableName);
    Delete delete = new Delete(Bytes.toBytes(rowKey));
    // The three cases below are alternatives: leave both lines commented out
    // for ①, or uncomment exactly one of them for ② or ③.
    // ① A bare Delete removes the entire row.
    // ② Remove only one column family of the row:
    //delete.addFamily(Bytes.toBytes(columnFamily));
    // ③ Remove only one column of one column family:
    //delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
    table.delete(delete);
    table.close();
}
```
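Deletes can also be made conditional on a cell's current value through Table's check-and-mutate builder, available since HBase 2.0. A sketch, assuming the delete should only proceed while user:name still equals "wd":

```java
/**
 * Sketch: delete a row only if its user:name column currently equals "wd".
 * Returns true when the condition held and the delete was applied atomically.
 */
public static boolean deleteIfNameMatches(TableName tableName, String rowKey) throws Exception {
    Table table = conn.getTable(tableName);
    boolean deleted = table.checkAndMutate(Bytes.toBytes(rowKey), Bytes.toBytes("user"))
            .qualifier(Bytes.toBytes("name"))
            .ifEquals(Bytes.toBytes("wd"))
            .thenDelete(new Delete(Bytes.toBytes(rowKey)));
    table.close();
    return deleted;
}
```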
2.5 Updating Columns in an HBase Table
Simply overwrite the value with the Put API.
```java
/**
 * Update a value by row key, column family, and column name
 * @param tableName
 * @param rowKey
 * @param columnFamily
 * @param columnName
 * @param columnValue
 * @throws Exception
 */
public static void updateData(TableName tableName, String rowKey, String columnFamily, String columnName, String columnValue) throws Exception {
    Table table = conn.getTable(tableName);
    Put put1 = new Put(Bytes.toBytes(rowKey));
    put1.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(columnValue));
    table.put(put1);
    table.close();
}
```
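An overwrite like this actually writes a new cell version rather than editing in place, and it is not atomic with respect to the old value. For numeric columns, HBase offers an atomic server-side counter instead; a sketch (the user:visits column is illustrative):

```java
/**
 * Sketch: atomic numeric update. The increment is applied server-side,
 * so it stays correct under concurrent writers; the stored value must be
 * an 8-byte long, not a string.
 */
public static long incrementCounter(TableName tableName, String rowKey, long delta) throws Exception {
    Table table = conn.getTable(tableName);
    long newValue = table.incrementColumnValue(Bytes.toBytes(rowKey),
            Bytes.toBytes("user"), Bytes.toBytes("visits"), delta);
    table.close();
    return newValue;
}
```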
2.6 Querying HBase
HBase queries come in three forms: get, scan, and scan combined with filters. The filters used below are RowFilter (row key filter), SingleColumnValueFilter (column value filter), and ColumnPrefixFilter (column name prefix filter).
```java
/**
 * Query one row by rowKey
 * @param tableName
 * @param rowKey
 * @throws Exception
 */
public static void getResult(TableName tableName, String rowKey) throws Exception {
    Table table = conn.getTable(tableName);
    // Fetch a single row
    Get get = new Get(Bytes.toBytes(rowKey));
    Result set = table.get(get);
    Cell[] cells = set.rawCells();
    for (Cell cell : cells) {
        System.out.println(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::"
                + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
    }
    table.close();
}

// Comparison operators for filters:
// LESS (<), LESS_OR_EQUAL (<=), EQUAL (=), NOT_EQUAL (<>),
// GREATER_OR_EQUAL (>=), GREATER (>), NO_OP (matches nothing)

/**
 * Scan a table, with and without filters
 * @param tableName
 * @throws Exception
 */
public static void scanTable(TableName tableName) throws Exception {
    Table table = conn.getTable(tableName);

    // ① Full-table scan
    Scan scan1 = new Scan();
    ResultScanner rscan1 = table.getScanner(scan1);

    // ② Row key filter
    Scan scan2 = new Scan();
    // "str$" matches the end of the key, like SQL's '%str'; "^str" matches the start, like SQL's 'str%'
    RowFilter filter = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("Key1$"));
    scan2.setFilter(filter);
    ResultScanner rscan2 = table.getScanner(scan2);

    // ③ Column value filter
    Scan scan3 = new Scan();
    // Parameters: column family, column name, comparison operator, value
    SingleColumnValueFilter filter3 = new SingleColumnValueFilter(Bytes.toBytes("author"), Bytes.toBytes("name"),
            CompareOperator.EQUAL, Bytes.toBytes("spark"));
    scan3.setFilter(filter3);
    ResultScanner rscan3 = table.getScanner(scan3);

    // ④ Column name prefix filter
    Scan scan4 = new Scan();
    ColumnPrefixFilter filter4 = new ColumnPrefixFilter(Bytes.toBytes("name"));
    scan4.setFilter(filter4);
    ResultScanner rscan4 = table.getScanner(scan4);

    // ⑤ Filter list: MUST_PASS_ALL means every filter has to match
    Scan scan5 = new Scan();
    FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    SingleColumnValueFilter filter51 = new SingleColumnValueFilter(Bytes.toBytes("author"), Bytes.toBytes("name"),
            CompareOperator.EQUAL, Bytes.toBytes("spark"));
    ColumnPrefixFilter filter52 = new ColumnPrefixFilter(Bytes.toBytes("name"));
    list.addFilter(filter51);
    list.addFilter(filter52);
    scan5.setFilter(list);
    ResultScanner rscan5 = table.getScanner(scan5);

    // Iterate one of the scanners above, e.g. rscan5
    for (Result rs : rscan5) {
        String rowKey = Bytes.toString(rs.getRow());
        System.out.println("row key :" + rowKey);
        Cell[] cells = rs.rawCells();
        for (Cell cell : cells) {
            System.out.println(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) + "::"
                    + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::"
                    + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
        }
        System.out.println("-------------------------------------------");
    }
    table.close();
}
```
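All of the filters above are evaluated server-side but still walk every row in the scan's range; when the predicate lives in the row key itself, bounding the scan is far cheaper than a RowFilter. A sketch with an illustrative key range:

```java
/**
 * Sketch: bound a scan by row key range instead of filtering.
 * withStartRow is inclusive and withStopRow exclusive by default.
 */
public static void scanRange(TableName tableName) throws Exception {
    Table table = conn.getTable(tableName);
    Scan scan = new Scan()
            .withStartRow(Bytes.toBytes("rowKey1"))
            .withStopRow(Bytes.toBytes("rowKey3"));
    try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result rs : scanner) {
            System.out.println("row key :" + Bytes.toString(rs.getRow()));
        }
    }
    table.close();
}
```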
3. Summary
See HBase Read/Write Methods (Part 2): Spark for reading and writing HBase from Spark.
See HBase Read/Write Methods (Part 3): Flink for reading and writing HBase from Flink.
GitHub repository:
https://github.com/SwordfallYeung/HBaseDemo
References:
https://hbase.apache.org/book.html