1. 使用的 Maven 客戶端包：
<!-- Maven dependency for the HBase client: org.apache.hbase:hbase-shaded-client, version 1.4.9 -->
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-shaded-client</artifactId> <version>1.4.9</version> </dependency>
2. HbaseTemplate 模板類，基於 canal 修改：
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.regex.Pattern; /** * HBase操做模板 */ @Component public class HbaseTemplate implements InitializingBean { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${hbase.zookeeper.quorum}") private String quorum; @Value("${hbase.zookeeper.clientPort}") private String clientPort; @Value("${hbase.zookeeper.znodeParent}") private String znodeParent; @Value("${hadoop.home.dir}") private String hadoopHomeDir; @Value("${hadoop.home.isset}") private Boolean hadoopHomeIsset; private Connection conn; public boolean tableExists(String tableName) { try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) { return admin.tableExists(TableName.valueOf(tableName)); } catch (IOException e) { throw new RuntimeException(e); } } public boolean namespaceExists(String namespace){ boolean flag= true; try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) { NamespaceDescriptor namespaceDescriptor = admin.getNamespaceDescriptor(namespace); String name = namespaceDescriptor.getName(); if(name ==null){ flag = false; } }catch (NamespaceNotFoundException e){ flag = false; }catch (IOException e) { throw new RuntimeException(e); } return flag; } public void deleteNamespace(String namespace){ if(namespaceExists(namespace)){ try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) { Pattern compile = Pattern.compile(namespace + ":.*"); admin.disableTables(compile); admin.deleteTables(compile); admin.deleteNamespace(namespace); } catch (IOException e) { throw new RuntimeException(e); } } } 
public void createNamespace(String namespace){ try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) { admin.createNamespace(NamespaceDescriptor.create(namespace).build()); } catch (IOException e) { throw new RuntimeException(e); } } public void createTable(String tableName, String... familyNames) { try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) { HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); // 添加列簇 if (familyNames != null) { for (String familyName : familyNames) { HColumnDescriptor hcd = new HColumnDescriptor(familyName); desc.addFamily(hcd); } } admin.createTable(desc); } catch (IOException e) { throw new RuntimeException(e); } } public void disableTable(String tableName) { try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) { admin.disableTable(tableName); } catch (IOException e) { logger.error(e.getMessage(), e); throw new RuntimeException(e); } } public void deleteTable(String tableName) { try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) { if (admin.isTableEnabled(tableName)) { disableTable(tableName); } admin.deleteTable(tableName); } catch (IOException e) { logger.error(e.getMessage(), e); throw new RuntimeException(e); } } /** * 插入一行數據 * * @param tableName 表名 * @param hRow 行數據對象 * @return 是否成功 */ public Boolean put(String tableName, HRow hRow) { boolean flag = false; try { HTable table = (HTable) conn.getTable(TableName.valueOf(tableName)); Put put = new Put(hRow.getRowKey()); for (HRow.HCell hCell : hRow.getCells()) { put.addColumn(Bytes.toBytes(hCell.getFamily()), Bytes.toBytes(hCell.getQualifier()), hCell.getValue()); } table.put(put); flag = true; } catch (Exception e) { logger.error(e.getMessage(), e); } return flag; } /** * 批量插入 * * @param tableName 表名 * @param rows 行數據對象集合 * @return 是否成功 */ public Boolean puts(String tableName, List<HRow> rows) { boolean flag = false; try { HTable table = (HTable) conn.getTable(TableName.valueOf(tableName)); List<Put> puts = new ArrayList<>(); for (HRow hRow : rows) { Put 
put = new Put(hRow.getRowKey()); for (HRow.HCell hCell : hRow.getCells()) { put.addColumn(Bytes.toBytes(hCell.getFamily()), Bytes.toBytes(hCell.getQualifier()), hCell.getValue()); } puts.add(put); } if (!puts.isEmpty()) { table.put(puts); } flag = true; } catch (Exception e) { logger.error(e.getMessage(), e); } return flag; } /** * 批量刪除數據 * * @param tableName 表名 * @param rowKeys rowKey集合 * @return 是否成功 */ public Boolean deletes(String tableName, Set<byte[]> rowKeys) { boolean flag = false; try { HTable table = (HTable) conn.getTable(TableName.valueOf(tableName)); List<Delete> deletes = new ArrayList<>(); for (byte[] rowKey : rowKeys) { Delete delete = new Delete(rowKey); deletes.add(delete); } if (!deletes.isEmpty()) { table.delete(deletes); } flag = true; } catch (Exception e) { logger.error(e.getMessage(), e); } return flag; } @Override public void afterPropertiesSet() throws Exception { if(hadoopHomeIsset){ System.setProperty("hadoop.home.dir",hadoopHomeDir); } Configuration hbaseConfig = HBaseConfiguration.create(); hbaseConfig.set("hbase.zookeeper.quorum",quorum); hbaseConfig.set("hbase.zookeeper.property.clientPort",clientPort); hbaseConfig.set("zookeeper.znode.parent",znodeParent); conn = ConnectionFactory.createConnection(hbaseConfig); } }
import java.util.ArrayList;
import java.util.List;

