我以爲這個分兩種狀況,一是數據量比較少,業務上每次拉取全部的數據,能夠在客戶端作排序,二是數據比較多,須要分頁,這種狀況下客戶端作顯然不合適,由於要從服務器拉取全部數據,排序完成,獲取某一頁,剩餘的數據全都不用,資源損耗比較嚴重,比較推薦作法是充分利用 hbase rowkey 的特性,數據是按照 rowkey 字典序排列的,若是排序字段是不變的,能夠把排序字段加到 rowkey 裏,這樣吐出的數據天然就是有序的。
假設業務上有個查詢,排序的字段是能夠變的,這樣放到 rowkey 裏就不合適了,由於排序字段變,意味着 rowkey 也會變,不是推薦的作法。這時候考慮的一種實現就是使用 redis 維護一個 zset 索引,score 是排序字段,value 是對應記錄的 rowkey,每次排序的字段變了,就去更新 zset 對應的數據。查詢的話就至關於先去 redis 查出 rowkey 列表,而後根據 rowkey 列表去 hbase 批量查。
常見的兩種方式,一種是「更多」這種按鈕,不支持跳轉到某一頁,一種是能夠選擇某個特定的頁進行跳轉。這兩種方式 hbase 在實現上會有區別,下面會分別介紹下。
這種分頁不支持跳轉到某一頁,只能不斷地下一頁下一頁,使用 hbase 能夠以一種比較簡單的方式實現。由服務端告訴 app 端或者 web 端下一頁請求的參數,假設某一頁獲取20條數據,服務端去獲取21條,第21條數據的 rowkey 就是下一次掃描的 startrowkey,把它加到返回給 web 或者 app 的參數裏,這樣就能夠實現分頁。有人可能會說,假設下一頁的 startrowkey 返回給前端以後,這時候有新的數據插入,不是會有問題嗎?這種狀況其實還好,首先互聯網應用大多是讀多寫少,你瀏覽某個列表時,列表內容更新的機率原本就小,就算真的發生,數據會按照排序方式插入到列表首,你不刷新首屏內容,僅僅也就是新加的內容沒展示出來,不影響其餘內容的展示,而只要你一刷新首屏,新的內容就出來了。
假設分頁能夠直接跳轉到某一頁呢?這個用 hbase 實現確實比較尷尬,hbase scan 掃描的時候原本就是根據 rowkey 範圍和 limit 掃描,想到的實現方式依舊是 zset,score 是排序字段,value 是 rowkey。也不必定非用 redis,反正就是要有一個地方維護查詢索引,再去 hbase 直接根據 rowkey 查詢。
簡單 HBase 分頁方案
某位仁兄發來的方案,不知道效果怎麼樣:
網上大多數分頁方案分爲從服務端分頁或者從客戶端分頁。服務端分頁方式主要利用 PageFilter 過濾器,首先太複雜,其次針對集羣的兼容性不是很好,作者利用服務端分頁+客戶端分頁結合的方式給出一種簡單易行的中間方案。
Filter pageSize = new PageFilter(pageSize *pageNo);
List<Result> resultList = new ArrayList<>(); // 計算起始頁和結束頁 Integer firstPage = (pageNo) *pageSize; Integer endPage = firstPage + pageSize; //客戶端分頁 int i = 0; for (Result rs : scanner) { if (!rs.isEmpty() && i >= firstPage && i < endPage) { resultList.add(rs); } if (resultList.size() == log.getPageSize()) { break; } i++; }
public static void mai(String[] args){ Connection connection = HBaseClientUtil.getConnection(); table = connection.getTable(TableName.valueOf(ProcessLogUtil.HB_TB_NAME)); Scan scan = new Scan(); Filter pageSizeFilter = new PageFilter(pageSize *pageNo); scan.setFilter(pageSizeFilter ); ResultScanner scanner = table.getScanner(scan); List<Result> resultList = new ArrayList<>(); // 計算起始頁和結束頁 Integer firstPage = (pageNo) *pageSize; Integer endPage = firstPage + pageSize; //客戶端分頁 int i = 0; for (Result rs : scanner) { if (!rs.isEmpty() && i >= firstPage && i < endPage) { resultList.add(rs); } if (resultList.size() == log.getPageSize()) { break; } i++; } }
package cp.app.service.impl; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.coprocessor.AggregationClient; import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.filter.SubstringComparator; import 
org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; import cp.app.batch.utils.ConfigUtil; import cp.app.comm.CpConstants; import cp.app.service.HBaseService; /** * HBase查詢與插入操做工具類 * * @author author * */ //採用注入方式,HBaseService爲定義的查詢接口,可不須要。 @Service public class HBaseServiceImpl implements HBaseService{ private static Logger log = Logger.getLogger(HBaseServiceImpl.class.getName()); static ConfigUtil util = new ConfigUtil("conf/zookeeper.properties"); private static final String HBASE_ZOOKEEPER_QUORUM = util .getString("hbase_zookeeper_quorum"); private static final String ZOOKEEPER_ZNODE_PARENT = util .getString("zookeeper_znode_parent"); private static Configuration conf = HBaseConfiguration.create(); static { conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM); conf.set("zookeeper.znode.parent", ZOOKEEPER_ZNODE_PARENT); } /** * 建立表 * * @param tableName * 表名 * @param columnFamily * 列簇集合 * @return 成功-true 失敗-false */ @SuppressWarnings("resource") public boolean createTable(String tableName, List<String> columnFamily) { try { if (StringUtils.isBlank(tableName) || columnFamily == null || columnFamily.size() < 0) { log.error("===Parameters tableName|columnFamily should not be null,Please check!==="); } HBaseAdmin admin = new HBaseAdmin(conf); if (admin.tableExists(tableName)) { return true; } else { HTableDescriptor tableDescriptor = new HTableDescriptor( TableName.valueOf(tableName)); for (String cf : columnFamily) { tableDescriptor.addFamily(new HColumnDescriptor(cf)); } admin.createTable(tableDescriptor); log.info("===Create Table " + tableName + " Success!columnFamily:" + columnFamily.toString() + "==="); } } catch (MasterNotRunningException e) { // TODO Auto-generated catch block log.error(e); return false; } catch (ZooKeeperConnectionException e) { // TODO Auto-generated catch block log.error(e); return false; } catch (IOException e) { // TODO Auto-generated catch block log.error(e); return 
false; } return true; } /** * 查詢單條記錄 * * @param tableName * 表名 * @param rowKey * rowKey值 * @return 返回單條記錄 */ public List<Map<String, String>> selectOneByRowKey(String tableName, String rowKey) { if (StringUtils.isBlank(rowKey) || StringUtils.isBlank(tableName)) { log.error("===Parameters tableName|rowKey should not be blank,Please check!==="); return null; } List<Map<String, String>> rowList = new ArrayList<Map<String, String>>(); try { Get get = new Get(Bytes.toBytes(rowKey)); HTableInterface hTable = getHTable(tableName); if (hTable != null) { Result result = hTable.get(get); Map<String, String> cellMap = getRowByResult(result); rowList.add(cellMap); } hTable.close(); } catch (IOException e) { // TODO Auto-generated catch block log.error(e); } return rowList; } /** * 分頁查詢表數據 * * @param tableName * 表名 * @param ddate * 數據日期 * @param pageSize * 頁大小 * @param lastrowKey * 起始rowkey值 * @return 返回查詢數據結果集 */ public List<Map<String, String>> selectAllByPage(String tableName, String ddate, int pageSize, String lastrowKey) { if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate) || StringUtils.isBlank(pageSize + "") || StringUtils.isBlank(lastrowKey)) { log.error("===Parameters tableName|ddate|pageSize|rowKey should not be blank,Please check!==="); return null; } HTable hTable = (HTable) getHTable(tableName); Scan scan = new Scan(); FilterList filterList = new FilterList( FilterList.Operator.MUST_PASS_ALL); Filter rowFilter1 = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(ddate)); Filter pageFilter = new PageFilter(pageSize); filterList.addFilter(rowFilter1); filterList.addFilter(pageFilter); if (!CpConstants.ROWKEY_FIRST.equals(lastrowKey)) { Filter rowFilter2 = new RowFilter(CompareFilter.CompareOp.GREATER, new BinaryComparator(Bytes.toBytes(lastrowKey))); filterList.addFilter(rowFilter2); } scan.setFilter(filterList); List<Map<String, String>> lists = new ArrayList<Map<String, String>>(); try { ResultScanner rs = hTable.getScanner(scan); 
for (Result result : rs) { lists.add(getRowByResult(result)); } hTable.close(); } catch (IOException e) { // TODO Auto-generated catch block log.error(e); } return lists; } /** * 根據狀態分頁查詢表數據 * * @param tableName * 表名 * @param ddate * 數據日期 * @param pageSize * 頁大小 * @param lastrowKey * 起始rowkey值 * @param status * 發送狀態 * @return 返回查詢數據結果集 */ public List<Map<String, String>> selectAllByPageStatus(String tableName, String ddate, int pageSize, String lastrowKey, String status) { if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate) || StringUtils.isBlank(pageSize + "") || StringUtils.isBlank(lastrowKey)) { log.error("===Parameters tableName|ddate|pageSize|rowKey should not be blank,Please check!==="); return null; } HTable hTable = (HTable) getHTable(tableName); Scan scan = new Scan(); FilterList filterList = new FilterList( FilterList.Operator.MUST_PASS_ALL); filterList .addFilter(new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("status"), CompareOp.EQUAL, Bytes .toBytes(status))); Filter rowFilter1 = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(ddate)); Filter pageFilter = new PageFilter(pageSize); filterList.addFilter(rowFilter1); filterList.addFilter(pageFilter); if (!CpConstants.ROWKEY_FIRST.equals(lastrowKey)) { Filter rowFilter2 = new RowFilter(CompareFilter.CompareOp.GREATER, new BinaryComparator(Bytes.toBytes(lastrowKey))); filterList.addFilter(rowFilter2); } scan.setFilter(filterList); List<Map<String, String>> lists = new ArrayList<Map<String, String>>(); try { ResultScanner rs = hTable.getScanner(scan); for (Result result : rs) { lists.add(getRowByResult(result)); } hTable.close(); } catch (IOException e) { // TODO Auto-generated catch block log.error(e); } return lists; } /** * 獲取頁數 * * @param tableName * 表名 * @param ddate * 數據日期 * @param pageSize * 分頁大小 * @return 返回頁數 */ public int getPages(String tableName, String ddate, int pageSize) { if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate) || 
StringUtils.isBlank(pageSize + "")) { log.error("===Parameters tableName|ddate|pageSize should not be blank,Please check!==="); return 0; } enableAggregation(tableName); int total = 0; try { HTable hTable = (HTable) getHTable(tableName); Scan scan = new Scan(); Filter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(ddate)); scan.setFilter(rowFilter); AggregationClient aggregation = new AggregationClient(conf); Long count = aggregation.rowCount(hTable, new LongColumnInterpreter(), scan); total = count.intValue(); hTable.close(); } catch (Throwable e) { // TODO Auto-generated catch block log.error(e); } return (total % pageSize == 0) ? total / pageSize : (total / pageSize) + 1; } /** * 根據發送狀態獲取頁數 * * @param tableName * 表名 * @param ddate * 數據日期 * @param pageSize * 分頁大小 * @param status * 發送狀態 * @return 返回頁數 */ public int getPagesByStatus(String tableName, String ddate, int pageSize, String status) { if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate) || StringUtils.isBlank(pageSize + "") || StringUtils.isBlank(status)) { log.error("===Parameters tableName|ddate|pageSize|status should not be blank,Please check!==="); return 0; } enableAggregation(tableName); int total = 0; try { HTable hTable = (HTable) getHTable(tableName); Scan scan = new Scan(); FilterList filterList = new FilterList( FilterList.Operator.MUST_PASS_ALL); Filter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(ddate)); filterList.addFilter(rowFilter); filterList.addFilter(new SingleColumnValueFilter(Bytes .toBytes("info"), Bytes.toBytes("status"), CompareOp.EQUAL, Bytes.toBytes(status))); scan.setFilter(filterList); AggregationClient aggregation = new AggregationClient(conf); Long count = aggregation.rowCount(hTable, new LongColumnInterpreter(), scan); total = count.intValue(); hTable.close(); } catch (Throwable e) { // TODO Auto-generated catch block log.error(e); } return (total % pageSize == 0) ? 
total / pageSize : (total / pageSize) + 1; } /** * 獲取同一個rowkey下的記錄集合 * * @param result * 結果集 * @return */ private Map<String, String> getRowByResult(Result result) { if (result == null) { log.error("===Parameter |result| should not be null,Please check!==="); return null; } Map<String, String> cellMap = new HashMap<String, String>(); for (Cell cell : result.listCells()) { String rowkey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); String cf = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); String qf = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); cellMap.put(CpConstants.HBASE_TABLE_PROP_ROWKEY, rowkey); cellMap.put(CpConstants.HBASE_TABLE_PROP_COLUMNFAMILY, cf); cellMap.put(qf, value); } return cellMap; } /** * 獲取HTableInterface * * @param tableName * 表名 * @return 返回HTableInterface實例 */ private HTableInterface getHTable(String tableName) { if (StringUtils.isBlank(tableName)) { log.error("===Parameter |tableName| should not be blank,Please check!==="); return null; } HTableInterface hTable = null; try { HConnection conn = HConnectionManager.createConnection(conf); hTable = conn.getTable(Bytes.toBytes(tableName)); } catch (IOException e) { // TODO Auto-generated catch block log.error(e); return null; } return hTable; } /** * 批量插入或更新 * * @param tableName * 表名 * @param paraList * 組裝成json或xml後的參數 * @return 成功-true 失敗-false */ public boolean batchPut(String tableName, List<Map<String, String>> paraList) { try { List<Put> puts = new ArrayList<Put>(); for (Map<String, String> map : paraList) { Put put = getPutByMap(map); puts.add(put); } HTable hTable = (HTable) getHTable(tableName); hTable.put(puts); hTable.close(); } catch (RetriesExhaustedWithDetailsException e) { // TODO Auto-generated catch block log.error(e); return false; } catch 
(InterruptedIOException e) { // TODO Auto-generated catch block log.error(e); return false; } catch (IOException e) { // TODO Auto-generated catch block log.error(e); return false; } return true; } /** * 根據map返回put * * @param paraMap * 參數map * @return 返回put */ private Put getPutByMap(Map<String, String> paraMap) { if (paraMap == null) { log.error("===Parameter |paraMap| should not be null,Please check!==="); return null; } Set<Entry<String, String>> set = paraMap.entrySet(); Iterator<Entry<String, String>> it = set.iterator(); byte[] rowkey = Bytes.toBytes(paraMap .get(CpConstants.HBASE_TABLE_PROP_ROWKEY)); byte[] columnfamily = Bytes.toBytes(paraMap .get(CpConstants.HBASE_TABLE_PROP_COLUMNFAMILY)); Put put = new Put(rowkey); while (it.hasNext()) { Entry<String, String> entry = it.next(); String key = entry.getKey(); if (!CpConstants.HBASE_TABLE_PROP_ROWKEY.equals(key) && !CpConstants.HBASE_TABLE_PROP_COLUMNFAMILY.equals(key)) { String value = entry.getValue(); put.add(columnfamily, Bytes.toBytes(key), Bytes.toBytes(value)); } } return put; } /** * 使表具備聚合功能 * * @param tableName * 表名 */ @SuppressWarnings("resource") private void enableAggregation(String tableName) { String coprocessorName = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation"; try { HBaseAdmin admin = new HBaseAdmin(conf); HTableDescriptor htd = admin.getTableDescriptor(Bytes .toBytes(tableName)); List<String> coprocessors = htd.getCoprocessors(); if (coprocessors != null && coprocessors.size() > 0) { return; } else { admin.disableTable(tableName); htd.addCoprocessor(coprocessorName); admin.modifyTable(tableName, htd); admin.enableTable(tableName); } } catch (TableNotFoundException e) { // TODO Auto-generated catch block log.error(e); } catch (MasterNotRunningException e) { // TODO Auto-generated catch block log.error(e); } catch (ZooKeeperConnectionException e) { // TODO Auto-generated catch block log.error(e); } catch (IOException e) { // TODO Auto-generated catch block log.error(e); } } }