Hbase多列範圍查找(效率)

Hbase索引表的結構

Hbase Rowkey 設計

Hbase Filter

Hbase二級索引

 

Hbase索引表的結構

  在HBase中,表格的Rowkey按照字典排序,Region按照RowKey設置split point進行shard,經過這種方式實現的全局、分佈式索引,成爲了其成功的最大的砝碼html

  每個索引創建一個表,而後依靠表的row key來實現範圍檢索。row key在HBase中是以B+ tree結構化有序存儲的,因此scan起來會比較效率。
單表以row key存儲索引,column value存儲id值或其餘數據 ,這就是Hbase索引表的結構。java

  Hbase QualifierFilter用於過濾qualifier,也就是一個列族裏面data:xxx,冒號後面的字符串git

Hbase Rowkey 設計

  大數據最好從rowkey入手,ColumnValueFilter的數度是很慢的,hbase查詢速度仍是要依靠rowkey,因此根據業務邏輯把rowkey設計好,以後全部的查詢都經過rowkey,是會很是快。 批量查詢最好是用 scan的startkey endkey來作查詢條件github

  rowkey是hbase中很重要的一個設計,若是你把它當成普通字段那你的設計就有點失敗了。它的設計能夠說是一門藝術。你的查詢若是不能把rowkey加入進來,那你的設計基本是失敗的。加上rowkey,hbase能夠快速地定位到具體的region去取你要的數據,不然就會滿上遍野的找數據。apache

設計原則:緩存

1. 長度越短越好安全

  Rowkey是一個二進制碼流,Rowkey的長度被不少開發者建議說設計在10~100個字節,不過建議是越短越好,不要超過16個字節。負載均衡

  緣由以下:分佈式

  (1)數據的持久化文件HFile中是按照KeyValue存儲的,若是Rowkey過長好比100個字節,1000萬列數據光Rowkey就要佔用100*1000萬=10億個字節,將近1G數據,這會極大影響HFile的存儲效率;ide

  (2)MemStore將緩存部分數據到內存,若是Rowkey字段過長內存的有效利用率會下降,系統將沒法緩存更多的數據,這會下降檢索效率。所以Rowkey的字節長度越短越好。

  (3)目前操做系統是都是64位系統,內存8字節對齊。控制在16個字節,8字節的整數倍利用操做系統的最佳特性。

2. 散列原則:若是Rowkey是按時間戳的方式遞增,不要將時間放在二進制碼的前面,建議將Rowkey的高位做爲散列字段,由程序循環生成,低位放時間字段,這樣將提升數據均衡分佈在每一個Regionserver實現負載均衡的概率。若是沒有散列字段,首字段直接是時間信息將產生全部新數據都在一個 RegionServer上堆積的熱點現象,這樣在作數據檢索的時候負載將會集中在個別RegionServer,下降查詢效率。

3. 惟一性

 

HBase按指定的條件獲取一批記錄時,使用的就是scan方法。 scan方法有如下特色:

(1)scan能夠經過setCaching與setBatch方法提升速度(以空間換時間);

(2)scan能夠經過setStartRow與setEndRow來限定範圍。範圍越小,性能越高。

經過巧妙的RowKey設計使咱們批量獲取記錄集合中的元素挨在一塊兒(應該在同一個Region下),能夠在遍歷結果時得到很好的性能。

(3)scan能夠經過setFilter方法添加過濾器,這也是分頁、多條件查詢的基礎。

 

設計RowKey時能夠這樣作:採用 UserID + CreateTime + FileID組成RowKey。

須要注意如下幾點:

(1)每條記錄的RowKey,每一個字段都須要填充到相同長度。假如預期咱們最多有10萬量級的用戶,則userID應該統一填充至6位,如000001,000002…

(2)結尾添加全局惟一的FileID的用意也是使每一個文件對應的記錄全局惟一。避免當UserID與CreateTime相同時的兩個不一樣文件記錄相互覆蓋。

