分佈式緩存設計:一致性Hash算法

緩存做爲數據庫前的一道屏障,它的可用性與緩存命中率都會直接影響到數據庫,因此除了配置主從保證高可用以外還須要設計分佈式緩存來擴充緩存的容量,將數據分佈在多臺機器上若是有一臺不可用了對總體影響也比較小。很少說,你懂我意思的!html

緩存的分佈式集羣和應用服務器不一樣,緩存的分佈式是每臺機器上的緩存數據是不同的,不像應用服務器是無狀態的,每臺機器上的服務是一致的。緩存訪問請求不能夠在緩存服務器集羣中的任意一臺處理,必須找到緩存有須要數據的服務器。因此保證在新上線機器和刪除機器的時候整個集羣中的已經緩存的數據儘量地還被訪問獲得是分佈式緩存系統設計的終極目錄(這句話有點長)。java

如何選擇到正確的服務器須要使用好的路由算法,下面以簡單的餘數Hash算法爲例來選擇服務器:node

首先用緩存數據的key的hash值除以服務器的數目,獲得的餘數就是集羣服務器列表的下標。這樣就能夠定位到正確的服務節點。好比有3臺服務器,node1,node2,node3對應在服務器列表編號爲1,2,3。key爲「Hello」的緩存查詢請求來了以後,計算Hello的Hash值,假如爲23432113,除以3餘數假如是1,那就說明數據在node1上。算法

這種算法可否保證各服務節點保存的數據量大體相同徹底依賴於key的hash值是否夠分散。因此一般不會直接使用java對象的hashCode(),這個方法計算出來的值不夠分散,而是先計算出對象的MD5值而後再調用hashCode()。數據庫

問題來了,假如新增一臺機器,3臺擴展到4臺,這時候的除數變爲4了,以前機器計算出來的餘數都和以前不一致了,也就是說不再能正確命中到緩存服務器,請求就直接到達數據庫,大大增長了數據庫的負載能力。針對這個問題一種解決辦法是在網站訪問量最少的時候擴容服務器,而後經過模擬請求的方法對緩存進行預熱,使得數據在緩存服務器中從新分佈。固然這種方案有點low,另一種方法是改進路由算法,使得新加入的服務器不影響大部分緩存數據的正確命中。目前比較流行的就是一致性Hash算法。緩存

一致性Hash算法大體過程:服務器

1,先構造一個長度爲2^32的整數環(稱爲一致性Hash環,分佈範圍爲[0,2^32-1])數據結構

2,根據節點名稱(例如IP或機器名)的Hash值放置在這個Hash環上。分佈式

3,而後根據須要緩存的數據的key值計算出Hash值,一樣也分佈在Hash環上。函數

4,當取緩存時,根據key算出Hash,而後順時針查找離這個Hash值最近的服務節點,這個緩存值就保存在這個節點中。

一致性Hash算法可使用二叉查找樹來實現,Hash查找的過程實際上就是在二叉樹中查找不小於查找數的最小數值,二叉樹最右邊葉子節點和最左邊的葉子節點相鏈接,就能夠形成環狀的數據結構。在Java中就能夠利用TreeMap輕鬆實現。

二叉查找樹算法參考:http://blog.csdn.net/lcore/article/details/8889176#

如上圖所示,Hash(object1)=key1;Hash(object2)=key2;Hash(object3)=key3;Hash(object4)=key4;分別分佈在Hash環上。若是有三臺機器,Hash(NODE1) = KEY1;Hash(NODE2) = KEY2;Hash(NODE3) = KEY3;因此就以下圖所示,key1找到的是KEY1也就是NODE1;key2找到的是KEY3也就是NODE3;key3找到的是KEY2也就是NODE2;key4找到的是KEY3也就是NODE3。

這時若是新增了一臺NODE4,其Hash爲KEY4,假如在Hash環上分佈以下,那麼key2再也不找KEY3了,找的是KEY4。

新增機器後,key1,key3,key4找的機器相比以前沒有變化。也就是說這種算法能保證以前緩存的大部分數據還能繼續被命中,優於取餘數的Hash算法。但這個算法仍是有一個問題,就是key2被命中到KEY4上了,NODE3有效數據被減小了,若是緩存數據量至關大的時候,會形成集羣上的各個服務器之間的不平衡。

