HBase海量數據存儲

1.簡介

 

 

 

HBase是一個基於HDFS的、分佈式的、面向列的非關係型數據庫。算法

 

HBase的特色shell

1.海量數據存儲,HBase表中的數據可以容納上百億行*上百萬列。數據庫

2.面向列的存儲,數據在表中是按照列進行存儲的,可以動態的增長列並對列進行各類操做。apache

3.準實時查詢,HBase在海量的數據量下可以接近準實時的查詢(百毫秒之內)數組

4.多版本,HBase中每一列的數據均可以有多個版本。緩存

5.可靠性,HBase中的數據存儲於HDFS中且依賴於Zookeeper進行Master和RegionServer的協調管理。 服務器

 

HBase與關係型數據庫的區別負載均衡

1.HBase中的數據類型只有String,而關係型數據庫中有char、varchar、int等。分佈式

2.HBase中只有普通的增刪改查操做,沒有表與表之間的鏈接、子查詢等,若想要在HBase中進行復雜的操做則應該使用Phoenix。工具

3.HBase是基於列進行存儲的,所以在查詢指定列的數據時效率會很高,而關係型數據庫是基於行存儲,每次查詢都要查詢整行。

4.HBase適合海量數據存儲,而關係型數據庫通常一張表不超過500M,不然就要考慮分表操做。

5.HBase中爲空的列不佔用存儲空間,表的設計能夠很是稀疏,而關係型數據庫中表的設計較謹密。

6.HBase不支持事務,而非關係型數據庫支持事務。 

7.HBase區分大小寫,而SQL不區分大小寫。

 

2.HBase的表結構

 

 

 

*HBase中的表由RowKey、ColumnFamily、Column、Timestamp組成。

 

RowKey

記錄的惟一標識,至關於關係型數據庫中的主鍵。

*RowKey最大長度爲64KB且按字典順序進行排序存儲。

*HBase會自動爲RowKey加上索引,當按RowKey查詢時速度很快。

 

ColumnFamily

列簇至關於特定的一個類別,每一個列簇下能夠有任意數量個列,而且列是動態進行添加的,只在插入數據後存在,HBase在建立表時只須要指定表名和列簇便可。

*一個列簇下的成員有着相同的前綴,使用冒號來對列簇和列名進行分隔。

*一張表中的列簇最好不超過5個。

 

Column

列只有在插入數據後才存在,且列在列簇中是有序的。

*每一個列簇下的列數沒有限制。

 

Timestamp

HBase中的每一個鍵值對都有一個時間戳,在進行插入時由HBase進行自動賦值。

 

 

3.HBase的物理模型

 

 

 

 

 

 

Master

1.處理對錶的添加、刪除、查詢等操做。

2.進行RegionServer的負載均衡(Region與RegionServer的分配)

3.在RegionServer宕機後負責RegionServer上的Region轉移(經過WAL日誌)

*Master失效僅會致使meta數據和表沒法被修改,表中的數據仍然能夠進行讀取和寫入。

 

RegionServer

1.處理對錶中數據的添加、刪除、修改、查詢等操做。

2.維護Region並將Region中StoreFile寫入到HDFS中。

3.當Region中的數據達到必定大小時進行Region的切分。

 

Region

1.表中的數據存儲在Region中,每一個Region都由RegionServer進行管理。

2.每一個Region都包含MemoryStore和StoreFile,MemoryStore中的數據位於內存,每當MemoryStore中的數據達到128M時將會生成一個StoreFile並寫入到HDFS中。

3.Region中每一個列簇對應一個MemoryStore,能夠有多個StoreFile,當StoreFile的數量超過必定時,會進行StoreFile的合併,將多個StoreFile文件合併成一個StoreFile,當StoreFile文件的大小超過必定閥值時,會進行Region的切分,由Master將新Region分配到相應的RegionServer中,實現負載均衡。

  

Zookeeper在HBase中的做用

1.保證Master的高可用性,當狀態爲Active的Master沒法提供服務時,會馬上將狀態爲StandBy的Master切換爲Active狀態。

2.實時監控RegionServer集羣,當某個RegionServer節點沒法提供服務時將會通知Master,由Master進行RegionServer上的Region轉移以及從新進行負載均衡。