RowKey存儲上述文件記錄,在HBase表中是下面的結構:

rowKey(userID 6 + time 8 + fileID 6) name category ….

00000120120902000001

Hbase Filter

應用實例

//時間範圍的查找, 好比是2012-12-12到2013-01-23日之間的數據
FilterList filter = new FilterList();  
    if (timeFrom != null) {  
        String sDate = String.valueOf(timeFrom.getTime());  
        SingleColumnValueFilter scvf = new SingleColumnValueFilter(Bytes.toBytes("CF"), Bytes.toBytes("Date"), CompareOp.GREATER_OR_EQUAL,  
                Bytes.toBytes(String.valueOf(sDate)));  
        filter.addFilter(scvf);  
    }  
  
    if (timeTo != null) {  
        String sDate = String.valueOf(timeTo.getTime());  
        SingleColumnValueFilter scvf = new SingleColumnValueFilter(Bytes.toBytes("CF"), Bytes.toBytes("Date"), CompareOp.LESS_OR_EQUAL,  
                Bytes.toBytes(String.valueOf(sDate)));  
        filter.addFilter(scvf);  
    }  

HBase(0.96以上版本)過濾器Filter詳解及實例代碼

 

Hbase二級索引

  HBase在0.92以後引入了coprocessors,提供了一系列的鉤子,讓咱們可以輕易實現訪問控制和二級索引的特性。下面簡單介紹下兩種coprocessors,第一種是Observers,它實際相似於觸發器,第二種是Endpoint,它相似與存儲過程。因爲這裏只用到了Observers,因此只介紹Observers,想要更詳細的介紹請查閱(https://blogs.apache.org/hbase/entry/coprocessor_introduction)。observers分爲三種:

RegionObserver:提供數據操做事件鉤子;

WALObserver:提供WAL(write ahead log)相關操做事件鉤子;

MasterObserver:提供DDL操做事件鉤子。

 

在二級索引的實現技術上通常有幾個方案:

1.      表索引

使用單獨的hbase表存儲索引數據,業務表的索引列值作爲索引表的rowkey,業務表的rowkey作爲索引表的qualifier或value。

問題:對數據更新性能影響較大;沒法保證一致性;Client查詢須要2次RPC(先索引表再數據表)。

2.      列索引

與業務表使用相同表,使用單獨列族存儲索引,用戶數據列值作爲索引列族的Qualifier,用戶數據Qualifier作爲索引列族的列值。適用於單行有上百萬Qualifier的數據模型,如網盤應用中網盤ID作爲rowkey,網盤的目錄元數據都存儲在一個hbase row內。(facebook消息模型也是此方案)

可保證事務性

 

爲了實現像SQL同樣檢索數據,select * from table where col=val。針對HBase Secondary Indexing的方案,成爲HBase新版本(0.96)呼聲最高的一項Feature。

 

粗略分析了當前的技術,大概的方案能夠總結爲這樣兩類:

一、使用HBase的coprocessor。CoProcessor至關於HBase的Observer+hook,目前支持MasterObserver、RegionObserver和WALObserver,基本上對於HBase Table的管理、數據的Put、Delete、Get等操做均可以找到對應的pre***和post***。這樣若是須要對於某一項Column創建Secondary Indexing,就能夠在Put、Delete的時候,將其信息更新到另一張索引表中。如圖二所示,對於Indexing裏面的value值是否存儲的問題,能夠根據須要進行控制,若是value的空間開銷不大,逆向的檢索又比較頻繁,能夠直接存儲在Indexing Table中,反之則避免這種狀況。HBase二級索引方案總結

圖2 使用HBase Coprocessor實現Secondary Indexing

二、由客戶端發起對於主表和索引表的Put、Delete操做的雙重操做。源自:http://hadoop-hbase.blogspot.com/2012/10/musings-on-secondary-indexes.html 【牆外】

它具體的作法總結起來有:

  • 設置主表的TTL(Time To Live)比索引表小一點,讓其略早一點消亡。
  • 不要在IndexingTable存儲Value值,即刪除如圖2所示的val列。
  • Put操做時,對於操做的主表的全部列,使用同一的Local TimeStamp的值,更新到Indexing Table,而後使用該TimeStamp插入主表數據。
  • Delete操做時,首先操做主表的數據,而後再去更新Indexing Table的數據。

雖然在這種方案裏沒法保證原子性和一致性,可是經過TimeStamp的設置,No Locks和 No Server-side codes,使其在二級索引上有着較大的優點。至於中間出錯的環節,咱們看看是否能夠容忍:

1)Put索引表成功,Put主表失敗。因爲Indexing Table不存儲val值,仍須要跳轉到Main Table,因此這樣的錯誤至關於拿一個Stale index去訪問對應Rowkey吧了,對結果正確性沒有影響。

