大數據學習——Hbase

1. Hbase基礎

1.1 hbase數據庫介紹

1、簡介java

hbase是bigtable的開源java版本。是創建在hdfs之上,提供高可靠性、高性能、列存儲、可伸縮、實時讀寫nosql的數據庫系統。node

它介於nosql和RDBMS之間,僅能經過主鍵(row key)和主鍵的range來檢索數據,僅支持單行事務(可經過hive支持來實現多表join等複雜操做)。mysql

 

主要用來存儲結構化和半結構化的鬆散數據。正則表達式

Hbase查詢數據功能很簡單,不支持join等複雜操做,不支持複雜的事務(行級的事務)算法

Hbase中支持的數據類型:byte[]sql

 

與hadoop同樣,Hbase目標主要依靠橫向擴展,經過不斷增長廉價的商用服務器,來增長計算和存儲能力。shell

 

HBase中的表通常有這樣的特色:數據庫

²  大:一個表能夠有上十億行,上百萬列apache

²  面向列:面向列(族)的存儲和權限控制,列(族)獨立檢索。編程

²  稀疏:對於爲空(null)的列,並不佔用存儲空間,所以,表能夠設計的很是稀疏。

 

2、表結構邏輯視圖

HBase以表的形式存儲數據。表有行和列組成。列劃分爲若干個列族(column family)

 

3、Row Key

與nosql數據庫們同樣,row key是用來檢索記錄的主鍵。訪問hbase table中的行,只有三種方式:

1 經過單個row key訪問

2 經過row key的range

3 全表掃描

Row key行鍵 (Row key)能夠是任意字符串(最大長度是 64KB,實際應用中長度通常爲 10-100bytes),在hbase內部,row key保存爲字節數組。

Hbase會對錶中的數據按照rowkey排序(字典順序)

 

存儲時,數據按照Row key的字典序(byte order)排序存儲。設計key時,要充分排序存儲這個特性,將常常一塊兒讀取的行存儲放到一塊兒。(位置相關性)

注意:

字典序對int排序的結果是

1,10,100,11,12,13,14,15,16,17,18,19,2,20,21,…,9,91,92,93,94,95,96,97,98,99。要保持整形的天然序,行鍵必須用0做左填充。

行的一次讀寫是原子操做 (不論一次讀寫多少列)。這個設計決策可以使用戶很容易的理解程序在對同一個行進行併發更新操做時的行爲。

 

4、列族

hbase表中的每一個列,都歸屬與某個列族。列族是表的schema的一部分(而列不是),必須在使用表以前定義。

列名都以列族做爲前綴。例如courses:history , courses:math 都屬於 courses 這個列族。

訪問控制、磁盤和內存的使用統計都是在列族層面進行的。

列族越多,在取一行數據時所要參與IO、搜尋的文件就越多,因此,若是沒有必要,不要設置太多的列族

 

5、時間戳

HBase中經過row和columns肯定的爲一個存貯單元稱爲cell。每一個 cell都保存着同一份數據的多個版本。版本經過時間戳來索引。時間戳的類型是 64位整型。時間戳能夠由hbase(在數據寫入時自動 )賦值,此時時間戳是精確到毫秒的當前系統時間。時間戳也能夠由客戶顯式賦值。若是應用程序要避免數據版本衝突,就必須本身生成具備惟一性的時間戳。每一個 cell中,不一樣版本的數據按照時間倒序排序,即最新的數據排在最前面。

 

爲了不數據存在過多版本形成的的管理 (包括存貯和索引)負擔,hbase提供了兩種數據版本回收方式:

²  保存數據的最後n個版本

²  保存最近一段時間內的版本(設置數據的生命週期TTL)。

用戶能夠針對每一個列族進行設置。

 

6、Cell

由{row key, column( =<family> + <label>), version} 惟一肯定的單元。

cell中的數據是沒有類型的,所有是字節碼形式存貯。

 

1.2 hbase集羣結構

REGION:是HBASE中對錶進行切割的單元

 

HMASTER:HBASE的主節點,負責整個集羣的狀態感知,負載分配、負責用戶表的元數據管理

(能夠配置多個用來實現HA)

REGION-SERVER:HBASE中真正負責管理region的服務器,也就是負責爲客戶端進行表數據讀寫的服務器

 

ZOOKEEPER:整個HBASE中的主從節點協調,主節點之間的選舉,集羣節點之間的上下線感知……都是經過zookeeper來實現

 

 

 

1.3 hbase集羣搭建

----先部署一個zookeeper集羣

(1)上傳hbase安裝包

(2)解壓

(3)配置hbase集羣,要修改3個文件

    注意:要把hadoop的hdfs-site.xml和core-site.xml 放到hbase/conf下

      

    (3.1)修改hbase-env.sh

       export JAVA_HOME=/usr/java/jdk1.7.0_55

    //告訴hbase使用外部的zk

       export HBASE_MANAGES_ZK=false

      

       (3.2)修改 hbase-site.xml

       <configuration>

        <!-- 指定hbase在HDFS上存儲的路徑 -->

        <property>

                <name>hbase.rootdir</name>

                <value>hdfs://ns1/hbase</value>

        </property>

        <!-- 指定hbase是分佈式的 -->

        <property>

                <name>hbase.cluster.distributed</name>

                <value>true</value>

        </property>

        <!-- 指定zk的地址,多個用「,」分割 -->

        <property>

                <name>hbase.zookeeper.quorum</name>

                <value>weekend05:2181,weekend06:2181,weekend07:2181</value>

        </property>

       </configuration>

 

(3.3)修改 regionservers

       weekend03

       weekend04

       weekend05

       weekend06

(3.3) 修改 backup-masters來指定備用的主節點【不須要】

[root@mini1 conf]# vi backup-masters

mini2

 

 

 

 

(3.4) 拷貝hbase到其餘節點

              scp -r /weekend/hbase-0.96.2-hadoop2/ weekend02:/weekend/

              scp -r /weekend/hbase-0.96.2-hadoop2/ weekend03:/weekend/

              scp -r /weekend/hbase-0.96.2-hadoop2/ weekend04:/weekend/

              scp -r /weekend/hbase-0.96.2-hadoop2/ weekend05:/weekend/

              scp -r /weekend/hbase-0.96.2-hadoop2/ weekend06:/weekend/

(4) 將配置好的HBase拷貝到每個節點並同步時間。

 

(5) 啓動全部的hbase進程

    首先啓動zk集羣

              ./zkServer.sh start

       啓動hdfs集羣

              start-dfs.sh

    啓動hbase,在主節點上運行:

              start-hbase.sh

