最近有一個項目,其中某個功能單表數據在可預估的將來達到了億級,初步估算在90億左右。與同事詳細討論後,決定採用一致性Hash算法來完成數據庫的自動擴容和數據遷移。整個程序細節由我同事完成,我只是將其理解併成文,供有相同問題的同行參考。html
參看此文的兄弟,默認各位已經熟悉一致性hash算法了。此文僅僅闡述代碼細節,實現語言爲Java
。node
項目正式上線初期,數據量不會直接爆發式增加到90億,須要時間上的積累(逐步作實驗),最終可能達到90億數據,甚至超過90億數據。git
按照咱們實際瞭解狀況,oracle存儲數據量達到1千萬的時候,性能擅可。而Oracle官方的說法,如單表存儲1g有分區(大體500萬數據),查詢效率很是高。而試驗表中僅四個字段,每條數據數據量較小。因此咱們最終決定以1000萬爲節點,水平拆表。當表數據達到1千萬時,即增長下一波表。進行數據自動遷移。github
按照90億的總量,1000萬數據一個表的劃分,最終大體會產生900個左右的表。因此咱們最終使用了4個數據庫。1個存儲其餘業務模塊的表,3個存儲此大數據表。每一個數據庫大體有300張表。性能上和數量上均可達到咱們的要求。算法
試驗信息表(EXPERIMENT_MESSAGE),掛接車型和試驗的關係。試驗數據表(EXPERIMENT_DATA),存儲試驗數據數據庫
試驗信息表:數據結構
字段 | 含義 |
---|---|
ID | 主鍵,採用UUID生成 |
EXPERIMENT_ID | 試驗表中的ID |
CAR_ID | 車型表中的ID |
... | 其他數十個字段省略 |
試驗數據表:oracle
字段 | 含義 |
---|---|
ID | 主鍵,採用UUID生成 |
EXPERIMENT_MESSAGE_ID | 對應的實驗信息id |
X_VALUE | 試驗數據X值 |
Y_VALUE | 試驗數據Y值 |
咱們採用做一致性hash的key,就是試驗數據表中的EXPERIMENT_MESSAGE_ID
字段。也就是說,每一個試驗數據表,不存則以,存則一次性大體有6000條數據。取同理。性能
一致性Hash算法的hash部分,採用了著名的ketama算法。在此,咱們很少討論ketama算法的細節,若各位有興趣,請查閱ketama算法大數據
public long hash(String key) { if (md5 == null) { try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException("no md5 algorythm found"); } } md5.reset(); md5.update(key.getBytes()); byte[] bKey = md5.digest(); long res = ((long) (bKey[3] & 0xFF) << 24) | ((long) (bKey[2] & 0xFF) << 16) | ((long) (bKey[1] & 0xFF) << 8) | (long) (bKey[0] & 0xFF); return res & 0xffffffffL; }
有了Hash的算法,接下來就要構造Hash環了。Hash環採用的SortedMap數據結構實現。
private final SortedMap<Long, T> circle = new TreeMap<Long, T>();
其中添加節點和移除節點部分,須要根據hash算法獲得節點在環上的位置,具體代碼以下:
/** * 添加虛擬節點 * numberOfReplicas爲虛擬節點的數量,初始化hash環的時候傳入,咱們使用300個虛擬節點 * @param node */ public void add(T node) { for (int i = 0; i < numberOfReplicas; i++) { circle.put(hashFunction.hash(node.toString() + i), node); } } /** * 移除節點 * @param node */ public void remove(T node) { for (int i = 0; i < numberOfReplicas; i++) { circle.remove(hashFunction.hash(node.toString() + i)); } }
而hash環中獲得節點部分比較特殊,根據一致性hash算法的介紹,獲得hash環中的節點,其實是計算出的hash值順時針找到的第一個節點。
/** * 得到一個最近的順時針節點 * @param key 爲給定鍵取Hash,取得順時針方向上最近的一個虛擬節點對應的實際節點 * @return */ public T get(Object key) { if (circle.isEmpty()) { return null; } long hash = hashFunction.hash((String) key); if (!circle.containsKey(hash)) { //返回此映射的部分視圖,其鍵大於等於 hash SortedMap<Long, T> tailMap = circle.tailMap(hash); hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey(); } return circle.get(hash); }
上面完成了一致性hash算法的實現,包含了hash算法和hash環的實現。接下來就要處理具體業務中,如何使用這個hash環和算法了。
咱們業務中,主要操做這張表的數據,也就是增刪查。而後咱們數據庫拆分紅了3個,因此須要增刪查的操做基本一致,都是先經過一致性hash獲得庫,再經過一致性hash獲得表。
獲取數據庫名的操做以下,獲取到數據庫後,根據數據庫名到對應的鏈接池中獲取鏈接。
/** * 根據試驗信息id獲取其所在庫名 * DatabaseType爲咱們數據的枚舉 * @return 數據庫的名稱 **/ private String getDataBase(String experimentMessageId) { //獲取數據源 DatabaseType[] databasetype = DatabaseType.values(); List<String> dataBaselist = new ArrayList<>(); Map<String, DatabaseType> map = new HashMap<>(); for (DatabaseType d:databasetype) { if (!d.equals(DatabaseType.KC)) { dataBaselist.add(d.toString()); map.put(d.toString(), d); } } //獲取數據源hash ConsistentHash<String> dataBaseCon = getConsistentHash(dataBaselist); //獲取id所在數據源 String dataBase = dataBaseCon.get(experimentMessageId); return dataBase; }
獲取表名的操做以下,獲取到數據庫後,在對應的數據庫中找到須要的表,再從該表中查詢數據。
/** * 根據試驗信息id獲取其試驗數據所在表 * @return **/ public String getTableName(String experimentMessageId) { String dataBase = getDataBase(experimentMessageId); //查詢全部試驗數據表 List<String> tables = experimentDataEODao.queryTbaleNames(dataBase, tableName); ConsistentHash<String> consistentHash = getConsistentHash(tables); String tableName = consistentHash.get(experimentMessageId); return tableName; }
剩下的增刪改操做和日常一致,在此很少贅述。
一致性hash勢必涉及到數據遷移問題,咱們採起的數據遷移方式爲定時任務,針對每一個數據庫在天天夜裏全量掃描一次。檢查是否有數據量超過1000萬的表,若存在這樣的表,就把現有的表數量double。
數據遷移只會在同庫之間遷移,不會涉及跨數據庫的狀況。
此方案爲初步方案,後續會改進的更加智能,根據表的數量,增長不一樣數量的表。而不是簡單的把表數量翻倍。
表建立後,將須要遷移的表數據逐個遷移。
在鏈接到數據源後,咱們作了以下事情進行數據遷移
1.獲取庫中全部的表
List<String> tables = getTables(connection, p, d.toString());
2.遍歷表,檢查表中數據是否超過邊界線(咱們爲1000萬)
for (int i = 0; i < tables.size(); i++) { //查詢表內數據量 int num = countByTableName(connection, p, tables.get(i)); //finalNum爲邊界值,此處爲1000萬 if (num > finalNum) { …… } …… }
3.根據全部的表計算現有的虛擬節點
ConsistentHash<String> consistentHashOld = getConsistentHash(tables);
4.把表加倍
List<String> tablesNew = deepCopy(tables); //注意必定要採用深複製 int tableSize = tablesNew.size(); for (int y = 0; y < tableSize; y++) { String tableNameNew = tableName + (tablesNew.size() + 1); //建立表 createTable(connection, p, d.toString(), tableNameNew); tablesNew.add(tableNameNew); tableDelete.add(tableNameNew); }
5.計算加倍後的虛擬節點
ConsistentHash<String> consistentHashNew = getConsistentHash(tablesNew);
6.數據遷移
for (int z = 0; z < tableSize; z++) { String tableNameOld = tablesNew.get(z); //查詢試驗信息id不重複的試驗數據信息 List<String> disData = selectExperimentIdDis(connection, p, tableNameOld); List<String> deleteList = new LinkedList<>(); for (String experimentId : disData) { //若是數據hash計算 原所在表與新建表以後不一致,執行轉移 if (!consistentHashNew.get(experimentId).equals(consistentHashOld.get(experimentId))) { //新增到新表數據 insertHash(connection, p, experimentId, consistentHashOld.get(experimentId), consistentHashNew.get(experimentId)); //刪除數據集合 deleteList.add(experimentId); //刪除舊錶數據 final int defaultDelNum = 1000; if (deleteList.size() == defaultDelNum) { deleteInbatch(connection, p, deleteList, tableNameOld); deleteList.clear(); } } } //刪除舊錶數據 if (deleteList.size() > 0) { deleteInbatch(connection, p, deleteList, tableNameOld); } }
以上爲咱們所作的一致性hash實踐,其中還存在不少問題,好比遷移過程單線程致使遷移較慢、自動擴容機制不智能、遷移過程當中數據訪問不穩定等狀況。
咱們將會在後續的開發中逐步進行完善改進。
以上就是咱們針對一致性hash在oracle分表中的實踐