引言:過濾器的類型不少,可是能夠分爲兩大類——比較過濾器,專用過濾器
過濾器的做用是在服務端判斷數據是否知足條件,而後只將知足條件的數據返回給客戶端;
1、hbase過濾器的分類
一、比較過濾器
行鍵過濾器 RowFilter
Filter filter1 = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-22")));
scan.setFilter(filter1);
列族過濾器 FamilyFilter
Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("colfam3")));
scan.setFilter(filter1);
列過濾器 QualifierFilter
Filter filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("col-2")));
scan.setFilter(filter);
值過濾器 ValueFilter
Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(".4") );
scan.setFilter(filter);
二、專用過濾器
單列值過濾器 SingleColumnValueFilter ----會返回知足條件的整行
SingleColumnValueFilter filter = new SingleColumnValueFilter(
Bytes.toBytes("colfam1"),
Bytes.toBytes("col-5"),
CompareFilter.CompareOp.NOT_EQUAL,
new SubstringComparator("val-5"));
filter.setFilterIfMissing(true); //若是不設置爲 true,則那些不包含指定 column 的行也會返回
scan.setFilter(filter);
單列值排除器 SingleColumnValueExcludeFilter -----返回排除了該列的結果 與上面的結果相反
前綴過濾器 PrefixFilter----針對行鍵
Filter filter = new PrefixFilter(Bytes.toBytes("row1"));
scan.setFilter(filter);
列前綴過濾器 ColumnPrefixFilter
Filter filter = new ColumnPrefixFilter(Bytes.toBytes("qual2"));
scan.setFilter(filter);
分頁過濾器 PageFilter
代碼實現:
package com.ghgj.hbase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FamilyFilter; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.filter.SubstringComparator; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; public class HbasePageDemo { // 聲明靜態配置 static Configuration conf = null; private static final String ZK_CONNECT_STR = "hadoop01:2181,hadoop02:2181,hadoop03:2181,hadoop04:2181,hadoop05:2181"; static { conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", ZK_CONNECT_STR); } public static void main(String[] args) throws Exception { String tableName = "testfilter"; String cfName = "f1"; final byte[] POSTFIX = new byte[] { 0x00 }; HTable table = new HTable(conf, tableName); Filter filter = new PageFilter(3); byte[] lastRow = null; int totalRows = 0; while (true) { Scan scan = new Scan(); scan.setFilter(filter); if(lastRow != null){ //注意這裏添加了 POSTFIX 操做,用來重置掃描邊界 byte[] startRow = Bytes.add(lastRow,POSTFIX); scan.setStartRow(startRow); } ResultScanner 
scanner = table.getScanner(scan); int localRows = 0; Result result; while((result = scanner.next()) != null){ System.out.println(localRows++ + ":" + result); totalRows ++; lastRow = result.getRow(); } scanner.close(); if(localRows == 0) break; } System.out.println("total rows:" + totalRows); } / ** * 多種過濾條件的使用方法 * @throws Exception */ @Test public void testScan() throws Exception{ HTable table = new HTable(conf, "person".getBytes()); Scan scan = new Scan(Bytes.toBytes("person_zhang_000001"), Bytes.toBytes("person_zhang_000002")); //前綴過濾器----針對行鍵 Filter filter = new PrefixFilter(Bytes.toBytes("person")); //行過濾器 ---針對行鍵 ByteArrayComparable rowComparator = new BinaryComparator(Bytes.toBytes("person_zhang_000001")); RowFilter rf = new RowFilter(CompareOp.LESS_OR_EQUAL, rowComparator); rf = new RowFilter(CompareOp.EQUAL , new SubstringComparator("_2016-12-31_")); //單值過濾器 1 完整匹配字節數組 new SingleColumnValueFilter("base_info".getBytes(), "name".getBytes(), CompareOp.EQUAL, "zhangsan".getBytes()); //單值過濾器 2 匹配正則表達式 ByteArrayComparable comparator = new RegexStringComparator("zhang."); new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(), CompareOp.EQUAL, comparator); //單值過濾器 3 匹配是否包含子串,大小寫不敏感 comparator = new SubstringComparator("wu"); new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(), CompareOp.EQUAL, comparator); //鍵值對元數據過濾-----family 過濾----字節數組完整匹配 FamilyFilter ff = new FamilyFilter(CompareOp.EQUAL , new BinaryComparator(Bytes.toBytes("base_info")) //表中不存 在 inf 列族,過濾結果爲空 ); //鍵值對元數據過濾-----family 過濾----字節數組前綴匹配 ff = new FamilyFilter( CompareOp.EQUAL , new BinaryPrefixComparator(Bytes.toBytes("inf")) //表中存在以 inf 打頭的列族 info,過濾結果爲該列族全部行 ); //鍵值對元數據過濾-----qualifier 過濾----字節數組完整匹配 filter = new QualifierFilter( CompareOp.EQUAL , new BinaryComparator(Bytes.toBytes("na")) //表中不存在 na 列,過濾結果爲空 ); filter = new QualifierFilter( CompareOp.EQUAL , new BinaryPrefixComparator(Bytes.toBytes("na")) //表中存在以 na 打頭的列 name,過濾結果爲全部行的該列數據 ); //基於列名(即 Qualifier)前綴過濾數據的 
ColumnPrefixFilter filter = new ColumnPrefixFilter("na".getBytes()); //基於列名(即 Qualifier)多個前綴過濾數據的 MultipleColumnPrefixFilter byte[][] prefixes = new byte[][] {Bytes.toBytes("na"), Bytes.toBytes("me")}; filter = new MultipleColumnPrefixFilter(prefixes); //爲查詢設置過濾條件 scan.setFilter(filter); scan.addFamily(Bytes.toBytes("base_info")); //一行 // Result result = table.get(get); //多行的數據 ResultScanner scanner = table.getScanner(scan); for(Result r : scanner){ /** for(KeyValue kv : r.list()){ String family = new String(kv.getFamily()); System.out.println(family); String qualifier = new String(kv.getQualifier()); System.out.println(qualifier); System.out.println(new String(kv.getValue())); } */ //直接從 result 中取到某個特定的 value byte[] value = r.getValue(Bytes.toBytes("base_info"), Bytes.toBytes("name")); System.out.println(new String(value)); } table.close(); } }
分頁過濾器 代碼實現:
package com.ghgj.hbase.test1610; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.util.Bytes; /** * 501條 * * 每頁100條,求第四頁 : 301 - 400 * * pageIndex:第幾頁 * pageNumber:每頁幾條 * * * 在hbase當中取一部分數據的取法: * scan 'user_info',{COLUMNS => 'base_info:name', * LIMIT => 4, STARTROW => 'zhangsan_20150701_0001'} * * mysqL:從第幾條開始,取多少條 * * 從mysql的分頁規則引伸到hbase的分頁:把startRow轉換成mysql的第幾條 */ public class HBasePageFilterDemo { private static final String ZK_CONNECT_STR = "hadoop03:2181,hadoop04:2181,hadoop05:2181"; private static final String TABLE_NAME = "user_info"; private static final String FAMILY_BASIC = "base_info"; private static final String FAMILY_EXTRA = "extra_info"; private static final String COLUMN_NAME = "name"; private static final String COLUMN_AGE = "age"; private static final String ROW_KEY = "rk0001"; private static Configuration config = null; private static HTable table = null; static { config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum", ZK_CONNECT_STR); try { table = new HTable(config, TABLE_NAME); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { // ResultScanner pageData = getPageData("zhangsan_20150701_0001", 4); ResultScanner pageData = getPageData(2, 4); HBasePrintUtil.printResultScanner(pageData); // String lastRowkey = getLastRowkey(pageData); // System.out.println(lastRowkey); } public static ResultScanner getPageData(int pageIndex, int pageNumber) throws Exception{ // 怎麼把pageIndex 轉換成 startRow String startRow = null; if(pageIndex == 1){ // 當客戶方法只取第一頁的分頁數據時, ResultScanner pageData = 
getPageData(startRow, pageNumber); return pageData; }else{ ResultScanner newPageData = null; for(int i=0; i<pageIndex - 1; i++){ // 總共循環次數是比你取的頁數少1 newPageData = getPageData(startRow, pageNumber); startRow = getLastRowkey(newPageData); byte[] add = Bytes.add(Bytes.toBytes(startRow), new byte[]{ 0X00 }); startRow = Bytes.toString(add); } newPageData = getPageData(startRow, pageNumber); return newPageData; } } /** * @param startRow * @param pageNumber * @return * * scan 'user_info',{COLUMNS => 'base_info:name', * LIMIT => 4, STARTROW => 'zhangsan_20150701_0001'} * @throws Exception */ public static ResultScanner getPageData(String startRow, int pageNumber) throws Exception{ Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("name")); // 設置當前查詢的其實位置 if(!StringUtils.isBlank(startRow)){ scan.setStartRow(Bytes.toBytes(startRow)); } // 第二個參數 Filter pageFilter = new PageFilter(pageNumber); scan.setFilter(pageFilter); ResultScanner rs = table.getScanner(scan); return rs; } public static String getLastRowkey(ResultScanner rs){ String lastRowkey = null; for(Result result : rs){ // System.out.println(result.getRow()); lastRowkey = Bytes.toString(result.getRow()); } return lastRowkey; // return null; } }
多條件過濾時,能夠使用FilterList
// Combine several conditions with a FilterList. The default operator is
// MUST_PASS_ALL, i.e. a row is kept only when every filter matches:
// age <= "20" AND age > "18" AND the cell value contains "lisi".
List<Filter> conditions = new ArrayList<Filter>();

// Upper bound on the age column (byte-wise comparison of the string "20").
SingleColumnValueFilter ageUpperBound = new SingleColumnValueFilter(
        Bytes.toBytes("info"),
        Bytes.toBytes("age"),
        CompareOp.LESS_OR_EQUAL,
        new BinaryComparator(Bytes.toBytes("20")));
conditions.add(ageUpperBound);

// Lower bound on the same column.
SingleColumnValueFilter ageLowerBound = new SingleColumnValueFilter(
        Bytes.toBytes("info"),
        Bytes.toBytes("age"),
        CompareOp.GREATER,
        new BinaryComparator(Bytes.toBytes("18")));
conditions.add(ageLowerBound);

// Substring match against cell values.
Filter valueContains = new ValueFilter(CompareOp.EQUAL, new SubstringComparator("lisi"));
conditions.add(valueContains);

FilterList filterList = new FilterList(conditions);
scan.setFilter(filterList);