(6) 經過瀏覽器訪問hbase管理頁面

       192.168.1.201:60010 /16010

(7) 爲保證集羣的可靠性,要啓動多個HMaster

       hbase-daemon.sh start master

 

 

1.4 命令行演示

1.4.1 基本shell命令

進入hbase命令行

./hbase shell

 

顯示hbase中的表

list

 

建立user表,包含info、data兩個列族

create 'user', 'info1', 'data1'

create 'user', {NAME => 'info', VERSIONS => '3'}

 

向user表中插入信息,row key爲rk0001,列族info中添加name列標示符,值爲zhangsan

put 'user', 'rk0001', 'info:name', 'zhangsan'

 

向user表中插入信息,row key爲rk0001,列族info中添加gender列標示符,值爲female

put 'user', 'rk0001', 'info:gender', 'female'

 

向user表中插入信息,row key爲rk0001,列族info中添加age列標示符,值爲20

put 'user', 'rk0001', 'info:age', 20

 

向user表中插入信息,row key爲rk0001,列族data中添加pic列標示符,值爲picture

put 'user', 'rk0001', 'data:pic', 'picture'

 

獲取user表中row key爲rk0001的全部信息

get 'user', 'rk0001'

 

獲取user表中row key爲rk0001,info列族的全部信息

get 'user', 'rk0001', 'info'

 

獲取user表中row key爲rk0001,info列族的name、age列標示符的信息

get 'user', 'rk0001', 'info:name', 'info:age'

 

獲取user表中row key爲rk0001,info、data列族的信息

get 'user', 'rk0001', 'info', 'data'

get 'user', 'rk0001', {COLUMN => ['info', 'data']}

 

get 'user', 'rk0001', {COLUMN => ['info:name', 'data:pic']}

 

獲取user表中row key爲rk0001,列族爲info,版本號最新5個的信息

get 'user', 'rk0001', {COLUMN => 'info', VERSIONS => 2}

get 'user', 'rk0001', {COLUMN => 'info:name', VERSIONS => 5}

get 'user', 'rk0001', {COLUMN => 'info:name', VERSIONS => 5, TIMERANGE => [1392368783980, 1392380169184]}

 

獲取user表中row key爲rk0001,cell的值爲zhangsan的信息

get 'people', 'rk0001', {FILTER => "ValueFilter(=, 'binary:圖片')"}

 

獲取user表中row key爲rk0001,列標示符中含有a的信息

get 'people', 'rk0001', {FILTER => "(QualifierFilter(=,'substring:a'))"}

 

put 'user', 'rk0002', 'info:name', 'fanbingbing'

put 'user', 'rk0002', 'info:gender', 'female'

put 'user', 'rk0002', 'info:nationality', '中國'

get 'user', 'rk0002', {FILTER => "ValueFilter(=, 'binary:中國')"}

 

 

查詢user表中的全部信息

scan 'user'

 

查詢user表中列族爲info的信息

scan 'user', {COLUMNS => 'info'}

scan 'user', {COLUMNS => 'info', RAW => true, VERSIONS => 5}

scan 'persion', {COLUMNS => 'info', RAW => true, VERSIONS => 3}

查詢user表中列族爲info和data的信息

scan 'user', {COLUMNS => ['info', 'data']}

scan 'user', {COLUMNS => ['info:name', 'data:pic']}

 

 

查詢user表中列族爲info、列標示符爲name的信息

scan 'user', {COLUMNS => 'info:name'}

 

查詢user表中列族爲info、列標示符爲name的信息,而且版本最新的5個

scan 'user', {COLUMNS => 'info:name', VERSIONS => 5}

 

查詢user表中列族爲info和data且列標示符中含有a字符的信息

scan 'user', {COLUMNS => ['info', 'data'], FILTER => "(QualifierFilter(=,'substring:a'))"}

 

查詢user表中列族爲info,rk範圍是[rk0001, rk0003)的數據

scan 'people', {COLUMNS => 'info', STARTROW => 'rk0001', ENDROW => 'rk0003'}

 

查詢user表中row key以rk字符開頭的

scan 'user',{FILTER=>"PrefixFilter('rk')"}

 

查詢user表中指定範圍的數據

scan 'user', {TIMERANGE => [1392368783980, 1392380169184]}

 

刪除數據

刪除user表row key爲rk0001,列標示符爲info:name的數據

delete 'people', 'rk0001', 'info:name'

刪除user表row key爲rk0001,列標示符爲info:name,timestamp爲1392383705316的數據

delete 'user', 'rk0001', 'info:name', 1392383705316

 

 

清空user表中的數據

truncate 'people'

 

 

修改表結構

首先停用user表(新版本不用)

disable 'user'

 

添加兩個列族f1和f2

alter 'people', NAME => 'f1'

alter 'user', NAME => 'f2'

啓用表

enable 'user'

 

 

###disable 'user'(新版本不用)

刪除一個列族:

alter 'user', NAME => 'f1', METHOD => 'delete' 或 alter 'user', 'delete' => 'f1'

 

添加列族f1同時刪除列族f2

alter 'user', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'}

 

將user表的f1列族版本號改成5

alter 'people', NAME => 'info', VERSIONS => 5

啓用表

enable 'user'

 

 

刪除表

disable 'user'

drop 'user'

 

 

get 'person', 'rk0001', {FILTER => "ValueFilter(=, 'binary:中國')"}

get 'person', 'rk0001', {FILTER => "(QualifierFilter(=,'substring:a'))"}

scan 'person', {COLUMNS => 'info:name'}

scan 'person', {COLUMNS => ['info', 'data'], FILTER => "(QualifierFilter(=,'substring:a'))"}

scan 'person', {COLUMNS => 'info', STARTROW => 'rk0001', ENDROW => 'rk0003'}

 

scan 'person', {COLUMNS => 'info', STARTROW => '20140201', ENDROW => '20140301'}

scan 'person', {COLUMNS => 'info:name', TIMERANGE => [1395978233636, 1395987769587]}

delete 'person', 'rk0001', 'info:name'

 

alter 'person', NAME => 'ffff'

alter 'person', NAME => 'info', VERSIONS => 10

 

 

get 'user', 'rk0002', {COLUMN => ['info:name', 'data:pic']}

 

1.5 hbase代碼開發(基本,過濾器查詢)

1.5.1  基本增刪改查java實現

public class HbaseDemo {

 

       private Configuration conf = null;

      

       @Before

       public void init(){

              conf = HBaseConfiguration.create();

              conf.set("hbase.zookeeper.quorum", "weekend05,weekend06,weekend07");

       }

      