解決這種不平衡的方式是引入虛擬節點,虛擬節點是根據每一臺真實節點而來的,假如NODE1的ip是10.0.0.1,咱們再也不使用這個IP直接計算Hash值放在Hash環上了。而是使用多個10.0.0.1#一、10.0.0.1#2 ......10.0.0.1#N計算多個Hash值放在Hash環上。這樣就至關於給真實節點擴充了N個虛擬節點。這時再加一臺機器就至關於要往Hash環上放N個節點,這N個節點會均衡分佈在環上。這會給以前每一個key值的指向都會產生變化,也就是至關於把變化的指向擴散給環上的多個節點去承擔了。這樣就保證了算法的平衡性。

不說了,看代碼(Java實現的一致性Hash算法,利用TreeMap):

 

[java]  view plain  copy
 
  1. public class ConsistentHash<T> {  
  2.       
  3.     //自定義hash函數  Md5加密後再取hashcode  
  4.     private final HashFunction hashFunction;  
  5.     //虛擬節點個數  
  6.     private final int numberOfReplicas;  
  7.     //Hash環  
  8.     private final SortedMap<Integer, T> circle = new TreeMap<Integer, T>();  
  9.     //node爲真實服務節點列表  
  10.     public ConsistentHash(HashFunction hashFunction, int numberOfReplicas,  
  11.             Collection<T> nodes) {  
  12.         this.hashFunction = hashFunction;  
  13.         this.numberOfReplicas = numberOfReplicas;  
  14.   
  15.         for (T node : nodes) {  
  16.             add(node);  
  17.         }  
  18.     }  
  19.     /** 
  20.      * 增長一個服務節點  用服務節點名+虛擬節點編號作Hash運算 
  21.      * 在這個例子中每一個服務節點有1000個虛擬節點 
  22.      * @param node 
  23.      */  
  24.     public void add(T node) {  
  25.         for (int i = 0; i < numberOfReplicas; i++) {  
  26.             circle.put(hashFunction.hash(node.toString() + i), node);  
  27.         }  
  28.     }  
  29.     /** 
  30.      * 刪除一個服務節點 將虛擬節點所有刪除 
  31.      * @param node 
  32.      */  
  33.     public void remove(T node) {  
  34.         for (int i = 0; i < numberOfReplicas; i++) {  
  35.             circle.remove(hashFunction.hash(node.toString() + i));  
  36.         }  
  37.     }  
  38.     /** 
  39.      * 根據緩存的key找出其全部緩存服務器 
  40.      * @param key 
  41.      */  
  42.     public T get(Object key) {  
  43.         if (circle.isEmpty()) {  
  44.             return null;  
  45.         }  
  46.         int hash = hashFunction.hash(key);  
  47.         // System.out.println("hash---: " + hash);  
  48.         if (!circle.containsKey(hash)) {  
  49.             SortedMap<Integer, T> tailMap = circle.tailMap(hash);  
  50.             hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();  
  51.         }  
  52.         // System.out.println("hash---: " + hash);  
  53.         return circle.get(hash);  
  54.     }  
  55.   
  56.     public static void main(String[] args) {  
  57.         HashSet<String> set = new HashSet<String>();  
  58.         set.add("A");  
  59.         set.add("B");  
  60.         set.add("C");  
  61.         set.add("D");  
  62.         Map<String, Integer> map = new HashMap<String, Integer>();  
  63.         //定義一個一致性hash對象,有四個節點,分別有1000個虛擬節點  
  64.         ConsistentHash<String> consistentHash = new ConsistentHash<String>(  
  65.                 new HashFunction(), 1000, set);  
  66.         //定義10000條緩存   key爲0到9999  
  67.         int count = 10000;  
  68.         for (int i = 0; i < count; i++) {  
  69.             //根據緩存key去取hash環中對應真實節點名稱  
  70.             String key = consistentHash.get(i);  
  71.             //變量key爲真實的名稱  
  72.             //而後在一個map中開始計數  
  73.             if (map.containsKey(key)) {  
  74.                 map.put(consistentHash.get(i), map.get(key) + 1);  
  75.             } else {  
  76.                 map.put(consistentHash.get(i), 1);  
  77.             }  
  78.         }  
  79.         showServer(map);  
  80.         map.clear();  
  81.         //移除一臺真實節點A  
  82.         consistentHash.remove("A");  
  83.         System.out.println("<<<<  remove server A  >>>>");  
  84.         for (int i = 0; i < count; i++) {  
  85.             String key = consistentHash.get(i);  
  86.             if (map.containsKey(key)) {  
  87.                 map.put(consistentHash.get(i), map.get(key) + 1);  
  88.             } else {  
  89.                 map.put(consistentHash.get(i), 1);  
  90.             }  
  91.         }  
  92.         showServer(map);  
  93.         map.clear();  
  94.         consistentHash.add("E");  
  95.         System.out.println("<<<<  add server E  >>>>");  
  96.         for (int i = 0; i < count; i++) {  
  97.             String key = consistentHash.get(i);  
  98.             if (map.containsKey(key)) {  
  99.                 map.put(consistentHash.get(i), map.get(key) + 1);  
  100.             } else {  
  101.                 map.put(consistentHash.get(i), 1);  
  102.             }  
  103.         }  
  104.         showServer(map);  
  105.         map.clear();  
  106.         consistentHash.add("F");  
  107.         System.out.println("<<<<  add server F,緩存數量加倍  >>>>");  
  108.         count = count * 2;  
  109.         for (int i = 0; i < count; i++) {  
  110.             String key = consistentHash.get(i);  
  111.             if (map.containsKey(key)) {  
  112.                 map.put(consistentHash.get(i), map.get(key) + 1);  
  113.             } else {  
  114.                 map.put(consistentHash.get(i), 1);  
  115.             }  
  116.         }  
  117.         showServer(map);  
  118.     }  
  119.   
  120.     public static void showServer(Map<String, Integer> map) {  
  121.         for (Entry<String, Integer> m : map.entrySet()) {  
  122.             System.out.println("server " + m.getKey() + " : " + m.getValue()  
  123.                     + "個");  
  124.         }  
  125.     }  
  126.       
  127.     static class HashFunction {  
  128.         int hash(Object key) {  
  129.             return Md5Encrypt.md5(key.toString()).hashCode();  
  130.         }  
  131.     }  
  132. }  

