Hbase-client客戶端操做模板

1.使用的maven客戶端包java

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-shaded-client</artifactId>
        <version>1.4.9</version>
</dependency>

2.HbaseTemplate 模板類,基於canal修改node

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

/**
 * HBase操做模板
 */
@Component
public class HbaseTemplate implements InitializingBean {

    private Logger     logger = LoggerFactory.getLogger(this.getClass());

    @Value("${hbase.zookeeper.quorum}")
    private String quorum;
    @Value("${hbase.zookeeper.clientPort}")
    private String clientPort;
    @Value("${hbase.zookeeper.znodeParent}")
    private String znodeParent;

    @Value("${hadoop.home.dir}")
    private String hadoopHomeDir;
    @Value("${hadoop.home.isset}")
    private Boolean hadoopHomeIsset;

    private Connection conn;

    public boolean tableExists(String tableName) {
        try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) {

            return admin.tableExists(TableName.valueOf(tableName));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public boolean namespaceExists(String namespace){
        boolean flag= true;
        try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) {
            NamespaceDescriptor namespaceDescriptor = admin.getNamespaceDescriptor(namespace);
            String name = namespaceDescriptor.getName();
            if(name ==null){
              flag = false;
            }
        }catch (NamespaceNotFoundException e){
             flag = false;
        }catch (IOException e) {
            throw new RuntimeException(e);
        }
        return flag;
    }