3.當HBase集羣啓動後,Master和RegionServer會分別向Zookeeper進行註冊,會在Zookeeper中存放HBase的meta表數據,Region與RegionServer的關係、以及RegionServer的訪問地址等信息。

*meta表中維護着TableName、RowKey和Region的關聯關係。

 

HBase處理讀取和寫入請求的流程

HBase處理讀取請求的過程

1.客戶端鏈接Zookeeper,根據TableName和RowKey從Meta表中計算出該Row對應的Region。

2.獲取該Region所關聯的RegionServer,並獲取RegionServer的訪問地址。

3.訪問RegionServer,找到對應的Region。

4.若是Region的MemoryStore中有該Row則直接進行獲取,不然從StoreFile中進行查詢。

 

HBase處理寫入請求的過程

1.客戶端鏈接Zookeeper,根據TableName找到其Region列表。

2.經過必定算法計算出要寫入的Region。

3.獲取該Region所關聯的RegionServer並進行鏈接。

4.把數據分別寫到HLog和MemoryStore中。

5.每當MemoryStore中的大小達到128M時,會生成一個StoreFile。

6.當StoreFile的數量超過必定時,會進行StoreFile的合併,將多個StoreFile文件合併成一個StoreFile,當StoreFile的文件大小超過必定閾值時,會進行Region的切分,由Master將新Region分配到相應的RegionServer中,實現負載均衡。

 

*在第一次讀取或寫入時才須要鏈接Zookeeper,會將Zookeeper中的相關數據緩存到本地,日後直接從本地進行讀取,當Zookeeper中的信息發生變化時,再經過通知機制通知客戶端進行更新。

 

 

HBase在HDFS中的目錄

1.tmp目錄:當對HBase的表進行建立和刪除時,會將表移動到該目錄中進行操做。

2.MasterProcWALs目錄:預寫日誌目錄,主要用於存儲Master的操做日誌。

3.WALs目錄:預寫日誌目錄,主要用於存儲RegionServer的操做日誌。

4.data目錄:存儲Region中的StoreFile。

5.hbase.id文件:HBase集羣的惟一標識。

6.hbase.version文件:HBase集羣的版本號。

7.oldWALs目錄:當WALs目錄下的日誌文件超過必定時間後,會將其移動到oldWALs目錄中,Master會按期進行清理。

 

 

4.HBase集羣的搭建

 

1.安裝JDK和Hadoop

因爲HBase是經過JAVA語言編寫的,且HBase是基於HDFS的,所以須要安裝JDK和Hadoop,並配置好JAVA_HOME環境變量。

 

因爲HDFS通常都以集羣的方式運行,所以須要搭建HDFS集羣。

*在搭建HDFS集羣時,須要相互配置SSH使之互相信任而且開放防火牆相應的端口,或者直接關閉防火牆。

 

2.安裝Zookeeper並進行集羣的搭建

因爲HDFS HA依賴於Zookeeper,且HBase也依賴於Zookeeper,所以須要安裝Zookeeper並進行集羣的搭建。

 

3.安裝HBase

1.從CDH中下載HBase並進行解壓:http://archive.cloudera.com/cdh5/cdh/5/ 

 

2.修改hbase-env.sh配置文件

#設置JDK的安裝目錄
export JAVA_HOME=/usr/jdk8/jdk1.8.0_161

#true則使用hbase自帶的zk服務,false則使用外部的zk服務.
export HBASE_MANAGES_ZK=flase

 

3.修改hbase-site.xml配置文件

  <!-- 指定HBase日誌的存放目錄 -->  
  <property> 
    <name>hbase.tmp.dir</name>  
    <value>/usr/hbase/hbase-1.2.8/logs</value> 
  </property>  
  <!-- 指定HBase中的數據存儲在HDFS中的目錄 -->  
  <property> 
    <name>hbase.rootdir</name>  
    <value>hdfs://nameservice:8020/hbase</value> 
  </property>  
  <!-- 設置是不是分佈式 -->  
  <property> 
    <name>hbase.cluster.distributed</name>  
    <value>true</value> 
  </property>  
  <!-- 指定HBase使用的ZK地址 -->  
  <property> 
    <name>hbase.zookeeper.quorum</name>  
    <value>192.168.1.80:2181,192.168.1.81:2181,192.168.1.82:2181</value> 
  </property> 

 