       @Test

       public void testDrop() throws Exception{

              HBaseAdmin admin = new HBaseAdmin(conf);

              admin.disableTable("account");

              admin.deleteTable("account");

              admin.close();

       }

      

       @Test

       public void testPut() throws Exception{

              HTable table = new HTable(conf, "person_info");

              Put p = new Put(Bytes.toBytes("person_rk_bj_zhang_000002"));

              p.add("base_info".getBytes(), "name".getBytes(), "zhangwuji".getBytes());

              table.put(p);

              table.close();

       }

      

 

       @Test

       public void testDel() throws Exception{

              HTable table = new HTable(conf, "user");

              Delete del = new Delete(Bytes.toBytes("rk0001"));

              del.deleteColumn(Bytes.toBytes("data"), Bytes.toBytes("pic"));

              table.delete(del);

              table.close();

       }

 

       @Test

       public void testGet() throws Exception{

              HTable table = new HTable(conf, "person_info");

              Get get = new Get(Bytes.toBytes("person_rk_bj_zhang_000001"));

              get.setMaxVersions(5);

              Result result = table.get(get);

             

              List<Cell> cells = result.listCells();

      

              for(Cell c:cells){

              }

             

              //result.getValue(family, qualifier);  能夠從result中直接取出一個特定的value

             

        //遍歷出result中全部的鍵值對

              List<KeyValue> kvs = result.list();

              //kv  ---> f1:title:superise....      f1:author:zhangsan    f1:content:asdfasldgkjsldg

              for(KeyValue kv : kvs){

                     String family = new String(kv.getFamily());

                     System.out.println(family);

                     String qualifier = new String(kv.getQualifier());

                     System.out.println(qualifier);

                     System.out.println(new String(kv.getValue()));

                    

              }

              table.close();

       }

 

1.5.2 過濾器查詢

引言:過濾器的類型不少,可是能夠分爲兩大類——比較過濾器,專用過濾器

過濾器的做用是在服務端判斷數據是否知足條件,而後只將知足條件的數據返回給客戶端;

 

hbase過濾器的比較運算符:

LESS  <

LESS_OR_EQUAL <=

EQUAL =

NOT_EQUAL <>

GREATER_OR_EQUAL >=

GREATER >

NO_OP 排除全部

 

Hbase過濾器的比較器(指定比較機制):

BinaryComparator  按字節索引順序比較指定字節數組,採用Bytes.compareTo(byte[])

BinaryPrefixComparator 跟前面相同,只是比較左端的數據是否相同

NullComparator 判斷給定的是否爲空

BitComparator 按位比較

RegexStringComparator 提供一個正則的比較器,僅支持 EQUAL 和非EQUAL

SubstringComparator 判斷提供的子串是否出如今value中。

 

Hbase的過濾器分類

 

  • 比較過濾器

1.1  行鍵過濾器RowFilter

Filter filter1 = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-22"))); 

scan.setFilter(filter1); 

 

1.2  列族過濾器FamilyFilter

Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("colfam3")));

scan.setFilter(filter1); 

 

1.3 列過濾器QualifierFilter

filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("col-2")));

scan.setFilter(filter1);

 

1.4 值過濾器 ValueFilter 

Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(".4") ); 

scan.setFilter(filter1); 

 

 

  • 專用過濾器

2.1 單列值過濾器 SingleColumnValueFilter  ----會返回知足條件的整行

SingleColumnValueFilter filter = new SingleColumnValueFilter( 

    Bytes.toBytes("colfam1"), 

    Bytes.toBytes("col-5"), 

    CompareFilter.CompareOp.NOT_EQUAL, 

    new SubstringComparator("val-5")); 

filter.setFilterIfMissing(true);  //若是不設置爲true,則那些不包含指定column的行也會返回

scan.setFilter(filter1); 

 

2.2  SingleColumnValueExcludeFilter

與上相反

 

2.3 前綴過濾器 PrefixFilter----針對行鍵

Filter filter = new PrefixFilter(Bytes.toBytes("row1")); 

scan.setFilter(filter1); 

 

2.4 列前綴過濾器 ColumnPrefixFilter

Filter filter = new ColumnPrefixFilter(Bytes.toBytes("qual2")); 

scan.setFilter(filter1); 

 

