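As of now (April 2019), HBase has two main release lines, 1.x and 2.x, and their Java APIs differ: some methods from 1.x are marked @Deprecated in 2.x. The API examples below are therefore given for both 1.x and 2.x. The complete code is available in this repository.

Note also that the client version you use must match the server version. If 2.x client code connects to a 1.x server, it throws exceptions such as `NoSuchColumnFamilyException`.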
To work with HBase through the Java API, add the `hbase-client` dependency. The HBase Client version used here is 1.2.0:

```xml
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0</version>
</dependency>
```
```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

public class HBaseUtils {

    private static Connection connection;

    static {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        // For a cluster, separate the host names with commas
        configuration.set("hbase.zookeeper.quorum", "hadoop001");
        try {
            connection = ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            // Fail fast instead of leaving connection null for every later call
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Create an HBase table
     *
     * @param tableName      table name
     * @param columnFamilies list of column family names
     * @return true if the table was created, false if it already exists or creation failed
     */
    public static boolean createTable(String tableName, List<String> columnFamilies) {
        try (Admin admin = connection.getAdmin()) {
            if (admin.tableExists(TableName.valueOf(tableName))) {
                return false;
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
            columnFamilies.forEach(columnFamily -> {
                HColumnDescriptor columnDescriptor = new HColumnDescriptor(columnFamily);
                columnDescriptor.setMaxVersions(1);
                tableDescriptor.addFamily(columnDescriptor);
            });
            admin.createTable(tableDescriptor);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Delete an HBase table
     *
     * @param tableName table name
     */
    public static boolean deleteTable(String tableName) {
        try (Admin admin = connection.getAdmin()) {
            // A table must be disabled before it can be deleted
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Insert a single cell
     *
     * @param tableName        table name
     * @param rowKey           row key (unique identifier of the row)
     * @param columnFamilyName column family name
     * @param qualifier        column qualifier
     * @param value            value
     */
    public static boolean putRow(String tableName, String rowKey, String columnFamilyName, String qualifier,
                                 String value) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            table.put(put);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Insert several cells of one row
     *
     * @param tableName        table name
     * @param rowKey           row key (unique identifier of the row)
     * @param columnFamilyName column family name
     * @param pairList         list of (qualifier, value) pairs
     */
    public static boolean putRow(String tableName, String rowKey, String columnFamilyName,
                                 List<Pair<String, String>> pairList) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            // org.apache.hadoop.hbase.util.Pair exposes getFirst()/getSecond()
            pairList.forEach(pair -> put.addColumn(Bytes.toBytes(columnFamilyName),
                    Bytes.toBytes(pair.getFirst()), Bytes.toBytes(pair.getSecond())));
            table.put(put);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Get the data of the row with the given rowKey
     *
     * @param tableName table name
     * @param rowKey    row key
     */
    public static Result getRow(String tableName, String rowKey) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            return table.get(get);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Get the latest version of the given cell (row + column)
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param qualifier    column qualifier
     */
    public static String getCell(String tableName, String rowKey, String columnFamily, String qualifier) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
            Result result = table.get(get);
            byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
            // Bytes.toString(null) returns null, so a missing cell yields null
            return Bytes.toString(resultValue);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Scan the whole table
     *
     * @param tableName table name
     * @return a scanner the caller must close, or null on failure
     */
    public static ResultScanner getScanner(String tableName) {
        try {
            // The Table is left open because the returned scanner is still in use
            Table table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            return table.getScanner(scan);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Scan the table with filters
     *
     * @param tableName  table name
     * @param filterList filters
     */
    public static ResultScanner getScanner(String tableName, FilterList filterList) {
        try {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            scan.setFilter(filterList);
            return table.getScanner(scan);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Scan a row-key range of the table with filters
     *
     * @param tableName   table name
     * @param startRowKey start RowKey (inclusive)
     * @param endRowKey   stop RowKey (exclusive)
     * @param filterList  filters
     */
    public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey,
                                           FilterList filterList) {
        try {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            scan.setStartRow(Bytes.toBytes(startRowKey));
            scan.setStopRow(Bytes.toBytes(endRowKey));
            scan.setFilter(filterList);
            return table.getScanner(scan);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Delete the given row
     *
     * @param tableName table name
     * @param rowKey    row key
     */
    public static boolean deleteRow(String tableName, String rowKey) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            table.delete(delete);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Delete the given column of the given row
     *
     * @param tableName  table name
     * @param rowKey     row key
     * @param familyName column family
     * @param qualifier  column qualifier
     */
    public static boolean deleteColumn(String tableName, String rowKey, String familyName, String qualifier) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            // addColumn deletes only the latest version; addColumns would delete all versions
            delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier));
            table.delete(delete);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }
}
```
The API wrapped above is exercised with the following unit tests:
```java
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;

/**
 * The tests depend on each other (create, insert, query, delete). JUnit 4 does not guarantee
 * method order, so run them one at a time in that order.
 */
public class HBaseUtilsTest {

    private static final String TABLE_NAME = "class";
    private static final String TEACHER = "teacher";
    private static final String STUDENT = "student";

    @Test
    public void createTable() {
        // Create the table with two column families
        List<String> columnFamilies = Arrays.asList(TEACHER, STUDENT);
        boolean table = HBaseUtils.createTable(TABLE_NAME, columnFamilies);
        System.out.println("Table created: " + table);
    }

    @Test
    public void insertData() {
        List<Pair<String, String>> pairs1 = Arrays.asList(new Pair<>("name", "Tom"),
                new Pair<>("age", "22"),
                new Pair<>("gender", "1"));
        HBaseUtils.putRow(TABLE_NAME, "rowKey1", STUDENT, pairs1);

        List<Pair<String, String>> pairs2 = Arrays.asList(new Pair<>("name", "Jack"),
                new Pair<>("age", "33"),
                new Pair<>("gender", "2"));
        HBaseUtils.putRow(TABLE_NAME, "rowKey2", STUDENT, pairs2);

        List<Pair<String, String>> pairs3 = Arrays.asList(new Pair<>("name", "Mike"),
                new Pair<>("age", "44"),
                new Pair<>("gender", "1"));
        HBaseUtils.putRow(TABLE_NAME, "rowKey3", STUDENT, pairs3);
    }

    @Test
    public void getRow() {
        Result result = HBaseUtils.getRow(TABLE_NAME, "rowKey1");
        if (result != null) {
            System.out.println(Bytes.toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name"))));
        }
    }

    @Test
    public void getCell() {
        String cell = HBaseUtils.getCell(TABLE_NAME, "rowKey2", STUDENT, "age");
        System.out.println("cell age: " + cell);
    }

    @Test
    public void getScanner() {
        ResultScanner scanner = HBaseUtils.getScanner(TABLE_NAME);
        if (scanner != null) {
            scanner.forEach(result -> System.out.println(Bytes.toString(result.getRow()) + "->"
                    + Bytes.toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name")))));
            scanner.close();
        }
    }

    @Test
    public void getScannerWithFilter() {
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        // 1.x client: use CompareFilter.CompareOp (with a 2.x client, prefer CompareOperator.EQUAL)
        SingleColumnValueFilter nameFilter = new SingleColumnValueFilter(Bytes.toBytes(STUDENT),
                Bytes.toBytes("name"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes("Jack"));
        filterList.addFilter(nameFilter);
        ResultScanner scanner = HBaseUtils.getScanner(TABLE_NAME, filterList);
        if (scanner != null) {
            scanner.forEach(result -> System.out.println(Bytes.toString(result.getRow()) + "->"
                    + Bytes.toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name")))));
            scanner.close();
        }
    }

    @Test
    public void deleteColumn() {
        boolean b = HBaseUtils.deleteColumn(TABLE_NAME, "rowKey2", STUDENT, "age");
        System.out.println("Deleted: " + b);
    }

    @Test
    public void deleteRow() {
        boolean b = HBaseUtils.deleteRow(TABLE_NAME, "rowKey2");
        System.out.println("Deleted: " + b);
    }

    @Test
    public void deleteTable() {
        boolean b = HBaseUtils.deleteTable(TABLE_NAME);
        System.out.println("Deleted: " + b);
    }
}
```
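The tests above do not exercise the `getScanner(tableName, startRowKey, endRowKey, filterList)` overload. HBase sorts rows by comparing row-key bytes lexicographically as unsigned values, and a `Scan`'s start row is inclusive while its stop row is exclusive by default. The stdlib-only sketch below models that with a `TreeMap` so the boundary behavior can be checked without a cluster; the class `RowRangeDemo` and its helpers are hypothetical and not part of HBase.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Stdlib-only model of HBase row-key ordering and [startRow, stopRow) scans (illustration only). */
class RowRangeDemo {

    /** Compares byte arrays lexicographically, treating each byte as unsigned (0..255). */
    static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHIC = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        // A proper prefix sorts before the longer key
        return Integer.compare(a.length, b.length);
    };

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /** Returns the row keys in [startRow, stopRow): start inclusive, stop exclusive. */
    static List<String> scan(NavigableMap<byte[], String> table, String startRow, String stopRow) {
        List<String> keys = new ArrayList<>();
        for (byte[] key : table.subMap(bytes(startRow), true, bytes(stopRow), false).keySet()) {
            keys.add(new String(key, StandardCharsets.UTF_8));
        }
        return keys;
    }

    /** A toy "table" whose keys mimic the row keys used in the tests, plus "rowKey10". */
    static NavigableMap<byte[], String> sampleTable() {
        NavigableMap<byte[], String> table = new TreeMap<>(UNSIGNED_LEXICOGRAPHIC);
        for (String key : new String[] {"rowKey1", "rowKey2", "rowKey3", "rowKey10"}) {
            table.put(bytes(key), "value of " + key);
        }
        return table;
    }

    public static void main(String[] args) {
        // "rowKey10" sorts between "rowKey1" and "rowKey2" because comparison is byte by byte,
        // and "rowKey3" is excluded because the stop row is exclusive.
        System.out.println(scan(sampleTable(), "rowKey1", "rowKey3")); // prints [rowKey1, rowKey10, rowKey2]
    }
}
```

The practical consequence is that numeric suffixes in row keys do not sort numerically unless they are zero-padded to a fixed width.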
For 2.x, the HBase Client version used here is the then-latest 2.1.4:

```xml
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.1.4</version>
</dependency>
```
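Compared with 1.x, 2.x deprecates some methods, and the source code of each deprecated method names its replacement. For example, when creating a table in 2.x, classes such as `HTableDescriptor` and `HColumnDescriptor` are marked deprecated; tables and column families are defined with `TableDescriptorBuilder` and `ColumnFamilyDescriptorBuilder` instead.

The following is an example of using the HBase 2.x Java API: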
```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

public class HBaseUtils {

    private static Connection connection;

    static {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        // For a cluster, separate the host names with commas
        configuration.set("hbase.zookeeper.quorum", "hadoop001");
        try {
            connection = ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            // Fail fast instead of leaving connection null for every later call
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Create an HBase table
     *
     * @param tableName      table name
     * @param columnFamilies list of column family names
     * @return true if the table was created, false if it already exists or creation failed
     */
    public static boolean createTable(String tableName, List<String> columnFamilies) {
        try (Admin admin = connection.getAdmin()) {
            if (admin.tableExists(TableName.valueOf(tableName))) {
                return false;
            }
            TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
            columnFamilies.forEach(columnFamily -> {
                ColumnFamilyDescriptorBuilder cfDescriptorBuilder =
                        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
                cfDescriptorBuilder.setMaxVersions(1);
                ColumnFamilyDescriptor familyDescriptor = cfDescriptorBuilder.build();
                tableDescriptor.setColumnFamily(familyDescriptor);
            });
            admin.createTable(tableDescriptor.build());
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Delete an HBase table
     *
     * @param tableName table name
     */
    public static boolean deleteTable(String tableName) {
        try (Admin admin = connection.getAdmin()) {
            // A table must be disabled before it can be deleted
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Insert a single cell
     *
     * @param tableName        table name
     * @param rowKey           row key (unique identifier of the row)
     * @param columnFamilyName column family name
     * @param qualifier        column qualifier
     * @param value            value
     */
    public static boolean putRow(String tableName, String rowKey, String columnFamilyName, String qualifier,
                                 String value) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            table.put(put);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Insert several cells of one row
     *
     * @param tableName        table name
     * @param rowKey           row key (unique identifier of the row)
     * @param columnFamilyName column family name
     * @param pairList         list of (qualifier, value) pairs
     */
    public static boolean putRow(String tableName, String rowKey, String columnFamilyName,
                                 List<Pair<String, String>> pairList) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            // org.apache.hadoop.hbase.util.Pair exposes getFirst()/getSecond()
            pairList.forEach(pair -> put.addColumn(Bytes.toBytes(columnFamilyName),
                    Bytes.toBytes(pair.getFirst()), Bytes.toBytes(pair.getSecond())));
            table.put(put);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Get the data of the row with the given rowKey
     *
     * @param tableName table name
     * @param rowKey    row key
     */
    public static Result getRow(String tableName, String rowKey) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            return table.get(get);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Get the latest version of the given cell (row + column)
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param qualifier    column qualifier
     */
    public static String getCell(String tableName, String rowKey, String columnFamily, String qualifier) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
            Result result = table.get(get);
            byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
            // Bytes.toString(null) returns null, so a missing cell yields null
            return Bytes.toString(resultValue);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Scan the whole table
     *
     * @param tableName table name
     * @return a scanner the caller must close, or null on failure
     */
    public static ResultScanner getScanner(String tableName) {
        try {
            // The Table is left open because the returned scanner is still in use
            Table table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            return table.getScanner(scan);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Scan the table with filters
     *
     * @param tableName  table name
     * @param filterList filters
     */
    public static ResultScanner getScanner(String tableName, FilterList filterList) {
        try {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            scan.setFilter(filterList);
            return table.getScanner(scan);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Scan a row-key range of the table with filters
     *
     * @param tableName   table name
     * @param startRowKey start RowKey (inclusive)
     * @param endRowKey   stop RowKey (exclusive)
     * @param filterList  filters
     */
    public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey,
                                           FilterList filterList) {
        try {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            // withStartRow/withStopRow replace the setStartRow/setStopRow methods deprecated in 2.x
            scan.withStartRow(Bytes.toBytes(startRowKey));
            scan.withStopRow(Bytes.toBytes(endRowKey));
            scan.setFilter(filterList);
            return table.getScanner(scan);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Delete the given row
     *
     * @param tableName table name
     * @param rowKey    row key
     */
    public static boolean deleteRow(String tableName, String rowKey) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            table.delete(delete);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Delete the given column of the given row
     *
     * @param tableName  table name
     * @param rowKey     row key
     * @param familyName column family
     * @param qualifier  column qualifier
     */
    public static boolean deleteColumn(String tableName, String rowKey, String familyName, String qualifier) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            // addColumn deletes only the latest version; addColumns would delete all versions
            delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier));
            table.delete(delete);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }
}
```
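In the code above, the `Connection` is created when the class is loaded, and every later method reuses it. This raises a natural question: would a custom connection pool give better performance? In fact, it is unnecessary.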
First, here is what the official documentation says about using `Connection`:

> Connection Pooling
>
> For applications which require high-end multithreaded access (e.g., web-servers or application servers that may serve many application threads in a single JVM), you can pre-create a Connection, as shown in the following example:

```java
// Create a connection to the cluster.
Configuration conf = HBaseConfiguration.create();
try (Connection connection = ConnectionFactory.createConnection(conf);
     Table table = connection.getTable(TableName.valueOf(tablename))) {
  // use table as needed, the table returned is lightweight
}
```
This works because a `Connection` is not a simple socket connection. The API documentation describes `Connection` as follows:

> A cluster connection encapsulating lower level individual connections to actual servers and a connection to zookeeper. Connections are instantiated through the ConnectionFactory class. The lifecycle of the connection is managed by the caller, who has to close() the connection to release the resources.

In other words, one `Connection` wraps the low-level connections to several servers (Master and RegionServers) as well as the connection to ZooKeeper.
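These connections are wrapped together because an HBase client needs to talk to three different server roles:

- ZooKeeper: mainly used to obtain the location of the `meta` table and information about the Master;
- HBase Master: used for administrative operations through the `Admin` interface, such as creating tables;
- HBase RegionServer: used to read and write data.

The mapping between a `Connection` object and the actual socket connections is illustrated in two figures (not reproduced here) taken from the blog post 连接 HBase 的正确姿势 ("The right way to connect to HBase").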
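In the HBase client code, the object that actually corresponds to a socket connection is `RpcConnection`. HBase stores the connections from the client to the HBase servers in a data structure called `PoolMap`. Internally, `PoolMap` holds a `ConcurrentHashMap` whose key is a `ConnectionId` (which encapsulates the server address and the user ticket) and whose value is a pool of `RpcConnection` objects. When HBase needs to talk to a server, it first looks up the pool for the corresponding `ConnectionId` and then takes a connection object from that pool.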
```java
@InterfaceAudience.Private
public class PoolMap<K, V> implements Map<K, V> {
    private PoolType poolType;

    private int poolMaxSize;

    private Map<K, Pool<V>> pools = new ConcurrentHashMap<>();

    public PoolMap(PoolType poolType) {
        this.poolType = poolType;
    }
    .....
```
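To make the structure concrete, here is a heavily simplified, stdlib-only sketch of the same idea: a `ConcurrentHashMap` from a connection key to a small round-robin pool, where connections are created lazily until the pool reaches its maximum size and are then reused in turn. This is not HBase's implementation; the names `SimplePoolMap` and `RoundRobinPool`, and the plain string key standing in for `ConnectionId`, are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.IntFunction;

/** Simplified "pool map": one small round-robin connection pool per server key (illustration only). */
class SimplePoolMap<K, V> {

    /** Fixed-capacity pool: creates members until maxSize is reached, then cycles through them. */
    static final class RoundRobinPool<T> {
        private final int maxSize;
        private final List<T> members = new ArrayList<>();
        private int next = 0;

        RoundRobinPool(int maxSize) {
            this.maxSize = maxSize;
        }

        synchronized T get(IntFunction<T> factory) {
            if (members.size() < maxSize) {
                T created = factory.apply(members.size()); // argument = slot index of the new member
                members.add(created);
                return created;
            }
            T member = members.get(next);
            next = (next + 1) % members.size();
            return member;
        }

        synchronized int size() {
            return members.size();
        }
    }

    private final int poolMaxSize;
    // Key stands in for ConnectionId (server address + user); value is that key's pool
    private final Map<K, RoundRobinPool<V>> pools = new ConcurrentHashMap<>();

    SimplePoolMap(int poolMaxSize) {
        this.poolMaxSize = poolMaxSize;
    }

    /** Finds (or lazily creates) the pool for this key, then takes a connection from it. */
    V getConnection(K key, BiFunction<K, Integer, V> factory) {
        return pools.computeIfAbsent(key, k -> new RoundRobinPool<>(poolMaxSize))
                .get(slot -> factory.apply(key, slot));
    }

    int poolSize(K key) {
        RoundRobinPool<V> pool = pools.get(key);
        return pool == null ? 0 : pool.size();
    }

    public static void main(String[] args) {
        SimplePoolMap<String, String> poolMap = new SimplePoolMap<>(2);
        // prints rs1:16020#0, rs1:16020#1, rs1:16020#0, rs1:16020#1, rs1:16020#0 (one per line)
        for (int i = 0; i < 5; i++) {
            System.out.println(poolMap.getConnection("rs1:16020", (server, slot) -> server + "#" + slot));
        }
    }
}
```

Because the map is keyed per server, callers that hit different RegionServers never contend for the same pool, and the total number of sockets stays bounded by (number of servers) × (pool size).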
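`PoolMap` supports three pool implementations: `Reusable`, `RoundRobin` and `ThreadLocal`. The implementation used for RPC connections is chosen with the `hbase.client.ipc.pool.type` property. Note, however, that the RPC client's `getPoolType` method only accepts `RoundRobin` or `ThreadLocal` for this property and falls back to `RoundRobin` otherwise, so the effective default is `RoundRobin` rather than `Reusable`. The pool size is set with `hbase.client.ipc.pool.size` and defaults to 1, i.e. one connection per server. Both can be changed through configuration:

```java
config.set("hbase.client.ipc.pool.type",...);
config.set("hbase.client.ipc.pool.size",...);
connection = ConnectionFactory.createConnection(config);
```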
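As a rough illustration of how a thread-local policy differs from a round-robin one (which cycles through at most pool-size connections per server), the stdlib-only sketch below gives each calling thread its own lazily created connection, which it then keeps reusing. In this sketch the number of connections therefore tracks the number of distinct calling threads. The class `ThreadLocalConnectionPool` is hypothetical and much simpler than HBase's own pool classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

/** Simplified thread-local pool: each calling thread lazily gets, then keeps, its own connection. */
class ThreadLocalConnectionPool<V> {
    private final AtomicInteger created = new AtomicInteger();
    private final ThreadLocal<V> perThread;

    ThreadLocalConnectionPool(IntFunction<V> factory) {
        // The factory runs once per thread, the first time that thread calls get()
        this.perThread = ThreadLocal.withInitial(() -> factory.apply(created.getAndIncrement()));
    }

    V get() {
        return perThread.get();
    }

    /** Connections created so far, i.e. the number of distinct threads that called get(). */
    int createdCount() {
        return created.get();
    }

    /** Runs `tasks` calls to get() on `threads` worker threads; returns how many connections were created. */
    static int connectionsCreated(int threads, int tasks) {
        ThreadLocalConnectionPool<String> pool = new ThreadLocalConnectionPool<>(i -> "conn#" + i);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            Callable<String> task = pool::get;
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(executor.submit(task));
            }
            for (Future<String> future : futures) {
                future.get(); // wait for every task to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
        return pool.createdCount();
    }

    public static void main(String[] args) {
        // 10 tasks on 3 worker threads: each worker creates exactly one connection
        System.out.println(connectionsCreated(3, 10)); // prints 3
    }
}
```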
This shows that HBase's `Connection` already manages its underlying connections, so there is no need to add another layer of pooling on top of `Connection`.
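In addition, `Connection` is thread-safe, but `Table` and `Admin` are not. The correct approach is therefore to share a single `Connection` object per process and to use separate `Table` and `Admin` objects in different threads. Obtaining them through `getTable()` and `getAdmin()` is lightweight, so the performance cost is not a concern; it is still recommended to call `close()` explicitly once you are done with them.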
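The recommended pattern (one shared, thread-safe connection per process; a short-lived handle per task, closed after use) can be sketched without a cluster. In the stdlib-only sketch below, `SharedConnection` and `LightweightTable` are hypothetical stand-ins for HBase's `Connection` and `Table`; with the real client, each task body would instead be `try (Table table = connection.getTable(...)) { ... }`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: share one thread-safe "connection", give every task its own short-lived "table". */
class SharedConnectionDemo {

    /** Hypothetical stand-in for a thread-safe, expensive connection: create once per process. */
    static final class SharedConnection implements AutoCloseable {
        final AtomicInteger tablesOpened = new AtomicInteger();
        final AtomicInteger tablesClosed = new AtomicInteger();

        /** Cheap, and safe to call from many threads at once. */
        LightweightTable getTable(String name) {
            tablesOpened.incrementAndGet();
            return new LightweightTable(name, this);
        }

        @Override
        public void close() {
            // A real connection would release its sockets and ZooKeeper session here
        }
    }

    /** Hypothetical stand-in for a table handle: cheap, but NOT thread-safe, so never share it. */
    static final class LightweightTable implements AutoCloseable {
        private final String name;
        private final SharedConnection owner;
        // Unsynchronized per-handle state; concurrent use from several threads could corrupt it
        private final List<String> pendingPuts = new ArrayList<>();

        LightweightTable(String name, SharedConnection owner) {
            this.name = name;
            this.owner = owner;
        }

        String put(String rowKey) {
            pendingPuts.add(rowKey);
            return name + "/" + String.join(",", pendingPuts);
        }

        @Override
        public void close() {
            owner.tablesClosed.incrementAndGet();
        }
    }

    /** Runs one task per row key on a thread pool; each task opens and closes its own table. */
    static List<String> runTasks(SharedConnection connection, int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                String rowKey = "rowKey" + i;
                Callable<String> task = () -> {
                    try (LightweightTable table = connection.getTable("class")) {
                        return table.put(rowKey);
                    }
                };
                futures.add(executor.submit(task));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        try (SharedConnection connection = new SharedConnection()) {
            // prints [class/rowKey0, class/rowKey1, class/rowKey2, class/rowKey3]
            System.out.println(runTasks(connection, 4));
        }
    }
}
```

Each result contains only its own row key because no handle is ever shared between tasks, and try-with-resources guarantees every handle is closed even if the task body throws.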
For more articles in this big-data series, see the GitHub open-source project 大数据入门指南 (Big Data Getting Started Guide).