4.修改regionservers文件,配置充當RegionServer的節點

*值能夠是主機名或者IP地址

*若是Hadoop配置了HDFS HA高可用集羣,那麼就會有兩個NameNode和一個NameService,此時就須要將HDFS的core-site.xml和hdfs-site.xml配置文件複製到HBase的conf目錄下,且hbase-site.xml配置文件中的hbase.rootdir配置項的HDFS地址指向NameService的名稱。

 

5.NTP時間同步

NTP是一個時間服務器,做用是使集羣中的各個節點的時間都保持一致。

因爲在HBase集羣中,Zookeeper與HBase對時間的要求較高,若是兩個節點之間的時間相差過大時,那麼整個集羣就會崩潰,所以須要使各個節點的時間都保持一致。

#查看是否安裝了NTP服務
rpm -qa|grep ntp

#安裝NTP服務
yum install ntp -y

#從NTP服務器中獲取時間並同步本地
ntpdate 192.168.1.80

*在實際的應用場景中,能夠本身搭建NTP服務器,也可使用第三方開源的NTP服務器,如阿里等。

使用 「ntpdate NTP服務器地址」 命令從NTP服務器中獲取時間並同步本地,通常配合Linux的crontab使用,每隔5分鐘進行一次時間的同步。

 

4.啓動集羣

使用bin目錄下的start-hbase.sh命令啓動集羣,那麼會在當前節點中啓動一個Master和RegionSever進程,並經過SSH訪問其它節點,啓動RegionServer進程。

 

因爲HBase的Master HA集羣是經過Zookeeper進行協調的,須要手動在其餘節點中啓動Master,Zookeeper能保證當前HBase集羣中有且只有一個Master處於Active狀態,當狀態爲Active的Master沒法正常提供服務時,會將處於StandBy的Master的狀態修改成Active。 

 

*當HBase集羣啓動後,能夠訪問http:/localhost:16030,進入HBase的Web監控頁面。

 

 

5.使用Shell操做HBase

 

使用bin/hbase shell命令進行HBase的Shell操做

#建立表 create 'tableName' , 'columnFamily' , 'columnFamily...' #添加記錄 put 'tableName' , 'rowkey' , 'columnFamily:column' , 'value' #查詢記錄 get 'tableName' , 'rowkey' #統計表的記錄數 count 'tableName' #刪除記錄 deleteall 'tableName' , 'rowkey' #刪除記錄的某一列 delete 'tableName' , 'rowkey' ,'columnFamily:column' #禁用表 disable 'tableName' #啓動表 enable 'tableName' #查看錶是否被禁用 is_disabled 'tableName' #刪除表 drop 'tableName' #查看錶中的全部記錄 scan 'tableName' #查看錶中指定列的全部記錄 scan 'tableName' , {COLUMNS=>'columnFamily:column'} #檢查表是否存在 exists 'tableName' #查看當前HBase中的表 list

 

*在刪除表時須要禁用表,不然沒法刪除。

*使用put相同rowkey的一條數據來進行記錄的更新,僅會更新列相同的值。

 

 

6.使用JAVA操做HBase

 

1.導入相關依賴

<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>1.2.8</version>
</dependency>

 

2.初始化配置

使用HBaseConfiguration的create()靜態方法建立一個Configuration實例,用於封裝環境配置信息。

Configuration config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum","192.168.1.80,192.168.1.81,192.168.1.82"); config.set("hbase.zookeeper.property.clientPort","2181");

*此方法會默認加載classpath下的hbase-site.xml配置文件,若是沒有此配置文件則須要手動進行環境的配置。

 

3.建立HBase鏈接對象

Connection conn = ConnectionFactory.createConnection(config);

 

4.進行表的管理

*使用Admin類進行HBase表的管理,經過Connection實例的getAdmin()靜態方法返回一個Admin實例。

