一、因爲要存儲消息內容供app端查詢,因此每條消息須要爲對應的每一個用戶生成一條記錄,這樣在用戶量大的狀況下數據是海量的,沒有上限,且不能清理歷史數據 二、讀和寫的比例大體差很少
最終決定採用Hbase存儲,kafka做消息中間件來構建整個系統java
咱們先來看一下Hbase是什麼,爲何要用Hbase?git
HBase是一種構建在HDFS之上的分佈式、面向列的存儲系統。在須要實時讀寫、隨機訪問超大規模數據集時,可使用HBasespring
1.大:一個表能夠有上億行,上百萬列。 2.面向列:面向列表(簇)的存儲和權限控制,列(簇)獨立檢索。 3.稀疏:對於爲空(NULL)的列,並不佔用存儲空間,所以,表能夠設計的很是稀疏。 4.無模式:每一行都有一個能夠排序的主鍵和任意多的列,列能夠根據須要動態增長,同一張表中不一樣的行能夠有大相徑庭的列。 5.數據多版本:每一個單元中的數據能夠有多個版本,默認狀況下,版本號自動分配,版本號就是單元格插入時的時間戳。 6.數據類型單一:HBase中的數據都是字符串,沒有類型。
Table table = connection.getTable(TableName.valueOf("表名")); Put put = new Put("112233bbbcccc".getBytes());// 一個PUT表明一行數據,再NEW一個PUT表示第二行數據,每行一個惟一的ROWKEY,此處rowkey爲put構造方法中傳入的值 put.add("column1".getBytes(), null, "aaa".getBytes());// 本行數據的第一列 put.add("column2".getBytes(), null, "bbb".getBytes());// 本行數據的第三列 put.add("column3".getBytes(), null, "ccc".getBytes());// 本行數據的第三列 table.put(put);// 保存數據
能夠看到若是咱們不封裝Hbase的操做,而直接在系統中使用原生API的話,會有多麻煩, 到此,引出咱們的主題,根據Hbase的特性,搭建一套ORM映射操做方式。apache
首先咱們捋清楚幾個Hbase存儲時候的關鍵點api
一、tableName:表名,須要根據此獲取鏈接 二、family:列簇,建議把常常一塊兒訪問的比較相似的列放在同一個Column Family中,這樣就能夠在訪問少數幾個列時,只讀取儘可能少的數據 三、qualifier:列名,對應列的value 四、timestamp:時間戳
根據特色咱們能夠經過自定義註解來處理這些關鍵屬性,以下:緩存
@HbaseTable(tableName="t_demo") // 列名 public class Demo { @HbaseColumn(family="rowkey", qualifier="rowkey") // rowkey值 private String id; @HbaseColumn(family="demo", qualifier="content") // 列 private String content; @HbaseColumn(family="demo", qualifier="avg") // 列 private String avg; }
HbaseTable:springboot
package com.muheda.notice.hbase; import java.lang.annotation.*; /** * @Author: Sorin * @Descriptions: 自定義註解,用於獲取table * @Date: Created in 2018/3/22 */ @Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE }) @Inherited public @interface HbaseTable { String tableName() default ""; }
HbaseColumn:app
package com.muheda.notice.hbase; import java.lang.annotation.*; /** * @Author: Sorin * @Descriptions: 自定義註解,用於描述字段所屬的 family與qualifier. 也就是hbase的列與列簇 * @Date: Created in 2018/3/22 */ @Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.FIELD }) @Inherited public @interface HbaseColumn { String family() default ""; String qualifier() default ""; boolean timestamp() default false; }
接着,咱們來封裝一個Dao的操做:分佈式
package com.muheda.notice.hbase; import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.*; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.*; /** * @Author: Sorin * @Descriptions: HBaseDao操做公共類 * @Date: Created in 2018/3/22 */ @Component("hBaseDaoUtil") public class HBaseDaoUtil { protected final org.slf4j.Logger logger = LoggerFactory.getLogger(this.getClass()); // 關閉鏈接 public static void close() { if (HconnectionFactory.connection != null) { try { HconnectionFactory.connection.close(); } catch (IOException e) { e.printStackTrace(); } } } // 獲取tableName public String getORMTable(Object obj) { HbaseTable table = obj.getClass().getAnnotation(HbaseTable.class); return table.tableName(); } /** * @Descripton: 建立表 * @Author: Sorin * @param tableName * @param familyColumn * @Date: 2018/3/22 */ public void createTable(String tableName, Set<String> familyColumn) { TableName tn = TableName.valueOf(tableName); try { Admin admin = HconnectionFactory.admin; HTableDescriptor htd = new HTableDescriptor(tn); for (String fc : familyColumn) { HColumnDescriptor hcd = new HColumnDescriptor(fc); htd.addFamily(hcd); } admin.createTable(htd); } catch (IOException e) { e.printStackTrace(); logger.error("建立"+tableName+"表失敗!", e); } } /** * @Descripton: 刪除表 * @Author: Sorin * @param tableName * @Date: 2018/3/22 */ public void dropTable(String tableName) { TableName tn = TableName.valueOf(tableName); try { Admin admin = HconnectionFactory.admin; admin.disableTable(tn); admin.deleteTable(tn); } catch (IOException e) { e.printStackTrace(); logger.error("刪除"+tableName+"表失敗!"); } } /** * @Descripton: 根據條件過濾查詢 * @Author: Sorin * @param obj * @param param * @Date: 2018/3/26 */ public <T> List<T> queryScan(T obj, Map<String, String> param)throws Exception{ List<T> objs = new ArrayList<T>(); String tableName = getORMTable(obj); if (StringUtils.isBlank(tableName)) { return null; } ResultScanner scanner = null; try { Table table = getTable(tableName); Admin admin = HconnectionFactory.admin; if(!admin.isTableAvailable(TableName.valueOf(tableName))){ return objs; } Scan scan = new Scan(); FilterList filter = new FilterList(); // 從緩存中取family和qualifier,拼裝查詢條件 Map<String, List<Map<String, String>>> tableMaps = HconnectionFactory.TableMaps; List<Map<String, String>> lists = tableMaps.get(tableName); for (Map.Entry<String, String> entry : param.entrySet()){ for (Map<String, String> map : lists) { String family = map.get("family"); String qualifier = map.get("qualifier"); if(qualifier.equals(entry.getKey())){ Filter filterDetail = new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(entry.getKey()), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(entry.getValue())); filter.addFilter(filterDetail); } } } scan.setFilter(filter); scanner = table.getScanner(scan); for (Result result : scanner) { T beanClone = (T)BeanUtils.cloneBean(HBaseBeanUtil.resultToBeanNew(result, obj, tableName)); objs.add(beanClone); } } catch (Exception e) { e.printStackTrace(); logger.error("查詢失敗!"); throw new Exception(e); } finally { scanner.close(); } return objs; } /** * @Descripton: 根據rowkey查詢 * @Author: Sorin * @param obj * @param rowkeys * @Date: 2018/3/22 */ public <T> List<T> get(T obj, String ... rowkeys) { List<T> objs = new ArrayList<T>(); String tableName = getORMTable(obj); if (StringUtils.isBlank(tableName)) { return objs; } try { Admin admin = HconnectionFactory.admin; if(!admin.isTableAvailable(TableName.valueOf(tableName))){ return objs; } List<Result> results = getResults(tableName, rowkeys); if (results.isEmpty()) { return objs; } for (int i = 0; i < results.size(); i++) { T bean = null; Result result = results.get(i); if (result == null || result.isEmpty()) { continue; } bean = HBaseBeanUtil.resultToBeanNew(result, obj, tableName); objs.add(bean); } }catch (Exception e){ e.printStackTrace(); } return objs; } /** * @Descripton: 保存實體對象 * @Author: Sorin * @param objs * @Date: 2018/3/22 */ public <T> boolean save(T ... objs) { List<Put> puts = new ArrayList<Put>(); String tableName = ""; try { for (Object obj : objs) { if (obj == null) { continue; } tableName = getORMTable(obj); Put put = HBaseBeanUtil.beanToPut(obj, tableName); puts.add(put); } }catch (Exception e){ e.printStackTrace(); logger.error("保存Hbase異常!"); } return savePut(puts, tableName); } /** * @Descripton: 根據tableName保存 * @Author: Sorin * @param tableName * @param objs * @Date: 2018/3/22 */ public <T> void save(String tableName, T ... objs){ List<Put> puts = new ArrayList<Put>(); for (Object obj : objs) { if (obj == null) { continue; } try { Put put = HBaseBeanUtil.beanToPut(obj, tableName); puts.add(put); } catch (Exception e) { e.printStackTrace(); logger.warn("", e); } } savePut(puts, tableName); } /** * @Descripton: 刪除 * @Author: Sorin * @param obj * @param rowkeys * @Date: 2018/3/22 */ public <T> void delete(T obj, String... rowkeys) { String tableName = ""; tableName = getORMTable(obj); if (StringUtils.isBlank(tableName)) { return; } List<Delete> deletes = new ArrayList<Delete>(); for (String rowkey : rowkeys) { if (StringUtils.isBlank(rowkey)) { continue; } deletes.add(new Delete(Bytes.toBytes(rowkey))); } delete(deletes, tableName); } /** * @Descripton: 批量刪除 * @Author: Sorin * @param deletes * @param tableName * @Date: 2018/3/22 */ private void delete(List<Delete> deletes, String tableName) { try { Table table = getTable(tableName); if (StringUtils.isBlank(tableName)) { logger.info("tableName爲空!"); return; } table.delete(deletes); } catch (IOException e) { e.printStackTrace(); logger.error("刪除失敗!",e); } } /** * @Descripton: 根據tableName獲取列簇名稱 * @Author: Sorin * @param tableName * @Date: 2018/3/22 */ public List<String> familys(String tableName) { try { Table table = getTable(tableName); List<String> columns = new ArrayList<String>(); if (table==null) { return columns; } HTableDescriptor tableDescriptor = table.getTableDescriptor(); HColumnDescriptor[] columnDescriptors = tableDescriptor.getColumnFamilies(); for (HColumnDescriptor columnDescriptor :columnDescriptors) { String columnName = columnDescriptor.getNameAsString(); columns.add(columnName); } return columns; } catch (Exception e) { e.printStackTrace(); logger.error("查詢列簇名稱失敗!" ,e); } return new ArrayList<String>(); } // 保存方法 private boolean savePut(List<Put> puts, String tableName){ if (StringUtils.isBlank(tableName)) { return false; } try { Table table = getTable(tableName); table.put(puts); return true; }catch (IOException e) { e.printStackTrace(); return false; } } // 獲取查詢結果 private List<Result> getResults(String tableName, String... rowkeys) { List<Result> resultList = new ArrayList<Result>(); List<Get> gets = new ArrayList<Get>(); for (String rowkey : rowkeys) { if (StringUtils.isBlank(rowkey)) { continue; } Get get = new Get(Bytes.toBytes(rowkey)); gets.add(get); } try { Table table = getTable(tableName); Result[] results = table.get(gets); Collections.addAll(resultList, results); return resultList; } catch (Exception e) { e.printStackTrace(); return resultList; } } /** * @Descripton: 根據條件過濾查詢(大於等於) * @Author: Sorin * @param obj * @param param * @Date: 2018/3/26 */ public <T> List<T> queryScanGreater(T obj, Map<String, String> param)throws Exception{ List<T> objs = new ArrayList<T>(); String tableName = getORMTable(obj); if (StringUtils.isBlank(tableName)) { return null; } ResultScanner scanner = null; try { Table table = getTable(tableName); Admin admin = HconnectionFactory.admin; if(!admin.isTableAvailable(TableName.valueOf(tableName))){ return objs; } Scan scan = new Scan(); // 從緩存中取family和qualifier,拼裝查詢條件 Map<String, List<Map<String, String>>> tableMaps = HconnectionFactory.TableMaps; List<Map<String, String>> lists = tableMaps.get(tableName); for (Map.Entry<String, String> entry : param.entrySet()){ for (Map<String, String> map : lists) { String family = map.get("family"); String qualifier = map.get("qualifier"); if(qualifier.equals(entry.getKey())){ Filter filter = new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(entry.getKey()), CompareFilter.CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(entry.getValue())); scan.setFilter(filter); } } } scanner = table.getScanner(scan); for (Result result : scanner) { T beanClone = (T)BeanUtils.cloneBean(HBaseBeanUtil.resultToBeanNew(result, obj, tableName)); objs.add(beanClone); } } catch (Exception e) { e.printStackTrace(); logger.error("查詢失敗!"); throw new Exception(e); }finally { scanner.close(); } return objs; } /** * @Descripton: 分頁查詢數據 * @Author: Sorin * @param obj * @param startrowname * @param pageSize * @Date: 2018/4/25 */ public <T> List<T> queryScanPage(T obj, String startrowname, String pageSize, Map<String, String> param) throws Exception{ List<T> objs = new ArrayList<T>(); String tableName = getORMTable(obj); if (StringUtils.isBlank(tableName)) { return null; } ResultScanner scanner = null; try { Table table = getTable(tableName); Filter filter = new PageFilter(Integer.parseInt(pageSize)); FilterList filterList = new FilterList(); Scan scan = new Scan(Bytes.toBytes(startrowname)); // 從緩存中取family和qualifier,拼裝查詢條件 Map<String, List<Map<String, String>>> tableMaps = HconnectionFactory.TableMaps; List<Map<String, String>> lists = tableMaps.get(tableName); for (Map.Entry<String, String> entry : param.entrySet()){ for (Map<String, String> map : lists) { String family = map.get("family"); String qualifier = map.get("qualifier"); if(qualifier.equals(entry.getKey())){ Filter filterDetail = new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(entry.getKey()), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(entry.getValue())); filterList.addFilter(filterDetail); } } } filterList.addFilter(filter); scan.setFilter(filterList); scan.setReversed(true); scanner = table.getScanner(scan); for (Result result : scanner) { T beanClone = (T)BeanUtils.cloneBean(HBaseBeanUtil.resultToBeanNew(result, obj, tableName)); objs.add(beanClone); } }catch (Exception e){ e.printStackTrace(); logger.error("查詢失敗!"); }finally { scanner.close(); } return objs; } /** * 根據rowkey查詢記錄 * @param obj * @param rowkey "rowkey"開始字符 * @param <T> * @return */ public <T> List<T> queryScanRowkey(T obj, String rowkey){ List<T> objs = new ArrayList<T>(); String tableName = getORMTable(obj); if (StringUtils.isBlank(tableName)) { return null; } ResultScanner scanner = null; try { Table table = getTable(tableName); Scan scan = new Scan(); scan.setRowPrefixFilter(Bytes.toBytes(rowkey)); scanner = table.getScanner(scan); for (Result result : scanner) { T beanClone = (T)BeanUtils.cloneBean(HBaseBeanUtil.resultToBeanNew(result, obj, tableName)); objs.add(beanClone); } }catch (Exception e){ e.printStackTrace(); logger.error("查詢失敗!"); }finally { scanner.close(); } return objs; } /** * 根據rowkey查詢記錄-分頁 * @param obj * @param rowkey "rowkey"開始字符 * @param <T> * @return */ public <T> List<T> queryScanRowkeyPage(T obj, String rowkey){ List<T> objs = new ArrayList<T>(); String tableName = getORMTable(obj); if (StringUtils.isBlank(tableName)) { return null; } ResultScanner scanner = null; FilterList filterList = new FilterList(); Scan scan = new Scan(Bytes.toBytes(rowkey)); try { Table table = getTable(tableName); Filter filterDetail = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(rowkey.getBytes())); filterList.addFilter(filterDetail); scan.setFilter(filterList); scanner = table.getScanner(scan); for (Result result : scanner) { T beanClone = (T)BeanUtils.cloneBean(HBaseBeanUtil.resultToBeanNew(result, obj, tableName)); objs.add(beanClone); } }catch (Exception e){ e.printStackTrace(); logger.error("查詢失敗!"); }finally { scanner.close(); } return objs; } /** * @Descripton: 根據表名獲取鏈接,避免每次操做hbase都獲取鏈接 * @Author: Sorin * @param tableName * @Date: 2018/5/4 */ private Table getTable(String tableName){ Table table = null; try { if("bn_user".equals(tableName)){ table = HconnectionFactory.UserTable; }else if("bn_notice_user".equals(tableName)){ table = HconnectionFactory.NoticeUserTable; }else if("bn_notice".equals(tableName)){ table = HconnectionFactory.NoticeTable; }else if("bn_message".equals(tableName)){ table = HconnectionFactory.MessageTable; }else{ HconnectionFactory.connection.getTable(TableName.valueOf(tableName)); } } catch (IOException e) { e.printStackTrace(); } return table; } }
HbaseBeanUtil:oop
package com.muheda.notice.hbase; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.List; /** * @Author: Sorin * @Descriptions: * @Date: Created in 2018/3/22 */ public class HBaseBeanUtil { private static final Logger logger = LoggerFactory.getLogger(HBaseBeanUtil.class); /** * JavaBean轉換爲Put * @param <T> * @param obj * @return * @throws Exception */ public static <T> Put beanToPut(T obj) throws Exception { Put put = new Put(Bytes.toBytes(parseObjId(obj))); Class<?> clazz = obj.getClass(); Field[] fields = clazz.getDeclaredFields(); for (Field field : fields) { if (!field.isAnnotationPresent(HbaseColumn.class)) { continue; } field.setAccessible(true); HbaseColumn orm = field.getAnnotation(HbaseColumn.class); String family = orm.family(); String qualifier = orm.qualifier(); if (StringUtils.isBlank(family) || StringUtils.isBlank(qualifier)) { continue; } Object fieldObj = field.get(obj); if (fieldObj.getClass().isArray()) { logger.error("nonsupport"); } if ("rowkey".equalsIgnoreCase(qualifier) || "rowkey".equalsIgnoreCase(family)) { continue; } if (field.get(obj) != null || StringUtils.isNotBlank(field.get(obj).toString())) { put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(field.get(obj).toString())); } } return put; } /** * 獲取Bean中的id,做爲Rowkey * @param <T> * * @param obj * @return */ public static <T> String parseObjId(T obj) { Class<?> clazz = obj.getClass(); try { Field field = clazz.getDeclaredField("id"); field.setAccessible(true); Object object = field.get(obj); return object.toString(); } catch (NoSuchFieldException e) { logger.error("", e); } catch (SecurityException e) { logger.error("", e); } catch (IllegalArgumentException e) { logger.error("", e); } catch (IllegalAccessException e) { logger.error("", e); } return ""; } /** * HBase result 轉換爲 bean * @param <T> * @param result * @param obj * @return * @throws Exception */ public static <T> T resultToBean(Result result, T obj) throws Exception { if (result == null) { return null; } Class<?> clazz = obj.getClass(); Field[] fields = clazz.getDeclaredFields(); for (Field field : fields) { if (!field.isAnnotationPresent(HbaseColumn.class)) { continue; } HbaseColumn orm = field.getAnnotation(HbaseColumn.class); String family = orm.family(); String qualifier = orm.qualifier(); boolean timeStamp = orm.timestamp(); if (StringUtils.isBlank(family) || StringUtils.isBlank(qualifier)) { continue; } String fieldName = field.getName(); String value = ""; if ("rowkey".equalsIgnoreCase(family)) { value = new String(result.getRow()); } else { value = getResultValueByType(result, family, qualifier, timeStamp); } String firstLetter = fieldName.substring(0, 1).toUpperCase(); String setMethodName = "set" + firstLetter + fieldName.substring(1); Method setMethod = clazz.getMethod(setMethodName, new Class[] { field.getType() }); setMethod.invoke(obj, new Object[] { value }); } return obj; } /** * @param result * @param family * @param qualifier * @param timeStamp * @return */ private static String getResultValueByType(Result result, String family, String qualifier, boolean timeStamp) { if (!timeStamp) { return new String(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier))); } List<Cell> cells = result.getColumnCells(Bytes.toBytes(family), Bytes.toBytes(qualifier)); if (cells.size() == 1) { Cell cell = cells.get(0); return cell.getTimestamp() + ""; } return ""; } }
至此,HBASE的操做咱們已封裝好了,能夠直接向下面這樣使用:
@Component("demoDao") public class DemoDao { @Autowired private HBaseDaoUtil hBaseDaoUtil; /** * @Descripton: * @Author: Sorin * @param demo * @Date: 2018/3/22 */ public void save(Demo demo) { hBaseDaoUtil.save(demo); } /** * @Descripton: * @Author: Sorin * @param demo * @param id * @Date: 2018/3/22 */ public List<Demo> getById(Demo demo, String id) { return hBaseDaoUtil.get(demo, id); } }