2.4分頁過濾器 PageFilter

       public static void main(String[] args) throws Exception {

              Configuration conf = HBaseConfiguration.create();

              conf.set("hbase.zookeeper.quorum", "spark01:2181,spark02:2181,spark03:2181");

             

              String tableName = "testfilter"; 

              String cfName = "f1"; 

              final byte[] POSTFIX = new byte[] { 0x00 }; 

              HTable table = new HTable(conf, tableName); 

              Filter filter = new PageFilter(3); 

              byte[] lastRow = null; 

              int totalRows = 0; 

              while (true) { 

                  Scan scan = new Scan(); 

                  scan.setFilter(filter); 

                  if(lastRow != null){ 

                //注意這裏添加了POSTFIX操做,用來重置掃描邊界 

                      byte[] startRow = Bytes.add(lastRow,POSTFIX); 

                      scan.setStartRow(startRow); 

                  } 

                  ResultScanner scanner = table.getScanner(scan); 

                  int localRows = 0; 

                  Result result; 

                  while((result = scanner.next()) != null){ 

                      System.out.println(localRows++ + ":" + result); 

                      totalRows ++; 

                      lastRow = result.getRow(); 

                  } 

                  scanner.close(); 

                  if(localRows == 0) break; 

              } 

              System.out.println("total rows:" + totalRows); 

       }

 

 

 

 

 

 

       /**

     * 多種過濾條件的使用方法

        * @throws Exception

        */

       @Test

       public void testScan() throws Exception{

              HTable table = new HTable(conf, "person_info".getBytes());

              Scan scan = new Scan(Bytes.toBytes("person_rk_bj_zhang_000001"), Bytes.toBytes("person_rk_bj_zhang_000002"));

             

        //前綴過濾器----針對行鍵

              Filter filter = new PrefixFilter(Bytes.toBytes("rk"));

             

        //行過濾器  ---針對行鍵

              ByteArrayComparable rowComparator = new BinaryComparator(Bytes.toBytes("person_rk_bj_zhang_000001"));

              RowFilter rf = new RowFilter(CompareOp.LESS_OR_EQUAL, rowComparator);

             

              /**

         * 假設rowkey格式爲:建立日期_發佈日期_ID_TITLE

         * 目標:查找  發佈日期  爲  2014-12-21  的數據

         * sc.textFile("path").flatMap(line=>line.split("\t")).map(x=>(x,1)).reduceByKey(_+_).map((_(2),_(1))).sortByKey().map((_(2),_(1))).saveAsTextFile("")

         *

         *

         */

        rf = new RowFilter(CompareOp.EQUAL , new SubstringComparator("_2014-12-21_"));

             

             

        //單值過濾器1完整匹配字節數組

              new SingleColumnValueFilter("base_info".getBytes(), "name".getBytes(), CompareOp.EQUAL, "zhangsan".getBytes());

        //單值過濾器2 匹配正則表達式

              ByteArrayComparable comparator = new RegexStringComparator("zhang.");

              new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(), CompareOp.EQUAL, comparator);

 

        //單值過濾器3匹配是否包含子串,大小寫不敏感

              comparator = new SubstringComparator("wu");

              new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(), CompareOp.EQUAL, comparator);

 

        //鍵值對元數據過濾-----family過濾----字節數組完整匹配

        FamilyFilter ff = new FamilyFilter(

                CompareOp.EQUAL ,

                new BinaryComparator(Bytes.toBytes("base_info"))   //表中不存在inf列族,過濾結果爲空

                );

        //鍵值對元數據過濾-----family過濾----字節數組前綴匹配

        ff = new FamilyFilter(

                CompareOp.EQUAL ,

                new BinaryPrefixComparator(Bytes.toBytes("inf"))   //表中存在以inf打頭的列族info,過濾結果爲該列族全部行

                );

       

       //鍵值對元數據過濾-----qualifier過濾----字節數組完整匹配

       

        filter = new QualifierFilter(

                CompareOp.EQUAL ,

                new BinaryComparator(Bytes.toBytes("na"))   //表中不存在na列,過濾結果爲空

                );

        filter = new QualifierFilter(

                CompareOp.EQUAL ,

                new BinaryPrefixComparator(Bytes.toBytes("na"))   //表中存在以na打頭的列name,過濾結果爲全部行的該列數據

                      );

             

        //基於列名(即Qualifier)前綴過濾數據的ColumnPrefixFilter

        filter = new ColumnPrefixFilter("na".getBytes());

       

        //基於列名(即Qualifier)多個前綴過濾數據的MultipleColumnPrefixFilter

        byte[][] prefixes = new byte[][] {Bytes.toBytes("na"), Bytes.toBytes("me")};

        filter = new MultipleColumnPrefixFilter(prefixes);

 

        //爲查詢設置過濾條件

        scan.setFilter(filter);

       

       

       

              scan.addFamily(Bytes.toBytes("base_info"));

        //一行

//            Result result = table.get(get);

        //多行的數據

              ResultScanner scanner = table.getScanner(scan);

              for(Result r : scanner){

                     /**

                     for(KeyValue kv : r.list()){

                            String family = new String(kv.getFamily());

                            System.out.println(family);

                            String qualifier = new String(kv.getQualifier());

                            System.out.println(qualifier);

                            System.out.println(new String(kv.getValue()));

                     }

                     */

            //直接從result中取到某個特定的value

                     byte[] value = r.getValue(Bytes.toBytes("base_info"), Bytes.toBytes("name"));

                     System.out.println(new String(value));

              }

              table.close();

       }

 

 

1.6 hbase內部原理

1.6.1  系統架構

 

 

Client

1 包含訪問hbase的接口,client維護着一些cache來加快對hbase的訪問,好比regione的位置信息。

 

Zookeeper

1 保證任什麼時候候,集羣中只有一個master

2 存貯全部Region的尋址入口----root表在哪臺服務器上。

3 實時監控Region Server的狀態,將Region server的上線和下線信息實時通知給Master

4 存儲Hbase的schema,包括有哪些table,每一個table有哪些column family

 

Master職責

1 爲Region server分配region

2 負責region server的負載均衡

3 發現失效的region server並從新分配其上的region

4 HDFS上的垃圾文件回收

5 處理schema更新請求

Region Server職責

1 Region server維護Master分配給它的region,處理對這些region的IO請求

2 Region server負責切分在運行過程當中變得過大的region

能夠看到,client訪問hbase上數據的過程並不須要master參與(尋址訪問zookeeper和region server,數據讀寫訪問regione server),master僅僅維護者table和region的元數據信息,負載很低。

 

1.6.2  物理存儲

一、總體結構

 

 

1 Table中的全部行都按照row key的字典序排列。

2 Table 在行的方向上分割爲多個Hregion。

 

3 region按大小分割的(默認10G),每一個表一開始只有一個region,隨着數據不斷插入表,region不斷增大,當增大到一個閥值的時候,Hregion就會等分會兩個新的Hregion。當table中的行不斷增多,就會有愈來愈多的Hregion。

 

4 Hregion是Hbase中分佈式存儲和負載均衡的最小單元。最小單元就表示不一樣的Hregion能夠分佈在不一樣的HRegion server上。但一個Hregion是不會拆分到多個server上的。

 

5 HRegion雖然是負載均衡的最小單元,但並非物理存儲的最小單元。

事實上,HRegion由一個或者多個Store組成,每一個store保存一個column family。

每一個Strore又由一個memStore和0至多個StoreFile組成。如上圖

二、STORE FILE & HFILE結構

StoreFile以HFile格式保存在HDFS上。

 

附:HFile的格式爲:

 

 

首先HFile文件是不定長的,長度固定的只有其中的兩塊:Trailer和FileInfo。正如圖中所示的,Trailer中有指針指向其餘數 據塊的起始點。

File Info中記錄了文件的一些Meta信息,例如:AVG_KEY_LEN, AVG_VALUE_LEN, LAST_KEY, COMPARATOR, MAX_SEQ_ID_KEY等。

Data Index和Meta Index塊記錄了每一個Data塊和Meta塊的起始點。

Data Block是HBase I/O的基本單元,爲了提升效率,HRegionServer中有基於LRU的Block Cache機制。每一個Data塊的大小能夠在建立一個Table的時候經過參數指定,大號的Block有利於順序Scan,小號Block利於隨機查詢。 每一個Data塊除了開頭的Magic之外就是一個個KeyValue對拼接而成, Magic內容就是一些隨機數字,目的是防止數據損壞。

HFile裏面的每一個KeyValue對就是一個簡單的byte數組。可是這個byte數組裏麪包含了不少項,而且有固定的結構。咱們來看看裏面的具體結構:

開始是兩個固定長度的數值,分別表示Key的長度和Value的長度。緊接着是Key,開始是固定長度的數值,表示RowKey的長度,緊接着是 RowKey,而後是固定長度的數值,表示Family的長度,而後是Family,接着是Qualifier,而後是兩個固定長度的數值,表示Time Stamp和Key Type(Put/Delete)。Value部分沒有這麼複雜的結構,就是純粹的二進制數據了。

 

 

HFile分爲六個部分:

Data Block 段–保存表中的數據,這部分能夠被壓縮

Meta Block 段 (可選的)–保存用戶自定義的kv對,能夠被壓縮。

File Info 段–Hfile的元信息,不被壓縮,用戶也能夠在這一部分添加本身的元信息。

Data Block Index 段–Data Block的索引。每條索引的key是被索引的block的第一條記錄的key。

Meta Block Index段 (可選的)–Meta Block的索引。

Trailer–這一段是定長的。保存了每一段的偏移量,讀取一個HFile時,會首先 讀取Trailer,Trailer保存了每一個段的起始位置(段的Magic Number用來作安全check),而後,DataBlock Index會被讀取到內存中,這樣,當檢索某個key時,不須要掃描整個HFile,而只需從內存中找到key所在的block,經過一次磁盤io將整個 block讀取到內存中,再找到須要的key。DataBlock Index採用LRU機制淘汰。

HFile的Data Block,Meta Block一般採用壓縮方式存儲,壓縮以後能夠大大減小網絡IO和磁盤IO,隨之而來的開銷固然是須要花費cpu進行壓縮和解壓縮。

目標Hfile的壓縮支持兩種方式:Gzip,Lzo。

 

 

三、Memstore與storefile

一個region由多個store組成,每一個store包含一個列族的全部數據

Store包括位於內存的memstore和位於硬盤的storefile

寫操做先寫入memstore,當memstore中的數據量達到某個閾值,Hregionserver啓動flashcache進程寫入storefile,每次寫入造成單獨一個storefile

當storefile大小超過必定閾值後,會把當前的region分割成兩個,並由Hmaster分配給相應的region服務器,實現負載均衡

客戶端檢索數據時,先在memstore找,找不到再找storefile

 

 

四、HLog(WAL log)