//判斷表是否存在
boolean tableExists(TableName); //遍歷HBase中的表定義
HTableDescriptor [] listTables(); //遍歷HBase中的表名稱
TableName [] listTableNames(); //根據表名獲取表定義
HTableDescriptor getTableDescriptor(TableName); //建立表
void createTable(HTableDescriptor); //刪除表
void deleteTable(TableName); //啓用表
void enableTable(TableName); //禁用表
void disableTable(TableName); //判斷表是不是啓用狀態
boolean isTableEnabled(TableName); //判斷表是不是禁用狀態
boolean isTableDisabled(TableName); //爲表添加列簇
void addColumn(TableName,HColumnDescriptor); //刪除表中的列簇
void deleteColumn(TableName,byte); //修改表中的列簇
void modifyColumn(TableName,HColumnDescriptor);

 

TableName實例用於封裝表名稱。

HTableDescriptor實例用於封裝表定義,包括表的名稱、表的列簇等。

HColumnDescriptor實例用於封裝表的列簇。

 

5.對錶中的數據進行增刪改查

使用Table類進行表數據的增刪改查,經過Connection的getTable(TableName)靜態方法返回一個Table實例。

//判斷指定RowKey的數據是否存在
boolean exists(Get get); //根據RowKey獲取數據
Result get(Get get); //根據多個RowKey獲取數據
Result [] get(List<Get>); //獲取表的掃描器
ResultScanner getScanner(Scan); //添加數據
void put(Put); //批量添加數據
void put(List<Put>); //刪除數據
void delete(Delete); //批量刪除數據
void delete(List<Delete>)

 

使用Get實例封裝查詢參數,使用其構建方法設置RowKey。

使用Put實例封裝新增和更新參數,使用其構建方法設置RowKey,使用其addColumn(byte[] family , byte[] qualifier , byte[] value)方法分別指定列簇、列名、列值。

使用Delete實例封裝刪除參數,使用其構建方法設置RowKey。

使用Scan實例封裝掃描器的查詢條件,使用其addFamily(byte[] family)方法設置掃描的列簇,使用其addColumn(byte[] family , byte[] qualifier)方法分別指定要掃描的列簇和列名。

 

*在進行表的增刪改查時,方法參數大多都是字節數組類型,可使用HBase Java提供的Bytes工具類進行字符串和字節數組之間的轉換。

*在進行查詢操做時,會返回Result實例,Result實例包含了一個RowKey的全部鍵值對(cell,不區分列簇),能夠經過Result實例的listCells()方法獲取其包含的全部cell,藉助CellUtil工具類獲取Cell實例中對應的RowKey、Family、Qualifier、Value等屬性信息。

*在使用getScanner掃描時,返回的ResultScanner接口繼承Iterable接口,其泛型是Result,所以能夠理解成ResultScanner是Result的一個集合。

 

6.完整的HBaseUtil

/**
 * @Auther: ZHUANGHAOTANG
 * @Date: 2018/11/26 11:40
 * @Description:
 */
public class HBaseUtils {

    private static final Logger logger = LoggerFactory.getLogger(HBaseUtils.class);

    /**
     * ZK集羣地址
     */
    private static final String ZK_CLUSTER_HOSTS = "192.168.1.80,192.168.1.81,192.168.1.82";

    /**
     * ZK端口
     */
    private static final String ZK_CLUSTER_PORT = "2181";

    /**
     * HBase全局鏈接
     */
    private static Connection connection;

    static {
        //默認加載classpath下hbase-site.xml文件
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", ZK_CLUSTER_HOSTS);
        configuration.set("hbase.zookeeper.property.clientPort", ZK_CLUSTER_PORT);
        try {
            connection = ConnectionFactory.createConnection(configuration);
        } catch (Exception e) {
            logger.info("初始化HBase鏈接失敗:", e);
        }
    }

    /**
     * 返回鏈接
     */
    public static Connection getConnection() {
        return connection;
    }

