HBase 分頁設計

hbase 數據獲取方式

  1. 直接根據 rowkey 查找,速度最快
  2. scan,指定 startrowkey、endrowkey 和 limit獲取數據,在 rowkey 設計良好的狀況下,效率也不錯
  3. 全表掃,強烈不推薦這種作法,效率極差,在線業務不用考慮這種方式

hbase 數據排序怎麼作?

我以爲這個分兩種狀況,一是數據量比較少,業務上每次拉取全部的數據,能夠在客戶端作排序,二是數據比較多,須要分頁,這種狀況下客戶端作顯然不合適,由於要從服務器拉取全部數據,排序完成,獲取某一頁,剩餘的數據全都不用,資源損耗比較嚴重,比較推薦作法是充分利用 hbase rowkey 的特性,數據是按照 rowkey 字典序排列的,若是排序字段是不變的,能夠把排序字段加到 rowkey 裏,這樣吐出的數據天然就是有序的。

若是排序的字段是能夠變的呢?

假設業務上有個查詢,排序的字段是能夠變得,這樣放到 rowkey 裏就不合適了,由於排序字段變,意味着 rowkey 也會變,不是推薦的作法。這時候考慮的一種實現就是使用 redis 維護一個 zset 索引,score 是排序字段,value 是對應記錄的 rowkey,每次排序的字段變了,就去更新 zset 對應的數據。查詢的話就至關於先去 redis 查出 rowkey 列表,而後根據 rowkey 列表去 hbase 批量查。

產品分頁的方式?

常見的兩種方式,一種是更多這種按鈕,不支持跳轉到某一頁,一種是能夠選擇某個特定的頁進行跳轉。這兩種方式 hbase 在實現上會有區別,下面會分別介紹下。

更多方式的分頁

這種分頁不支持跳轉到某一頁,只能不斷地下一頁下一頁,使用 hbase 能夠以一種比較簡單的方式實現。由服務端告訴 app 端或者 web 端下一頁請求的參數,假設某一頁獲取20條數據,服務端去獲取21條,第21條數據的 rowkey 就是下一次掃描的 startrowkey,把它加到返回給 web 或者 app 的參數裏,這樣就能夠實現分頁。有人可能會說,假設下一頁的 startrowkey 返回給前端以後,這時候有新的數據插入,不是會有問題嗎?這種狀況其實還好,首先互聯網應用大可能是讀多寫少,你瀏覽某個列表時,列表內容更新的機率原本就小,就算真的發生,數據會按照排序方式插入到列表首,你不刷新首屏內容,僅僅也就是新加的內容沒展示出來,不影響其餘內容的展示,而只要你一刷新首屏,新的內容就出來了。

直接跳轉到某一頁

假設分頁能夠直接跳轉到某一頁呢?這個用 hbase 實現確認比較尷尬,hbase scan 掃描的時候原本就是根據 rowkey 範圍和 limit 掃描,想到的實現方式依舊是 zset,score 排序字段,value 是 rowkey。也不必定非用 redis,反正就是要有一個地方維護查詢索引。去 hbase 直接根據 rowkey 查詢。

 

簡單 HBase 分頁方案

 

某位仁兄發給的,不知道怎麼樣:

網上大多數分頁方案分爲從服務端分頁或者從客戶端分頁。服務端分頁方式主要利用 PageFilter 過濾器,首先太複雜,其次針對集羣的兼容性不是很好。做者利用服務端分頁+客戶端分頁結合方式給出一種簡單易行的中間方案。