WAL 意爲Write ahead log(http://en.wikipedia.org/wiki/Write-ahead_logging),相似mysql中的binlog,用來 作災難恢復只用,Hlog記錄數據的全部變動,一旦數據修改,就能夠從log中進行恢復。

每一個Region Server維護一個Hlog,而不是每一個Region一個。這樣不一樣region(來自不一樣table)的日誌會混在一塊兒,這樣作的目的是不斷追加單個文件相對於同時寫多個文件而言,能夠減小磁盤尋址次數,所以能夠提升對table的寫性能。帶來的麻煩是,若是一臺region server下線,爲了恢復其上的region,須要將region server上的log進行拆分,而後分發到其它region server上進行恢復。

HLog文件就是一個普通的Hadoop Sequence File:

²  HLog Sequence File 的Key是HLogKey對象,HLogKey中記錄了寫入數據的歸屬信息,除了table和region名字外,同時還包括 sequence number和timestamp,timestamp是」寫入時間」,sequence number的起始值爲0,或者是最近一次存入文件系統中sequence number。

²  HLog Sequece File的Value是HBase的KeyValue對象,即對應HFile中的KeyValue,可參見上文描述。

 

 

1.6.3  尋址機制

一、尋址示意圖

二、-ROOT-和.META.表結構

 

.META.行記錄結構

三、尋址流程

如今假設咱們要從Table2裏面插尋一條RowKey是RK10000的數據。那麼咱們應該遵循如下步驟:

1. 從.META.表裏面查詢哪一個Region包含這條數據。

2. 獲取管理這個Region的RegionServer地址。

3. 鏈接這個RegionServer, 查到這條數據。

 

 

 

系統如何找到某個row key (或者某個 row key range)所在的region

bigtable 使用三層相似B+樹的結構來保存region位置。

第一層是保存zookeeper裏面的文件,它持有root region的位置。

第二層root region是.META.表的第一個region其中保存了.META.表其它region的位置。經過root region,咱們就能夠訪問.META.表的數據。

.META.是第三層,它是一個特殊的表,保存了hbase中全部數據表的region 位置信息。

 

說明:

1 root region永遠不會被split,保證了最須要三次跳轉,就能定位到任意region 。

2.META.表每行保存一個region的位置信息,row key 採用表名+表的最後一行編碼而成。

3 爲了加快訪問,.META.表的所有region都保存在內存中。

4 client會將查詢過的位置信息保存緩存起來,緩存不會主動失效,所以若是client上的緩存所有失效,則須要進行最多6次網絡來回,才能定位到正確的region(其中三次用來發現緩存失效,另外三次用來獲取位置信息)。

 

 

1.6.4    讀寫過程

一、讀請求過程:

1 客戶端經過zookeeper以及root表和meta表找到目標數據所在的regionserver

2 聯繫regionserver查詢目標數據

3 regionserver定位到目標數據所在的region,發出查詢請求

4 region先在memstore中查找,命中則返回

5 若是在memstore中找不到,則在storefile中掃描(可能會掃描到不少的storefile----bloomfilter)

 

 

二、寫請求過程:

1 client向region server提交寫請求

2 region server找到目標region

3 region檢查數據是否與schema一致

4 若是客戶端沒有指定版本,則獲取當前系統時間做爲數據版本

5 將更新寫入WAL log

6 將更新寫入Memstore

7 判斷Memstore的是否須要flush爲Store文件。

 

細節描述:

hbase使用MemStore和StoreFile存儲對錶的更新。

數據在更新時首先寫入Log(WAL log)和內存(MemStore)中,MemStore中的數據是排序的,當MemStore累計到必定閾值時,就會建立一個新的MemStore,並 且將老的MemStore添加到flush隊列,由單獨的線程flush到磁盤上,成爲一個StoreFile。於此同時,系統會在zookeeper中記錄一個redo point,表示這個時刻以前的變動已經持久化了。

當系統出現意外時,可能致使內存(MemStore)中的數據丟失,此時使用Log(WAL log)來恢復checkpoint以後的數據。

 

StoreFile是隻讀的,一旦建立後就不能夠再修改。所以Hbase的更新實際上是不斷追加的操做。當一個Store中的StoreFile達到必定的閾值後,就會進行一次合併(minor_compact, major_compact),將對同一個key的修改合併到一塊兒,造成一個大的StoreFile,當StoreFile的大小達到必定閾值後,又會對 StoreFile進行split,等分爲兩個StoreFile。

因爲對錶的更新是不斷追加的,compact時,須要訪問Store中所有的 StoreFile和MemStore,將他們按row key進行合併,因爲StoreFile和MemStore都是通過排序的,而且StoreFile帶有內存中索引,合併的過程仍是比較快。

 

 

1.6.5  Region管理

(1) region分配

任什麼時候刻,一個region只能分配給一個region server。master記錄了當前有哪些可用的region server。以及當前哪些region分配給了哪些region server,哪些region尚未分配。當須要分配的新的region,而且有一個region server上有可用空間時,master就給這個region server發送一個裝載請求,把region分配給這個region server。region server獲得請求後,就開始對此region提供服務。

 

(2) region server上線

master使用zookeeper來跟蹤region server狀態。當某個region server啓動時,會首先在zookeeper上的server目錄下創建表明本身的znode。因爲master訂閱了server目錄上的變動消息,當server目錄下的文件出現新增或刪除操做時,master能夠獲得來自zookeeper的實時通知。所以一旦region server上線,master能立刻獲得消息。

 

(3) region server下線

當region server下線時,它和zookeeper的會話斷開,zookeeper而自動釋放表明這臺server的文件上的獨佔鎖。master就能夠肯定:

1 region server和zookeeper之間的網絡斷開了。

2 region server掛了。

不管哪一種狀況,region server都沒法繼續爲它的region提供服務了,此時master會刪除server目錄下表明這臺region server的znode數據,並將這臺region server的region分配給其它還活着的同志。

 

 

1.6.6  Master工做機制

  • master上線

master啓動進行如下步驟:

1 從zookeeper上獲取惟一一個表明active master的鎖,用來阻止其它master成爲master。

2 掃描zookeeper上的server父節點,得到當前可用的region server列表。

3 和每一個region server通訊,得到當前已分配的region和region server的對應關係。

4 掃描.META.region的集合,計算獲得當前還未分配的region,將他們放入待分配region列表。

 

  • master下線

因爲master只維護表和region的元數據,而不參與表數據IO的過程,master下線僅致使全部元數據的修改被凍結(沒法建立刪除表,沒法修改表的schema,沒法進行region的負載均衡,沒法處理region 上下線,沒法進行region的合併,惟一例外的是region的split能夠正常進行,由於只有region server參與),表的數據讀寫還能夠正常進行。所以master下線短期內對整個hbase集羣沒有影響。

從上線過程能夠看到,master保存的信息全是能夠冗餘信息(均可以從系統其它地方收集到或者計算出來)

所以,通常hbase集羣中老是有一個master在提供服務,還有一個以上的‘master’在等待時機搶佔它的位置。

 

 

 

動手練習(增刪改查)

 

2.  Hbase高級應用

2.1建表高級屬性

下面幾個shell 命令在hbase操做中能夠起到很到的做用,且主要體如今建表的過程當中,看下面幾個create 屬性

 

一、BLOOMFILTER  默認是NONE 是否使用布隆過慮及使用何種方式

布隆過濾能夠每列族單獨啓用。

使用 HColumnDescriptor.setBloomFilterType(NONE | ROW | ROWCOL) 對列族單獨啓用布隆。

²  Default = ROW 對行進行布隆過濾。

²  對 ROW,行鍵的哈希在每次插入行時將被添加到布隆。

²  對 ROWCOL,行鍵 + 列族 + 列族修飾的哈希將在每次插入行時添加到布隆

   使用方法: create 'table',{BLOOMFILTER =>'ROW'}

   啓用布隆過濾能夠節省讀磁盤過程,能夠有助於下降讀取延遲

 

二、VERSIONS 默認是1 這個參數的意思是數據保留1個 版本,若是咱們認爲咱們的數據沒有這麼大的必要保留這麼多,隨時都在更新,而老版本的數據對咱們毫無價值,那將此參數設爲1 能節約2/3的空間

使用方法: create 'table',{VERSIONS=>'2'}

 

附:MIN_VERSIONS => '0'是說在compact操做執行以後,至少要保留的版本

 

三、COMPRESSION 默認值是NONE 即不使用壓縮

     這個參數意思是該列族是否採用壓縮,採用什麼壓縮算法

     使用方法: create 'table',{NAME=>'info',COMPRESSION=>'SNAPPY'}

     建議採用SNAPPY壓縮算法

    HBase中,在Snappy發佈以前(Google 2011年對外發布Snappy),採用的LZO算法,目標是達到儘量快的壓縮和解壓速度,同時減小對CPU的消耗;

    在Snappy發佈以後,建議採用Snappy算法(參考《HBase: The Definitive Guide》),具體能夠根據實際狀況對LZO和Snappy作過更詳細的對比測試後再作選擇。

                    

Algorithm

% remaining

Encoding

Decoding

GZIP

13.4%

21 MB/s

118 MB/s

LZO

20.5%

135 MB/s

410 MB/s

Zippy/Snappy

22.2%

172 MB/s

409 MB/s

若是建表之初沒有壓縮,後來想要加入壓縮算法,能夠經過alter修改schema

 

四、alter

     使用方法:

     如 修改壓縮算法     

      disable 'table'

      alter 'table',{NAME=>'info',COMPRESSION=>'snappy'}

      enable 'table'

     可是須要執行major_compact 'table' 命令以後 纔會作實際的操做。

 

五、TTL

默認是 2147483647 即:Integer.MAX_VALUE 值大概是68年

這個參數是說明該列族數據的存活時間,單位是s

這個參數能夠根據具體的需求對數據設定存活時間,超過存過期間的數據將在表中不在顯示,待下次major compact的時候再完全刪除數據

注意的是TTL設定以後 MIN_VERSIONS=>'0' 這樣設置以後,TTL時間戳過時後,將所有完全刪除該family下全部的數據,若是MIN_VERSIONS 不等於0那將保留最新的MIN_VERSIONS個版本的數據,其它的所有刪除,好比MIN_VERSIONS=>'1' 屆時將保留一個最新版本的數據,其它版本的數據將再也不保存。

 

六、describe 'table' 這個命令查看了create table 的各項參數或者是默認值。

 

七、disable_all  'toplist.*'  disable_all 支持正則表達式,並列出當前匹配的表的以下:

      toplist_a_total_1001                                                                                                                                                

      toplist_a_total_1002                                                                                                                                                

      toplist_a_total_1008                 

 

    toplist_a_total_1009                                                                                                                                               

      toplist_a_total_1019                                                                                                                                                

      toplist_a_total_1035

     ...

     Disable the above 25 tables (y/n)? 並給出確認提示

 

八、drop_all 這個命令和disable_all的使用方式是同樣的

 

九、hbase 表預分區----手動分區

     默認狀況下,在建立HBase表的時候會自動建立一個region分區,當導入數據的時候,全部的HBase客戶端都向這一個region寫數據,直到這個region足夠大了才進行切分。一種能夠加快批量寫入速度的方法是經過預先建立一些空的regions,這樣當數據寫入HBase時,會按照region分區狀況,在集羣內作數據的負載均衡。

命令方式:

create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}