2)Delete主表成功,Delete索引表失敗。都是索引表的內容>=主表的內容而已,而實際返回值須要經過主表進行。

 應用場景:

一、主表服務在線業務,它的性能須要保證。使用coprocessor和客戶端的封裝也好,都會影響其性能,因此在正常狀況下,直接操做都不太合適。若是想使用方案二,我卻是感受,能夠調整Indexing Table的操做方式,去除保證其安全性的內容,好比能夠關閉寫HLOG,這樣會進一步減低其操做的延遲。

二、離線更新索引表。在真正須要二級索引的場景內,其時效性要求每每不高。能夠將索引實時更新到Redis等KV系統中,定時從KV更新索引到Hbase的Indexing Table中。PS:Redis裏面有DB設置的概念,能夠按照時間段進行隔離,這樣某段時間內的數據會更新到Redis上,保證Redis導入MapReduce以後仍然能夠進行update操做。

 

coprocessor代碼實現  ??

 

We have been working on implementing secondary index in HBase and open sourced on hbase 0.94.8 version.
The project is available on github.
https://github.com/Huawei-Hadoop/hindex
This Jira is to support secondary index on trunk(0.98).
Following features will be supported.
multiple indexes on table,
multi column index,
index based on part of a column value,
equals and range condition scans using index, and
bulk loading data to indexed table (Indexing done with bulk load)
Most of the kernel changes needed for secondary index is available in trunk. Very minimal changes needed for it.

首先在HBase-0.19.3中必須設置參數,使得Hbase可使用索引,修改$HBASE_INSTALL_DIR/conf/hbase-site.xml:

    <property>
        <name>hbase.regionserver.class</name>
        <value>org.apache.hadoop.hbase.ipc.IndexedRegionInterface</value>
    </property>

    <property>
        <name>hbase.regionserver.impl</name>
        <value>
        org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer
        </value>
    </property>

(1)建立表時,增長二級索引:

   HBaseConfiguration conf = new HBaseConfiguration();
    conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml"));

    HTableDescriptor desc = new HTableDescriptor("test_table");

    desc.addFamily(new HColumnDescriptor("columnfamily1:"));
    desc.addFamily(new HColumnDescriptor("columnfamily2:"));

    desc.addIndex(new IndexSpecification("column1", 
        Bytes.toBytes("columnfamily1:column1")));
    desc.addIndex(new IndexSpecification("column2", 
        Bytes.toBytes("columnfamily1:column2")));

    IndexedTableAdmin admin = null;
    admin = new IndexedTableAdmin(conf);
    admin.createTable(desc);

(2)在已經存在的表中,增長索引

 HBaseConfiguration conf = new HBaseConfiguration();
    conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml"));

    IndexedTableAdmin admin = null;
    admin = new IndexedTableAdmin(conf);

    admin.addIndex(Bytes.toBytes("test_table"), new IndexSpecification("column2",
    Bytes.toBytes("columnfamily1:column2")));  

(3)刪除存在的索引

HBaseConfiguration conf = new HBaseConfiguration();
    conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml"));

    IndexedTableAdmin admin = null;
    admin = new IndexedTableAdmin(conf);

    admin.removeIndex(Bytes.toBytes("test_table"), "column2");