代碼中用了的MD5方法以下:

 

 

[java]  view plain  copy
 
  1. public class Md5Encrypt {  
  2.     /** 
  3.      * Used building output as Hex 
  4.      */  
  5.     private static final char[] DIGITS = { '0', '1', '2', '3', '4', '5', '6',  
  6.             '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };  
  7.   
  8.     /** 
  9.      * 對字符串進行MD5加密 
  10.      */  
  11.     public static String md5(String text) {  
  12.         MessageDigest msgDigest = null;  
  13.         try {  
  14.             msgDigest = MessageDigest.getInstance("MD5");  
  15.         } catch (NoSuchAlgorithmException e) {  
  16.             throw new IllegalStateException(  
  17.                     "System doesn't support MD5 algorithm.");  
  18.         }  
  19.         try {  
  20.             msgDigest.update(text.getBytes("utf-8"));  
  21.         } catch (UnsupportedEncodingException e) {  
  22.             throw new IllegalStateException(  
  23.                     "System doesn't support your  EncodingException.");  
  24.         }  
  25.         byte[] bytes = msgDigest.digest();  
  26.         String md5Str = new String(encodeHex(bytes));  
  27.         return md5Str;  
  28.     }  
  29.   
  30.     public static char[] encodeHex(byte[] data) {  
  31.         int l = data.length;  
  32.         char[] out = new char[l << 1];  
  33.         // two characters form the hex value.  
  34.         for (int i = 0, j = 0; i < l; i++) {  
  35.             out[j++] = DIGITS[(0xF0 & data[i]) >>> 4];  
  36.             out[j++] = DIGITS[0x0F & data[i]];  
  37.         }  
  38.         return out;  
  39.     }  
  40. }  


代碼輸出的結果以下,能看得出來每臺服務器均勻地保存數據:

 

 

[html]  view plain  copy
 
    1. server D : 2487個  
    2. server A : 2675個  
    3. server B : 2373個  
    4. server C : 2465個  
    5. <<<<  remove server A  >>>>  
    6. server D : 3417個  
    7. server B : 3148個  
    8. server C : 3435個  
    9. <<<<  add server E  >>>>  
    10. server D : 2486個  
    11. server E : 2522個  
    12. server B : 2460個  
    13. server C : 2532個  
    14. <<<<  add server F,緩存數量加倍  >>>>  
    15. server D : 3994個  
    16. server E : 4102個  
    17. server F : 4014個  
    18. server B : 3937個  
    19. server C : 3953個  
相關文章
相關標籤/搜索