    public void deleteNamespace(String namespace){
        if(namespaceExists(namespace)){
            try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) {
                Pattern compile = Pattern.compile(namespace + ":.*");
                admin.disableTables(compile);
                admin.deleteTables(compile);
                admin.deleteNamespace(namespace);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
    public void createNamespace(String namespace){
        try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) {
                admin.createNamespace(NamespaceDescriptor.create(namespace).build());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void createTable(String tableName, String... familyNames) {
        try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
            // 添加列簇
            if (familyNames != null) {
                for (String familyName : familyNames) {
                    HColumnDescriptor hcd = new HColumnDescriptor(familyName);
                    desc.addFamily(hcd);
                }
            }
            admin.createTable(desc);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    public void disableTable(String tableName) {
        try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) {
            admin.disableTable(tableName);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    public void deleteTable(String tableName) {
        try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) {
            if (admin.isTableEnabled(tableName)) {
                disableTable(tableName);
            }
            admin.deleteTable(tableName);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * 插入一行數據
     * 
     * @param tableName 表名
     * @param hRow 行數據對象
     * @return 是否成功
     */
    public Boolean put(String tableName, HRow hRow) {
        boolean flag = false;
        try {
            HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
            Put put = new Put(hRow.getRowKey());
            for (HRow.HCell hCell : hRow.getCells()) {
                put.addColumn(Bytes.toBytes(hCell.getFamily()), Bytes.toBytes(hCell.getQualifier()), hCell.getValue());
            }
            table.put(put);
            flag = true;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return flag;

    }

    /**
     * 批量插入
     * 
     * @param tableName 表名
     * @param rows 行數據對象集合
     * @return 是否成功
     */
    public Boolean puts(String tableName, List<HRow> rows) {
        boolean flag = false;
        try {
            HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
            List<Put> puts = new ArrayList<>();
            for (HRow hRow : rows) {
                Put put = new Put(hRow.getRowKey());
                for (HRow.HCell hCell : hRow.getCells()) {
                    put.addColumn(Bytes.toBytes(hCell.getFamily()),
                        Bytes.toBytes(hCell.getQualifier()),
                        hCell.getValue());
                }
                puts.add(put);
            }
            if (!puts.isEmpty()) {
                table.put(puts);
            }
            flag = true;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return flag;
    }

    /**
     * 批量刪除數據
     * 
     * @param tableName 表名
     * @param rowKeys rowKey集合
     * @return 是否成功
     */
    public Boolean deletes(String tableName, Set<byte[]> rowKeys) {
        boolean flag = false;
        try {
            HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
            List<Delete> deletes = new ArrayList<>();
            for (byte[] rowKey : rowKeys) {
                Delete delete = new Delete(rowKey);
                deletes.add(delete);
            }
            if (!deletes.isEmpty()) {
                table.delete(deletes);
            }
            flag = true;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return flag;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        if(hadoopHomeIsset){
            System.setProperty("hadoop.home.dir",hadoopHomeDir);
        }
        Configuration hbaseConfig = HBaseConfiguration.create();
        hbaseConfig.set("hbase.zookeeper.quorum",quorum);
        hbaseConfig.set("hbase.zookeeper.property.clientPort",clientPort);
        hbaseConfig.set("zookeeper.znode.parent",znodeParent);
        conn = ConnectionFactory.createConnection(hbaseConfig);
    }
}
import java.util.ArrayList;
import java.util.List;

/**
 * HBase操做對象類
 */
public class HRow {

    private byte[]      rowKey;
    private List<HCell> cells = new ArrayList<>();

    public HRow(){
    }

    public HRow(byte[] rowKey){
        this.rowKey = rowKey;
    }

    public byte[] getRowKey() {
        return rowKey;
    }

    public void setRowKey(byte[] rowKey) {
        this.rowKey = rowKey;
    }

    public List<HCell> getCells() {
        return cells;
    }

    public void setCells(List<HCell> cells) {
        this.cells = cells;
    }

    public void addCell(String family, String qualifier, byte[] value) {
        HCell hCell = new HCell(family, qualifier, value);
        cells.add(hCell);
    }

    public static class HCell {

        private String family;
        private String qualifier;
        private byte[] value;

        public HCell(){
        }

        public HCell(String family, String qualifier, byte[] value){
            this.family = family;
            this.qualifier = qualifier;
            this.value = value;
        }

        public String getFamily() {
            return family;
        }

        public void setFamily(String family) {
            this.family = family;
        }

        public String getQualifier() {
            return qualifier;
        }

        public void setQualifier(String qualifier) {
            this.qualifier = qualifier;
        }

        public byte[] getValue() {
            return value;
        }

        public void setValue(byte[] value) {
            this.value = value;
        }
    }
}
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 2018/12/19 18:20
 * jiacheng
 */
@Component
public class TestHbaseTemplate implements InitializingBean {

    @Resource
    private HbaseTemplate hbaseTemplate;

    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("++++++++++++++++++測試建立Hbase 表=================");
        String namespace="canal_test";
        if(hbaseTemplate.namespaceExists(namespace)){
            System.out.println("+++++++++ namespace exist , namespace="+namespace);
            hbaseTemplate.deleteNamespace(namespace);
            System.out.println("+++++++++ namespace delete successful , namespace="+namespace);
        }
        hbaseTemplate.createNamespace(namespace);
        System.out.println("+++++++++ namespace create successful , namespace="+namespace);
        String tableName = namespace+":test_jcc";
        boolean tableExists = hbaseTemplate.tableExists(tableName);
        if(tableExists){
            System.out.println("+++++++table is exist,name="+tableName);
            hbaseTemplate.deleteTable(tableName);
            System.out.println("+++++++table  delete successful, name="+tableName);
        }
        String cf1 = "CF1";
        String cf2 = "CF2";
        hbaseTemplate.createTable(tableName, cf1, cf2);
        System.out.println("+++++++table create successful, name="+tableName);
        HRow row = new HRow(Bytes.toBytes("rowkey_"+System.currentTimeMillis()));
        row.addCell(cf1,"name",Bytes.toBytes("chenjia"));
        row.addCell(cf1,"age",Bytes.toBytes("25"));
        row.addCell(cf2,"name2",Bytes.toBytes("chenjia"));
        row.addCell(cf2,"age2",Bytes.toBytes("25"));
        hbaseTemplate.put(tableName,row);
    }
}
相關文章
相關標籤/搜索