(4)經過索引scan全部數據

HBaseConfiguration conf = new HBaseConfiguration();
    conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml"));

    IndexedTable table = new IndexedTable(conf, Bytes.toBytes("test_table"));

    // You need to specify which columns to get
    Scanner scanner = table.getIndexedScanner("column1",
        HConstants.EMPTY_START_ROW, null, null, new byte[][] {
        Bytes.toBytes("columnfamily1:column1"),
        Bytes.toBytes("columnfamily1:column2") });

    for (RowResult rowResult : scanner) {
        String value1 = new String(
            rowResult.get(Bytes.toBytes("columnfamily1:column1")).getValue());
        String value2 = new String(
            rowResult.get(Bytes.toBytes("columnfamily1:column2")).getValue());
        System.out.println(value1 + ", " + value2);
    }

    table.close();

 (5)經過索引scan一部分子集,經過ColumnValueFilter過濾。

  使用SingleColumnValueFilter會影響查詢性能,在真正處理海量數據時會消耗很大的資源,且須要較長的時間

  ColumnValueFilter filter = 
        new ColumnValueFilter(Bytes.toBytes("columnfamily1:column1"),
        CompareOp.LESS, Bytes.toBytes("value1-10"));

    scanner = table.getIndexedScanner("column1", HConstants.EMPTY_START_ROW,
        null, filter, new byte[][] { Bytes.toBytes("columnfamily1:column1"),
        Bytes.toBytes("columnfamily1:column2"));

    for (RowResult rowResult : scanner) {
        String value1 = new String(
            rowResult.get(Bytes.toBytes("columnfamily1:column1")).getValue());
        String value2 = new String(
            rowResult.get(Bytes.toBytes("columnfamily1:column2")).getValue());
        System.out.println(value1 + ", " + value2);
    }

通常不建議用Filter,scan.setFilters(),經過filter設置的條件查不到數據時,響應速度很是慢,大概在十幾秒,有時會超時,

但能夠查到數據時,響應速度只有幾百ms,差距很是大

Scan scan = new Scan();
FilterList filters = new FilterList();

for (String[] param : params)
{
//param[0]爲列名,param[1]爲相應的值
filters.addFilter(new SingleColumnValueFilter("INFO".getBytes(), param[0].getBytes(), CompareOp.EQUAL, param[1].getBytes()));
}

scan.setFilter(filters);

(6)一個徹底的例子

import java.io.IOException;
import java.util.Date;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Scanner;
import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
import org.apache.hadoop.hbase.client.tableindexed.IndexedTable;
import org.apache.hadoop.hbase.client.tableindexed.IndexedTableAdmin;
import org.apache.hadoop.hbase.filter.ColumnValueFilter;
import org.apache.hadoop.hbase.filter.ColumnValueFilter.CompareOp;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;

public class SecondaryIndexTest {
    public void writeToTable() throws IOException {
        HBaseConfiguration conf = new HBaseConfiguration();
        conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml"));

        IndexedTable table = new IndexedTable(conf, Bytes.toBytes("test_table"));

        String row = "test_row";
        BatchUpdate update = null;

        for (int i = 0; i < 100; i++) {
            update = new BatchUpdate(row + i);
            update.put("columnfamily1:column1", Bytes.toBytes("value1-" + i));
            update.put("columnfamily1:column2", Bytes.toBytes("value2-" + i));
            table.commit(update);
        }

        table.close();
    }

    public void readAllRowsFromSecondaryIndex() throws IOException {
        HBaseConfiguration conf = new HBaseConfiguration();
        conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml"));

        IndexedTable table = new IndexedTable(conf, Bytes.toBytes("test_table"));

        Scanner scanner = table.getIndexedScanner("column1",
            HConstants.EMPTY_START_ROW, null, null, new byte[][] {
            Bytes.toBytes("columnfamily1:column1"),
                Bytes.toBytes("columnfamily1:column2") });

        for (RowResult rowResult : scanner) {
            System.out.println(Bytes.toString(
                rowResult.get(Bytes.toBytes("columnfamily1:column1")).getValue())
                + ", " + Bytes.toString(rowResult.get(
                Bytes.toBytes("columnfamily1:column2")).getValue()
                ));
        }

        table.close();
    }

    public void readFilteredRowsFromSecondaryIndex() throws IOException {
        HBaseConfiguration conf = new HBaseConfiguration();
        conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml"));

        IndexedTable table = new IndexedTable(conf, Bytes.toBytes("test_table"));

        ColumnValueFilter filter = 
            new ColumnValueFilter(Bytes.toBytes("columnfamily1:column1"), 
            CompareOp.LESS, Bytes.toBytes("value1-40"));

        Scanner scanner = table.getIndexedScanner("column1", 
            HConstants.EMPTY_START_ROW, null, filter, 
            new byte[][] { Bytes.toBytes("columnfamily1:column1"),
                Bytes.toBytes("columnfamily1:column2")
            });

        for (RowResult rowResult : scanner) {
            System.out.println(Bytes.toString(
                rowResult.get(Bytes.toBytes("columnfamily1:column1")).getValue())
                + ", " + Bytes.toString(rowResult.get(
                Bytes.toBytes("columnfamily1:column2")).getValue()
                ));
        }

        table.close();
    }

    public void createTableWithSecondaryIndexes() throws IOException {
        HBaseConfiguration conf = new HBaseConfiguration();
        conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml"));

        HTableDescriptor desc = new HTableDescriptor("test_table");

        desc.addFamily(new HColumnDescriptor("columnfamily1:column1"));
        desc.addFamily(new HColumnDescriptor("columnfamily1:column2"));

        desc.addIndex(new IndexSpecification("column1",
            Bytes.toBytes("columnfamily1:column1")));
        desc.addIndex(new IndexSpecification("column2",
            Bytes.toBytes("columnfamily1:column2")));

        IndexedTableAdmin admin = null;
        admin = new IndexedTableAdmin(conf);

        if (admin.tableExists(Bytes.toBytes("test_table"))) {
            if (admin.isTableEnabled("test_table")) {
                admin.disableTable(Bytes.toBytes("test_table"));
            }

            admin.deleteTable(Bytes.toBytes("test_table"));
        }

        if (admin.tableExists(Bytes.toBytes("test_table-column1"))) {
            if (admin.isTableEnabled("test_table-column1")) {
                admin.disableTable(Bytes.toBytes("test_table-column1"));
            }

            admin.deleteTable(Bytes.toBytes("test_table-column1"));
        }

        admin.createTable(desc);
    }

    public void addSecondaryIndexToExistingTable() throws IOException {
        HBaseConfiguration conf = new HBaseConfiguration();
        conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml"));

        IndexedTableAdmin admin = null;
        admin = new IndexedTableAdmin(conf);

        admin.addIndex(Bytes.toBytes("test_table"), 
            new IndexSpecification("column2", 
            Bytes.toBytes("columnfamily1:column2")));
    }

    public void removeSecondaryIndexToExistingTable() throws IOException {
        HBaseConfiguration conf = new HBaseConfiguration();
        conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml"));

        IndexedTableAdmin admin = null;
        admin = new IndexedTableAdmin(conf);

        admin.removeIndex(Bytes.toBytes("test_table"), "column2");
    }

    public static void main(String[] args) throws IOException {
        SecondaryIndexTest test = new SecondaryIndexTest();

        test.createTableWithSecondaryIndexes();
        test.writeToTable();
        test.addSecondaryIndexToExistingTable();
        test.removeSecondaryIndexToExistingTable();
        test.readAllRowsFromSecondaryIndex();
        test.readFilteredRowsFromSecondaryIndex();

        System.out.println("Done!");
    }
}
相關文章
相關標籤/搜索