/**
 * HBase operation object: one row, identified by its row key, together with the cells to write.
 *
 * <p>Mutable holder. The cell list and the byte arrays are stored and returned as-is, without
 * copying.
 */
public class HRow {

    private byte[] rowKey;

    private List<HCell> cells = new ArrayList<>();

    /** Creates a row with no row key and no cells. */
    public HRow() {
        this(null);
    }

    /**
     * Creates a row with the given key and no cells.
     *
     * @param rowKey row key bytes
     */
    public HRow(byte[] rowKey) {
        this.rowKey = rowKey;
    }

    /**
     * Appends a cell to this row.
     *
     * @param family    column family name
     * @param qualifier column qualifier
     * @param value     cell value bytes
     */
    public void addCell(String family, String qualifier, byte[] value) {
        cells.add(new HCell(family, qualifier, value));
    }

    public byte[] getRowKey() {
        return rowKey;
    }

    public void setRowKey(byte[] rowKey) {
        this.rowKey = rowKey;
    }

    public List<HCell> getCells() {
        return cells;
    }

    public void setCells(List<HCell> cells) {
        this.cells = cells;
    }

    /** A single cell: column family, column qualifier and value. */
    public static class HCell {

        private String family;

        private String qualifier;

        private byte[] value;

        /** Creates a cell with every field unset. */
        public HCell() {
            this(null, null, null);
        }

        /**
         * Creates a fully populated cell.
         *
         * @param family    column family name
         * @param qualifier column qualifier
         * @param value     cell value bytes
         */
        public HCell(String family, String qualifier, byte[] value) {
            this.family = family;
            this.qualifier = qualifier;
            this.value = value;
        }

        public String getFamily() {
            return family;
        }

        public void setFamily(String family) {
            this.family = family;
        }

        public String getQualifier() {
            return qualifier;
        }

        public void setQualifier(String qualifier) {
            this.qualifier = qualifier;
        }

        public byte[] getValue() {
            return value;
        }

        public void setValue(byte[] value) {
            this.value = value;
        }
    }
}
import org.apache.hadoop.hbase.util.Bytes; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * 2018/12/19 18:20 * jiacheng */ @Component public class TestHbaseTemplate implements InitializingBean { @Resource private HbaseTemplate hbaseTemplate; @Override public void afterPropertiesSet() throws Exception { System.out.println("++++++++++++++++++測試建立Hbase 表================="); String namespace="canal_test"; if(hbaseTemplate.namespaceExists(namespace)){ System.out.println("+++++++++ namespace exist , namespace="+namespace); hbaseTemplate.deleteNamespace(namespace); System.out.println("+++++++++ namespace delete successful , namespace="+namespace); } hbaseTemplate.createNamespace(namespace); System.out.println("+++++++++ namespace create successful , namespace="+namespace); String tableName = namespace+":test_jcc"; boolean tableExists = hbaseTemplate.tableExists(tableName); if(tableExists){ System.out.println("+++++++table is exist,name="+tableName); hbaseTemplate.deleteTable(tableName); System.out.println("+++++++table delete successful, name="+tableName); } String cf1 = "CF1"; String cf2 = "CF2"; hbaseTemplate.createTable(tableName, cf1, cf2); System.out.println("+++++++table create successful, name="+tableName); HRow row = new HRow(Bytes.toBytes("rowkey_"+System.currentTimeMillis())); row.addCell(cf1,"name",Bytes.toBytes("chenjia")); row.addCell(cf1,"age",Bytes.toBytes("25")); row.addCell(cf2,"name2",Bytes.toBytes("chenjia")); row.addCell(cf2,"age2",Bytes.toBytes("25")); hbaseTemplate.put(tableName,row); } }