也可使用api的方式:

     bin/hbase org.apache.hadoop.hbase.util.RegionSplitter test_table HexStringSplit -c 10 -f info 

     參數:

       test_table是表名

HexStringSplit 是split 方式

-c 是分10個region

-f 是family

 

可在UI上查看結果,如圖:

 

 這樣就能夠將表預先分爲15個區,減小數據達到storefile 大小的時候自動分區的時間消耗,而且還有以一個優點,就是合理設計rowkey 能讓各個region 的併發請求平均分配(趨於均勻) 使IO 效率達到最高,可是預分區須要將filesize 設置一個較大的值,設置哪一個參數呢 hbase.hregion.max.filesize 這個值默認是10G 也就是說單個region 默認大小是10G

     這個參數的默認值在0.90 到0.92到0.94.3各版本的變化:256M--1G--10G

 

     可是若是MapReduce Input類型爲TableInputFormat 使用hbase做爲輸入的時候,就要注意了,每一個region一個map,若是數據小於10G 那隻會啓用一個map 形成很大的資源浪費,這時候能夠考慮適當調小該參數的值,或者採用預分配region的方式,並將檢測若是達到這個值,再手動分配region。

 

 

 

 

 

 

 

 

2.2 hbase應用案例看行鍵設計

表結構設計

一、列族數量的設定

以用戶信息爲例,能夠將必須的基本信息存放在一個列族,而一些附加的額外信息能夠放在另外一列族;

二、行鍵的設計

語音詳單:

13877889988-20150625

13877889988-20150625

13877889988-20150626

13877889988-20150626

13877889989

13877889989

13877889989

----將須要批量查詢的數據儘量連續存放

CMS系統----多條件查詢

儘量將查詢條件關鍵詞拼裝到rowkey中,查詢頻率最高的條件儘可能往前靠

20150230-zhangsan-category… 

20150230-lisi-category… 

(每個條件的值長度不一樣,能夠經過作定長映射來提升效率)

 

參考:《hbase 實戰》----詳細講述了facebook /GIS等系統的表結構設計

 

 

 

 

 

 

 

2.3 Hbase和mapreduce結合

爲何須要用mapreduce去訪問hbase的數據?

——加快分析速度和擴展分析能力

Mapreduce訪問hbase數據做分析必定是在離線分析的場景下應用

2.3.1 從Hbase中讀取數據、分析,寫入hdfs

/**

public abstract class TableMapper<KEYOUT, VALUEOUT>

extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {

}

 * @author duanhaitao@itcast.cn

 *

 */

public class HbaseReader {

 

       public static String flow_fields_import = "flow_fields_import";

       static class HdfsSinkMapper extends TableMapper<Text, NullWritable>{

 

              @Override

              protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

 

                     byte[] bytes = key.copyBytes();

                     String phone = new String(bytes);

                     byte[] urlbytes = value.getValue("f1".getBytes(), "url".getBytes());

                     String url = new String(urlbytes);

                     context.write(new Text(phone + "\t" + url), NullWritable.get());

                    

              }

             

       }

      

       static class HdfsSinkReducer extends Reducer<Text, NullWritable, Text, NullWritable>{

             

              @Override

              protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

                    

                     context.write(key, NullWritable.get());

              }

       }

      

       public static void main(String[] args) throws Exception {

              Configuration conf = HBaseConfiguration.create();

              conf.set("hbase.zookeeper.quorum", "spark01");

             

              Job job = Job.getInstance(conf);

             

              job.setJarByClass(HbaseReader.class);

             

//            job.setMapperClass(HdfsSinkMapper.class);

              Scan scan = new Scan();

              TableMapReduceUtil.initTableMapperJob(flow_fields_import, scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job);

              job.setReducerClass(HdfsSinkReducer.class);

             

              FileOutputFormat.setOutputPath(job, new Path("c:/hbasetest/output"));

             

              job.setOutputKeyClass(Text.class);

              job.setOutputValueClass(NullWritable.class);

             

              job.waitForCompletion(true);

       }

      

}

2.3.2 從hdfs中讀取數據寫入Hbase

/**

public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>

extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {

}

 * @author duanhaitao@itcast.cn

 *

 */

public class HbaseSinker {

 

       public static String flow_fields_import = "flow_fields_import";

       static class HbaseSinkMrMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{

              @Override

              protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

 

                     String line = value.toString();

                     String[] fields = line.split("\t");

                     String phone = fields[0];

                     String url = fields[1];

                    

                     FlowBean bean = new FlowBean(phone,url);

                    

                     context.write(bean, NullWritable.get());

              }

       }

      

       static class HbaseSinkMrReducer extends TableReducer<FlowBean, NullWritable, ImmutableBytesWritable>{

             

              @Override

              protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

                    

                     Put put = new Put(key.getPhone().getBytes());

                     put.add("f1".getBytes(), "url".getBytes(), key.getUrl().getBytes());

                    

                     context.write(new ImmutableBytesWritable(key.getPhone().getBytes()), put);

                    

              }

             

       }

      

       public static void main(String[] args) throws Exception {

              Configuration conf = HBaseConfiguration.create();

              conf.set("hbase.zookeeper.quorum", "spark01");

             

              HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);

             

              boolean tableExists = hBaseAdmin.tableExists(flow_fields_import);

              if(tableExists){

                     hBaseAdmin.disableTable(flow_fields_import);

                     hBaseAdmin.deleteTable(flow_fields_import);

              }

              HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(flow_fields_import));

              HColumnDescriptor hColumnDescriptor = new HColumnDescriptor ("f1".getBytes());

              desc.addFamily(hColumnDescriptor);

             

              hBaseAdmin.createTable(desc);

             

             

              Job job = Job.getInstance(conf);

             

              job.setJarByClass(HbaseSinker.class);

             

              job.setMapperClass(HbaseSinkMrMapper.class);

              TableMapReduceUtil.initTableReducerJob(flow_fields_import, HbaseSinkMrReducer.class, job);

             

              FileInputFormat.setInputPaths(job, new Path("c:/hbasetest/data"));

             

              job.setMapOutputKeyClass(FlowBean.class);

              job.setMapOutputValueClass(NullWritable.class);

             

              job.setOutputKeyClass(ImmutableBytesWritable.class);

              job.setOutputValueClass(Mutation.class);

             

              job.waitForCompletion(true);

             

             

       }

      

}

 

 