// NOTE(review): fragment quoted from the article — it relies on `scanner`,
// `pageSize`, `pageNo` and `log` being defined elsewhere and is NOT compilable
// as-is. Also note the name collision: `pageSize` is declared here as a Filter
// while simultaneously being used as an int on the same line.
// Server side: PageFilter caps the scan at pageSize * pageNo rows total.
Filter pageSize = new PageFilter(pageSize *pageNo);
            List<Result> resultList = new ArrayList<>();
            // Compute the first and one-past-last row offsets of the requested page.
            Integer firstPage = (pageNo) *pageSize;
            Integer endPage = firstPage + pageSize;

            // Client-side pagination: skip rows before firstPage, keep rows
            // in [firstPage, endPage).
            int i = 0;

            for (Result rs : scanner) {
                if (!rs.isEmpty() && i >= firstPage && i < endPage) {
                    resultList.add(rs);
                }
                // NOTE(review): breaks on log.getPageSize() while the offsets
                // above use pageSize — if the two differ the loop stops on the
                // wrong count; verify which value is intended.
                if (resultList.size() == log.getPageSize()) {
                    break;
                }
                i++;
            }

 

    /**
     * Demo entry point for the "server-filter + client-skip" pagination
     * scheme: a PageFilter caps the server-side scan at pageSize * pageNo
     * rows, then the client skips the rows belonging to earlier pages and
     * keeps exactly one page.
     *
     * <p>Fixes over the original snippet: the method was misspelled
     * {@code mai} (never a JVM entry point); {@code pageSize}/{@code pageNo}
     * and {@code table} were undeclared; the loop stopped on
     * {@code log.getPageSize()} instead of {@code pageSize}; the scanner and
     * table were never closed; and the page offset ({@code pageNo * pageSize})
     * pointed one page past the PageFilter cap, so the result was always
     * empty — offsets are now consistently 1-based.
     */
    public static void main(String[] args) {
        final int pageSize = 20; // rows per page
        final int pageNo = 1;    // 1-based page number to fetch
        try {
            // NOTE(review): the connection is assumed to be shared/managed by
            // HBaseClientUtil, so it is deliberately not closed here — verify.
            Connection connection = HBaseClientUtil.getConnection();
            List<Result> resultList = new ArrayList<>();
            try (Table table = connection
                    .getTable(TableName.valueOf(ProcessLogUtil.HB_TB_NAME))) {
                Scan scan = new Scan();
                // Server side: ship at most the rows up to (and including)
                // the requested page.
                scan.setFilter(new PageFilter((long) pageSize * pageNo));
                try (ResultScanner scanner = table.getScanner(scan)) {
                    int firstRow = (pageNo - 1) * pageSize; // first row of the page
                    int endRow = firstRow + pageSize;       // one past the last row
                    int i = 0;
                    for (Result rs : scanner) {
                        if (!rs.isEmpty() && i >= firstRow && i < endRow) {
                            resultList.add(rs);
                        }
                        if (resultList.size() == pageSize) {
                            break; // page complete — stop scanning early
                        }
                        i++;
                    }
                }
            }
            System.out.println("fetched " + resultList.size() + " rows");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
           

 

package cp.app.service.impl;
 
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
 
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
 
import cp.app.batch.utils.ConfigUtil;
import cp.app.comm.CpConstants;
import cp.app.service.HBaseService;
 
/**
 * Utility service for HBase queries and inserts.
 *
 * <p>Review fixes over the original version:
 * <ul>
 *   <li>one shared {@link HConnection} instead of a new — and never closed —
 *       connection per request (the old code leaked sockets and ZooKeeper
 *       sessions on every call);</li>
 *   <li>tables, scanners and admin handles are closed in {@code finally};</li>
 *   <li>{@code createTable} validated {@code columnFamily.size() < 0}
 *       (always false) and kept going after logging invalid arguments;</li>
 *   <li>{@code StringUtils.isBlank(pageSize + "")} can never be true for an
 *       int, so the value itself is validated now;</li>
 *   <li>an empty {@link Result} no longer causes an NPE in
 *       {@link #getRowByResult} ({@code listCells()} returns null there).</li>
 * </ul>
 *
 * @author author
 */
// Exposed as a Spring bean; HBaseService is the locally defined query
// interface (optional).
@Service
public class HBaseServiceImpl implements HBaseService {

    private static final Logger log = Logger.getLogger(HBaseServiceImpl.class
            .getName());

    static ConfigUtil util = new ConfigUtil("conf/zookeeper.properties");
    private static final String HBASE_ZOOKEEPER_QUORUM = util
            .getString("hbase_zookeeper_quorum");
    private static final String ZOOKEEPER_ZNODE_PARENT = util
            .getString("zookeeper_znode_parent");
    private static Configuration conf = HBaseConfiguration.create();
    // Shared, lazily created connection. HConnection is thread-safe and heavy
    // to build, so one per JVM is the intended usage pattern.
    private static volatile HConnection connection;

    static {
        conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM);
        conf.set("zookeeper.znode.parent", ZOOKEEPER_ZNODE_PARENT);
    }

    /**
     * Creates a table with the given column families; a no-op if it already
     * exists.
     *
     * @param tableName    table name
     * @param columnFamily column family names; must be non-empty
     * @return true on success (or if the table already exists), false otherwise
     */
    public boolean createTable(String tableName, List<String> columnFamily) {
        if (StringUtils.isBlank(tableName) || columnFamily == null
                || columnFamily.isEmpty()) {
            // The original checked size() < 0 (always false) and then fell
            // through and used the bad arguments anyway; fail fast instead.
            log.error("===Parameters tableName|columnFamily should not be null,Please check!===");
            return false;
        }
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(conf);
            if (admin.tableExists(tableName)) {
                return true;
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(
                    TableName.valueOf(tableName));
            for (String cf : columnFamily) {
                tableDescriptor.addFamily(new HColumnDescriptor(cf));
            }
            admin.createTable(tableDescriptor);
            log.info("===Create Table " + tableName + " Success!columnFamily:"
                    + columnFamily.toString() + "===");
            return true;
        } catch (IOException e) {
            // MasterNotRunningException and ZooKeeperConnectionException both
            // extend IOException, so one catch covers the three original ones.
            log.error(e);
            return false;
        } finally {
            closeAdmin(admin);
        }
    }

    /**
     * Fetches a single row by rowkey.
     *
     * @param tableName table name
     * @param rowKey    rowkey value
     * @return a list containing the row's cell map (empty when the row does
     *         not exist or the table handle cannot be obtained), or null on
     *         blank arguments
     */
    public List<Map<String, String>> selectOneByRowKey(String tableName,
            String rowKey) {
        if (StringUtils.isBlank(rowKey) || StringUtils.isBlank(tableName)) {
            log.error("===Parameters tableName|rowKey should not be blank,Please check!===");
            return null;
        }
        List<Map<String, String>> rowList = new ArrayList<Map<String, String>>();
        HTableInterface hTable = getHTable(tableName);
        if (hTable == null) {
            // The original called hTable.close() outside the null guard -> NPE.
            return rowList;
        }
        try {
            Result result = hTable.get(new Get(Bytes.toBytes(rowKey)));
            Map<String, String> cellMap = getRowByResult(result);
            if (cellMap != null) {
                rowList.add(cellMap);
            }
        } catch (IOException e) {
            log.error(e);
        } finally {
            closeTable(hTable);
        }
        return rowList;
    }

    /**
     * Pages through table data.
     *
     * @param tableName  table name
     * @param ddate      data date, matched as a substring of the rowkey
     * @param pageSize   page size, must be &gt; 0
     * @param lastrowKey rowkey to resume after (exclusive), or
     *                   {@code CpConstants.ROWKEY_FIRST} for the first page
     * @return rows of the requested page, or null on invalid arguments
     */
    public List<Map<String, String>> selectAllByPage(String tableName,
            String ddate, int pageSize, String lastrowKey) {
        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate)
                || pageSize <= 0 || StringUtils.isBlank(lastrowKey)) {
            log.error("===Parameters tableName|ddate|pageSize|rowKey should not be blank,Please check!===");
            return null;
        }
        return scanByFilter(tableName,
                buildPageFilterList(ddate, pageSize, lastrowKey, null));
    }

    /**
     * Pages through table data, additionally filtering on the info:status
     * column value.
     *
     * @param tableName  table name
     * @param ddate      data date, matched as a substring of the rowkey
     * @param pageSize   page size, must be &gt; 0
     * @param lastrowKey rowkey to resume after (exclusive), or
     *                   {@code CpConstants.ROWKEY_FIRST} for the first page
     * @param status     send status the info:status column must equal
     * @return rows of the requested page, or null on invalid arguments
     */
    public List<Map<String, String>> selectAllByPageStatus(String tableName,
            String ddate, int pageSize, String lastrowKey, String status) {
        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate)
                || pageSize <= 0 || StringUtils.isBlank(lastrowKey)) {
            log.error("===Parameters tableName|ddate|pageSize|rowKey should not be blank,Please check!===");
            return null;
        }
        return scanByFilter(tableName,
                buildPageFilterList(ddate, pageSize, lastrowKey, status));
    }

    /**
     * Computes the number of pages for rows whose rowkey contains ddate.
     *
     * @param tableName table name
     * @param ddate     data date, matched as a substring of the rowkey
     * @param pageSize  page size, must be &gt; 0
     * @return page count (0 on invalid arguments or count failure)
     */
    public int getPages(String tableName, String ddate, int pageSize) {
        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate)
                || pageSize <= 0) {
            log.error("===Parameters tableName|ddate|pageSize should not be blank,Please check!===");
            return 0;
        }
        enableAggregation(tableName);
        return pageCount(countRows(tableName, ddate, null), pageSize);
    }

    /**
     * Computes the number of pages for rows whose rowkey contains ddate and
     * whose info:status column equals the given status.
     *
     * @param tableName table name
     * @param ddate     data date, matched as a substring of the rowkey
     * @param pageSize  page size, must be &gt; 0
     * @param status    send status to match
     * @return page count (0 on invalid arguments or count failure)
     */
    public int getPagesByStatus(String tableName, String ddate, int pageSize,
            String status) {
        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate)
                || pageSize <= 0 || StringUtils.isBlank(status)) {
            log.error("===Parameters tableName|ddate|pageSize|status should not be blank,Please check!===");
            return 0;
        }
        enableAggregation(tableName);
        return pageCount(countRows(tableName, ddate, status), pageSize);
    }

    /**
     * Batch insert or update.
     *
     * @param tableName table name
     * @param paraList  one map per row; each map must carry the rowkey and
     *                  columnfamily entries plus qualifier/value pairs
     * @return true on success, false otherwise
     */
    public boolean batchPut(String tableName, List<Map<String, String>> paraList) {
        if (paraList == null || paraList.isEmpty()) {
            log.error("===Parameter |paraList| should not be empty,Please check!===");
            return false;
        }
        HTableInterface hTable = getHTable(tableName);
        if (hTable == null) {
            return false;
        }
        try {
            List<Put> puts = new ArrayList<Put>(paraList.size());
            for (Map<String, String> map : paraList) {
                Put put = getPutByMap(map);
                if (put != null) {
                    puts.add(put);
                }
            }
            hTable.put(puts);
            return true;
        } catch (IOException e) {
            // RetriesExhaustedWithDetailsException and InterruptedIOException
            // both extend IOException.
            log.error(e);
            return false;
        } finally {
            closeTable(hTable);
        }
    }

    /**
     * Builds the shared paging filter list: rowkey must contain ddate, at
     * most pageSize rows, optionally rowkey &gt; lastrowKey, and optionally
     * info:status == status.
     */
    private FilterList buildPageFilterList(String ddate, int pageSize,
            String lastrowKey, String status) {
        FilterList filterList = new FilterList(
                FilterList.Operator.MUST_PASS_ALL);
        if (status != null) {
            filterList.addFilter(new SingleColumnValueFilter(Bytes
                    .toBytes("info"), Bytes.toBytes("status"), CompareOp.EQUAL,
                    Bytes.toBytes(status)));
        }
        filterList.addFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
                new SubstringComparator(ddate)));
        filterList.addFilter(new PageFilter(pageSize));
        if (!CpConstants.ROWKEY_FIRST.equals(lastrowKey)) {
            // Resume strictly after the last rowkey of the previous page.
            filterList.addFilter(new RowFilter(CompareFilter.CompareOp.GREATER,
                    new BinaryComparator(Bytes.toBytes(lastrowKey))));
        }
        return filterList;
    }

    /** Runs a scan with the given filter and flattens each row into a map. */
    private List<Map<String, String>> scanByFilter(String tableName,
            FilterList filterList) {
        List<Map<String, String>> lists = new ArrayList<Map<String, String>>();
        HTableInterface hTable = getHTable(tableName);
        if (hTable == null) {
            return lists;
        }
        ResultScanner rs = null;
        try {
            Scan scan = new Scan();
            scan.setFilter(filterList);
            rs = hTable.getScanner(scan);
            for (Result result : rs) {
                lists.add(getRowByResult(result));
            }
        } catch (IOException e) {
            log.error(e);
        } finally {
            if (rs != null) {
                rs.close(); // the scanner was never closed before
            }
            closeTable(hTable);
        }
        return lists;
    }

    /**
     * Counts rows matching ddate (and, when non-null, info:status == status)
     * via the aggregation coprocessor.
     */
    private long countRows(String tableName, String ddate, String status) {
        HTable hTable = (HTable) getHTable(tableName);
        if (hTable == null) {
            return 0L;
        }
        try {
            Scan scan = new Scan();
            Filter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                    new SubstringComparator(ddate));
            if (status == null) {
                scan.setFilter(rowFilter);
            } else {
                FilterList filterList = new FilterList(
                        FilterList.Operator.MUST_PASS_ALL);
                filterList.addFilter(rowFilter);
                filterList.addFilter(new SingleColumnValueFilter(Bytes
                        .toBytes("info"), Bytes.toBytes("status"),
                        CompareOp.EQUAL, Bytes.toBytes(status)));
                scan.setFilter(filterList);
            }
            AggregationClient aggregation = new AggregationClient(conf);
            // rowCount declares Throwable, hence the broad catch below.
            return aggregation.rowCount(hTable, new LongColumnInterpreter(),
                    scan);
        } catch (Throwable e) {
            log.error(e);
            return 0L;
        } finally {
            closeTable(hTable);
        }
    }

    /** Rounds total / pageSize up to whole pages. */
    private static int pageCount(long total, int pageSize) {
        return (int) ((total + pageSize - 1) / pageSize);
    }

    /**
     * Flattens one Result into a map: rowkey, columnfamily, plus one entry per
     * qualifier. Note: when a row spans several column families, the last
     * cell's family wins (pre-existing behavior, kept as-is).
     *
     * @param result scan/get result
     * @return cell map (empty for an empty result), or null for null input
     */
    private Map<String, String> getRowByResult(Result result) {
        if (result == null) {
            log.error("===Parameter |result| should not be null,Please check!===");
            return null;
        }
        Map<String, String> cellMap = new HashMap<String, String>();
        if (result.isEmpty()) {
            // listCells() returns null for an empty result; the original
            // iterated it and threw an NPE.
            return cellMap;
        }
        for (Cell cell : result.listCells()) {
            String rowkey = Bytes.toString(cell.getRowArray(),
                    cell.getRowOffset(), cell.getRowLength());
            String cf = Bytes.toString(cell.getFamilyArray(),
                    cell.getFamilyOffset(), cell.getFamilyLength());
            String qf = Bytes.toString(cell.getQualifierArray(),
                    cell.getQualifierOffset(), cell.getQualifierLength());
            String value = Bytes.toString(cell.getValueArray(),
                    cell.getValueOffset(), cell.getValueLength());
            cellMap.put(CpConstants.HBASE_TABLE_PROP_ROWKEY, rowkey);
            cellMap.put(CpConstants.HBASE_TABLE_PROP_COLUMNFAMILY, cf);
            cellMap.put(qf, value);
        }
        return cellMap;
    }

    /**
     * Obtains a table handle from the shared connection. Callers must close
     * the returned handle.
     *
     * @param tableName table name
     * @return the handle, or null on blank name / connection failure
     */
    private HTableInterface getHTable(String tableName) {
        if (StringUtils.isBlank(tableName)) {
            log.error("===Parameter |tableName| should not be blank,Please check!===");
            return null;
        }
        try {
            return getConnection().getTable(Bytes.toBytes(tableName));
        } catch (IOException e) {
            log.error(e);
            return null;
        }
    }

    /** Lazily creates the shared connection (double-checked on the class lock). */
    private static HConnection getConnection() throws IOException {
        if (connection == null) {
            synchronized (HBaseServiceImpl.class) {
                if (connection == null) {
                    connection = HConnectionManager.createConnection(conf);
                }
            }
        }
        return connection;
    }

    /**
     * Builds a Put from a parameter map: the rowkey/columnfamily entries name
     * the row and family, every other entry becomes qualifier=value.
     *
     * @param paraMap parameter map
     * @return the Put, or null when the map or its required entries are missing
     */
    private Put getPutByMap(Map<String, String> paraMap) {
        if (paraMap == null) {
            log.error("===Parameter |paraMap| should not be null,Please check!===");
            return null;
        }
        String rowkeyValue = paraMap.get(CpConstants.HBASE_TABLE_PROP_ROWKEY);
        String familyValue = paraMap
                .get(CpConstants.HBASE_TABLE_PROP_COLUMNFAMILY);
        if (rowkeyValue == null || familyValue == null) {
            // The original passed null into Bytes.toBytes -> NPE.
            log.error("===Parameter |paraMap| must contain rowkey and columnfamily,Please check!===");
            return null;
        }
        byte[] columnfamily = Bytes.toBytes(familyValue);
        Put put = new Put(Bytes.toBytes(rowkeyValue));
        for (Entry<String, String> entry : paraMap.entrySet()) {
            String key = entry.getKey();
            if (!CpConstants.HBASE_TABLE_PROP_ROWKEY.equals(key)
                    && !CpConstants.HBASE_TABLE_PROP_COLUMNFAMILY.equals(key)) {
                put.add(columnfamily, Bytes.toBytes(key),
                        Bytes.toBytes(entry.getValue()));
            }
        }
        return put;
    }

    /**
     * Ensures the aggregation coprocessor is attached to the table (required
     * by AggregationClient.rowCount); disables and re-enables the table when
     * adding it.
     *
     * @param tableName table name
     */
    private void enableAggregation(String tableName) {
        String coprocessorName = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(conf);
            HTableDescriptor htd = admin.getTableDescriptor(Bytes
                    .toBytes(tableName));
            List<String> coprocessors = htd.getCoprocessors();
            if (coprocessors != null && !coprocessors.isEmpty()) {
                // NOTE(review): assumes any attached coprocessor is the
                // aggregation one (pre-existing behavior) — verify.
                return;
            }
            admin.disableTable(tableName);
            htd.addCoprocessor(coprocessorName);
            admin.modifyTable(tableName, htd);
            admin.enableTable(tableName);
        } catch (IOException e) {
            // TableNotFoundException, MasterNotRunningException and
            // ZooKeeperConnectionException all extend IOException.
            log.error(e);
        } finally {
            closeAdmin(admin);
        }
    }

    /** Closes a table handle, logging (not propagating) any IOException. */
    private static void closeTable(HTableInterface hTable) {
        if (hTable != null) {
            try {
                hTable.close();
            } catch (IOException e) {
                log.error(e);
            }
        }
    }

    /** Closes an admin handle, logging (not propagating) any IOException. */
    private static void closeAdmin(HBaseAdmin admin) {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                log.error(e);
            }
        }
    }
}
相關文章
相關標籤/搜索