    /**
     * 建立表
     */
    public static void createTable(String tableName, String... families) throws Exception {
        Admin admin = connection.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            throw new UnsupportedOperationException("tableName " + tableName + " is already exists");
        }
        HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
        for (String family : families)
            descriptor.addFamily(new HColumnDescriptor(family));
        admin.createTable(descriptor);
    }

    /**
     * 刪除表
     */
    public static void deleteTable(String tableName) throws Exception {
        Admin admin = connection.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
        }
    }

    /**
     * 獲取全部表名稱
     */
    public static TableName[] getTableNameList() throws Exception {
        Admin admin = connection.getAdmin();
        return admin.listTableNames();
    }

    /**
     * 獲取全部表定義
     */
    public static HTableDescriptor[] getTableDescriptorList() throws Exception {
        Admin admin = connection.getAdmin();
        return admin.listTables();
    }

    /**
     * 爲表添加列簇
     */
    public static void addFamily(String tableName, String family) throws Exception {
        Admin admin = connection.getAdmin();
        if (!admin.tableExists(TableName.valueOf(tableName))) {
            throw new UnsupportedOperationException("tableName " + tableName + " is not exists");
        }
        admin.addColumn(TableName.valueOf(tableName), new HColumnDescriptor(family));
    }

    /**
     * 刪除表中指定的列簇
     */
    public static void deleteFamily(String tableName, String family) throws Exception {
        Admin admin = connection.getAdmin();
        admin.deleteColumn(TableName.valueOf(tableName), Bytes.toBytes(family));
    }

    /**
     * 爲表添加一條數據
     */
    public static void put(String tableName, String rowKey, String family, Map<String, String> values) throws Exception {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        for (Map.Entry<String, String> entry : values.entrySet())
            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
        table.put(put);
    }

    /**
     * 批量爲表添加數據
     */
    public static void batchPut(String tableName, String family, Map<String, Map<String, String>> values) throws Exception {
        Table table = connection.getTable(TableName.valueOf(tableName));
        List<Put> puts = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> entry : values.entrySet()) {
            Put put = new Put(Bytes.toBytes(entry.getKey()));
            for (Map.Entry<String, String> subEntry : entry.getValue().entrySet())
                put.addColumn(Bytes.toBytes(family), Bytes.toBytes(subEntry.getKey()), Bytes.toBytes(subEntry.getValue()));
            puts.add(put);
        }
        table.put(puts);
    }

    /**
     * 刪除RowKey中的某列
     */
    public static void deleteColumn(String tableName, String rowKey, String family, String qualifier) throws Exception {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        table.delete(delete);
    }

    /**
     * 刪除RowKey
     */
    public static void delete(String tableName, String rowKey) throws Exception {
        Table table = connection.getTable(TableName.valueOf(tableName));
        table.delete(new Delete(Bytes.toBytes(rowKey)));
    }

    /**
     * 批量刪除RowKey
     */
    public static void batchDelete(String tableName, String... rowKeys) throws Exception {
        Table table = connection.getTable(TableName.valueOf(tableName));
        List<Delete> deletes = new ArrayList<>();
        for (String rowKey : rowKeys)
            deletes.add(new Delete(Bytes.toBytes(rowKey)));
        table.delete(deletes);
    }

    /**
     * 根據RowKey獲取數據
     */
    public static Map<String, String> get(String tableName, String rowKey) throws Exception {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Result result = table.get(new Get(Bytes.toBytes(rowKey)));
        List<Cell> cells = result.listCells();
        Map<String, String> cellsMap = new HashMap<>();
        for (Cell cell : cells) {
            cellsMap.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
        }
        return cellsMap;
    }

    /**
     * 獲取全表數據
     */
    public static Map<String, Map<String, String>> scan(String tableName) throws Exception {
        Table table = connection.getTable(TableName.valueOf(tableName));
        ResultScanner resultScanner = table.getScanner(new Scan());
        return getResult(resultScanner);
    }

    /**
     * 獲取某列數據
     */
    public static Map<String, Map<String, String>> scan(String tableName, String family, String qualifier) throws Exception {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        ResultScanner resultScanner = table.getScanner(scan);
        return getResult(resultScanner);
    }

    private static Map<String, Map<String, String>> getResult(ResultScanner resultScanner) {
        Map<String, Map<String, String>> resultMap = new HashMap<>();
        for (Result result : resultScanner) {
            List<Cell> cells = result.listCells();
            Map<String, String> cellsMap = new HashMap<>();
            for (Cell cell : cells)
                cellsMap.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
            resultMap.put(Bytes.toString(result.getRow()), cellsMap);
        }
        return resultMap;
    }

}
相關文章
相關標籤/搜索