2.3 hbase高級編程

2.3.1 協處理器---- Coprocessor

協處理器有兩種:observer和endpoint

Observer容許集羣在正常的客戶端操做過程當中能夠有不一樣的行爲表現

Endpoint容許擴展集羣的能力,對客戶端應用開放新的運算命令

 

ü  Observer協處理器

²  正常put請求的流程:

 

 

²  加入Observer協處理後的put流程:

1 客戶端發出put請求

2 該請求被分派給合適的RegionServer和region

3 coprocessorHost攔截該請求,而後在該表上登記的每一個RegionObserver上調用prePut()

4 若是沒有被prePut()攔截,該請求繼續送到region,而後進行處理

5 region產生的結果再次被CoprocessorHost攔截,調用postPut()

6 假如沒有postPut()攔截該響應,最終結果被返回給客戶端

 

ü  Observer的類型

一、RegionObs——這種Observer鉤在數據訪問和操做階段,全部標準的數據操做命令均可以被pre-hooks和post-hooks攔截

二、WALObserver——WAL所支持的Observer;可用的鉤子是pre-WAL和post-WAL

三、MasterObserver——鉤住DDL事件,如表建立或模式修改

 

ü  Observer應用場景示例

見下節;

 

 

  • Endpoint—參考《Hbase 權威指南》

 

2.3.2 二級索引

row key在HBase中是以B+ tree結構化有序存儲的,因此scan起來會比較效率。單表以row key存儲索引,column value存儲id值或其餘數據 ,這就是Hbase索引表的結構。

 

因爲HBase自己沒有二級索引(Secondary Index)機制,基於索引檢索數據只能單純地依靠RowKey,爲了能支持多條件查詢,開發者須要將全部可能做爲查詢條件的字段一一拼接到RowKey中,這是HBase開發中極爲常見的作法

 

好比,如今有一張1億的用戶信息表,建有出生地和年齡兩個索引,我想獲得一個條件是在杭州出生,年齡爲20歲的按用戶id正序排列前10個的用戶列表。

有一種方案是,系統先掃描出生地爲杭州的索引,獲得一個用戶id結果集,這個集合的規模假設是10萬。而後掃描年齡,規模是5萬,最後merge這些用戶id,去重,排序獲得結果。

這明顯有問題,如何改良?

保證出生地和年齡的結果是排過序的,能夠減小merge的數據量?但Hbase是按row key排序,value是不能排序的。

變通一下——將用戶id冗餘到row key裏?OK,這是一種解決方案了,這個方案的圖示以下:

merge時提取交集就是所須要的列表,順序是靠索引增長了_id,以字典序保證的。

 

2, 按索引查詢種類創建組合索引。

在方案1的場景中,想象一下,若是單索引數量多達10個會怎麼樣?10個索引,就要merge 10次,性能可想而知。

解決這個問題須要參考RDBMS的組合索引實現。

好比出生地和年齡須要同時查詢,此時若是創建一個出生地和年齡的組合索引,查詢時效率會高出merge不少。

固然,這個索引也須要冗餘用戶id,目的是讓結果天然有序。結構圖示以下:

這個方案的優勢是查詢速度很是快,根據查詢條件,只須要到一張表中檢索便可獲得結果list。缺點是若是有多個索引,就要創建多個與查詢條件一一對應的組合索引

 

而索引表的維護若是交給應用客戶端,則無疑增長了應用端開發的負擔

經過協處理器能夠將索引表維護的工做從應用端剝離

 

ü  利用Observer自動維護索引表示例

在社交類應用中,常常須要快速檢索各用戶的關注列表t_guanzhu,同時,又須要反向檢索各類戶的粉絲列表t_fensi,爲了實現這個需求,最佳實踐是創建兩張互爲反向的表:

n  一個表爲正向索引關注表 「t_guanzhu」:

Rowkey: A-B

f1:From

f1:To

n  另外一個表爲反向索引粉絲表:「t_fensi」:

Rowkey: B—A

f1:From

f1:To

插入一條關注信息時,爲了減輕應用端維護反向索引表的負擔,可用Observer協處理器實現:

 

一、編寫自定義RegionServer

public class InverIndexCoprocessor extends BaseRegionObserver {

   

    @Override

    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {

        // set configuration

        Configuration conf = HBaseConfiguration.create();

        // need conf.set...

 

        HTable table = new HTable(conf, "t_fensi");

        Cell fromCell = put.get("f1".getBytes(), "From".getBytes()).get(0);

        Cell toCell = put.get("f1".getBytes(), "To".getBytes()).get(0);

        byte[] valueArray = fromCell.getValue();

        String from = new String(valueArray);

        valueArray = toCell.getValue();

        String to = new String(valueArray);

       

        Put putIndex = new Put((to+"-"+from).getBytes());

        putIndex.add("f1".getBytes(), "From".getBytes(),from.getBytes());

        putIndex.add("f1".getBytes(), "To".getBytes(),to.getBytes());

       

        table.put(putIndex);

        table.close();

 

    }

}

 

二、打成jar包「fensiguanzhu.jar」上傳hdfs

hadoop fs -put fensiguanzhu.jar /demo/

 

三、修改t_fensi的schema,註冊協處理器

hbase(main):017:0> alter ' t_fensi ',METHOD => 'table_att','coprocessor'=>'hdfs://spark01:9000/demo/ fensiguanzhu.jar|cn.itcast.bigdata.hbasecoprocessor. InverIndexCoprocessor|1001|'

Updating all regions with the new schema...

0/1 regions updated.

1/1 regions updated.

Done.

 

四、檢查是否註冊成功

hbase(main):018:0> describe 'ff'

DESCRIPTION                                                                                           ENABLED                                               

 'ff', {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://spark01:9000/demo/fensiguanzhu.jar|cn.itcast.bi true                                                  

 gdata.hbasecoprocessor.TestCoprocessor|1001|'}, {NAME => 'f1', DATA_BLOCK_ENCODING => 'NONE', BLOOMF                                                       

 ILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0                                                       

 ', TTL => '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', B                                                        

 LOCKCACHE => 'true'}, {NAME => 'f2', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATIO                                                       

 N_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647', KE                                                       

 EP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}                                                             

1 row(s) in 0.0250 seconds

 

五、向正向索引表中插入數據進行驗證

相關文章
相關標籤/搜索