本文爲實現分佈式任務調度系統中用到的一些關鍵技術點分享——Consistent Hashing算法原理和Java實現,以及效果測試。(代碼實現見:https://github.com/yaohonv/pingpong/tree/master/consistenthashing)java
一致性Hashing在分佈式系統中常常會被用到, 用於儘量地下降節點變更帶來的數據遷移開銷。Consistent Hashing算法在1997年就在論文Consistenthashing and random trees中被提出。node
先來簡單理解下Hash是解決什麼問題。假設一個分佈式任務調度系統,執行任務的節點有n臺機器,現有m個job在這n臺機器上運行,這m個Job須要逐一映射到n個節點中一個,這時候能夠選擇一種簡單的Hash算法來讓m個Job能夠均勻分佈到n個節點中,好比 hash(Job)%n ,看上去很完美,但考慮以下兩種情形:git
一、2兩種情形能夠看到,基本上全部的Job會被從新分配到跟節點變更前不一樣的節點上,意味着須要遷移幾乎全部正在運行的Job,想一想這樣會給系統帶來多大的複雜性和性能損耗。github
另外還有一種狀況,假設節點的硬件處理性能不徹底一致,想讓性能高的節點多被分配一些Job,這時候上述簡單的Hash映射算法更是很難作到。算法
如何解決這種節點變更帶來的大量數據遷移和數據不均勻分配問題呢?一致性哈希算法就很巧妙的解決了這些問題。緩存
Consistent Hashing是一種Hashing算法,典型的特徵是:在減小或者添加節點時,能夠儘量地保證已經存在Key映射關係不變,儘量地減小Key的遷移。數據結構
給定值空間2^32,[0,2^32]是全部hash值的取值空間,形象地描述爲以下一個環(ring):app
2. 節點向值空間映射dom
將節點Node向這個值空間映射,取Node的Hash值,選取一個能夠固定標識一個Node的屬性值進行Hashing,假設以字符串形式輸入,算法以下:分佈式
能夠取Node標識的md5值,而後截取其中32位做爲映射值。md5取值以下:
private byte[] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } md5.reset(); byte[] bytes; try { bytes = value.getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new IllegalStateException(e.getMessage(), e); } md5.update(bytes); return md5.digest(); }
由於映射值只須要32位便可,因此能夠利用如下方式計算最終值(number取0便可):
private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[0 + number * 4] & 0xFF)) & 0xFFFFFFFFL; }
把n個節點Node經過以上方式取得hash值,映射到環形值空間以下:
算法中,將以有序Map的形式在內存中緩存每一個節點的Hash值對應的物理節點信息。緩存於這個內存變量中:private final TreeMap<Long, String> virtualNodes 。
3. 數據向值空間映射
數據Job取hash的方式跟節點Node的方式如出一轍,可使用上述md5->hash的方式一樣將全部Job取得Hash映射到這個環中。
4. 數據和節點映射
當節點和數據都被映射到這個環上後,能夠設定一個規則把哪些數據hash值放在哪些節點Node Hash值上了,規則就是,沿着順時針方向,數據hash值向後找到第一個Node Hash值即認爲該數據hash值對應的數據映射到該Node上。至此,這一個從數據到節點的映射關係就肯定了。
順時針找下一個Node Hash值算法以下:
public String select(Trigger trigger) { String key = trigger.toString(); byte[] digest = md5(key); String node = sekectForKey(hash(digest, 0)); return node; } private String sekectForKey(long hash) { String node; Long key = hash; if (!virtualNodes.containsKey(key)) { SortedMap<Long, String> tailMap = virtualNodes.tailMap(key); if (tailMap.isEmpty()) { key = virtualNodes.firstKey(); } else { key = tailMap.firstKey(); } } node = virtualNodes.get(key); return node; }
Trigger是對Job一次觸發任務的抽象,這裏可忽略關注,重寫了toString方法返回一個標記一個Job的惟一標誌,計算Hash值,從節點Hash值中按規則尋找。 虛擬節點後續介紹。
接下來就能夠見識下一致性哈希基於這樣的數據結構是如何發揮前文提到的優點的。
1. 節點減小時,看須要遷移的節點狀況
假設Node_1宕掉了,圖中數據對象只有Job_1會被從新映射到Node_k,而其餘Job_x扔保持原有映射關係不變。
2. 節點新增時
假設新增Node_i,圖中數據對象只有Job_k會被從新映射到Node_i上,其餘Job_x一樣保持原有映射關係不變。
上述算法過程,會想到兩個問題,第一,數據對象會不會分佈不均勻,特別是新增節點或者減小節點時;第二,前文提到的若是想讓部分節點多映射到一些數據對象,如何處理。虛擬節點這是解決這個問題。
將一個物理節點虛擬出必定數量的虛擬節點,分散到這個值空間上,須要儘量地隨機分散開。
假設有4個物理節點Node,環上的每一個色塊表明一個虛擬節點涵蓋的hash值區域,每種顏色表明一個物理節點。當物理節點較少時,虛擬節點數須要更高來確保更好的一致性表現。經測試,在物理節點爲個位數時,虛擬節點可設置爲160個,此時可帶來較好的表現(後文會給出測試結果,160*n個總節點數狀況下,若是發生一個節點變更,映射關係變化率基本爲1/n,達到預期)。
具體作算法實現時,已知物理節點,虛擬節點數設置爲160,可將這160*n的節點計算出Hash值,以Hash值爲key,以物理節點標識爲value,以有序Map的形式在內存中緩存,做爲後續計算數據對象對應的物理節點時的查詢數據。代碼以下,virtualNodes中緩存着全部虛擬節點Hash值對應的物理節點信息。
public ConsistentHash(List<String> nodes) { this.virtualNodes = new TreeMap<>(); this.identityHashCode = identityHashCode(nodes); this.replicaNumber = 160; for (String node : nodes) { for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(node.toString() + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualNodes.put(m, node); } } } }
以上詳細介紹了一致性哈希(Consistent Hashing)的算法原理和實現過程,接下來給出一個測試結果:
以10個物理節點,160個虛擬節點,1000個數據對象作測試,10個物理節點時,這1000個數據對象映射結果以下:
減小一個節點前,path_7節點數據對象個數:113
減小一個節點前,path_0節點數據對象個數:84
減小一個節點前,path_6節點數據對象個數:97
減小一個節點前,path_8節點數據對象個數:122
減小一個節點前,path_3節點數據對象個數:102
減小一個節點前,path_2節點數據對象個數:99
減小一個節點前,path_4節點數據對象個數:98
減小一個節點前,path_9節點數據對象個數:102
減小一個節點前,path_1節點數據對象個數:99
減小一個節點前,path_5節點數據對象個數:84
減小一個物理節點path_9,此時9個物理節點,原有1000個數據對象映射狀況以下:
減小一個節點後,path_7節點數據對象個數:132
減小一個節點後,path_6節點數據對象個數:107
減小一個節點後,path_0節點數據對象個數:117
減小一個節點後,path_8節點數據對象個數:134
減小一個節點後,path_3節點數據對象個數:104
減小一個節點後,path_4節點數據對象個數:104
減小一個節點後,path_2節點數據對象個數:115
減小一個節點後,path_5節點數據對象個數:89
減小一個節點後,path_1節點數據對象個數:98
先從數量上對比下每一個物理節點上數據對象的個數變化:
減小一個節點後,path_7節點數據對象個數從113變爲132
減小一個節點後,path_6節點數據對象個數從97變爲107
減小一個節點後,path_0節點數據對象個數從84變爲117
減小一個節點後,path_8節點數據對象個數從122變爲134
減小一個節點後,path_3節點數據對象個數從102變爲104
減小一個節點後,path_4節點數據對象個數從98變爲104
減小一個節點後,path_2節點數據對象個數從99變爲115
減小一個節點後,path_5節點數據對象個數從84變爲89
減小一個節點後,path_1節點數據對象個數從99變爲98
能夠看到基本是均勻變化,如今逐個對比每一個數據對象先後映射到的物理節點,發生變化的數據對象佔比狀況,統計以下:
數據對象遷移比率:0.9%
該結果基本體現出一致性哈希所能帶來的最佳表現,儘量地減小節點變更帶來的數據遷移。
最後附上完整的算法代碼,供你們參照。代碼中數據對象是以Trigger抽象,能夠調整成特定場景的,便可運行測試。 https://github.com/yaohonv/pingpong/tree/master/consistenthashing
package com.cronx.core.common; import com.cronx.core.entity.Trigger; import java.io.UnsupportedEncodingException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Collections; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; /** * Created by echov on 2018/1/9. */ public class ConsistentHash { private final TreeMap<Long, String> virtualNodes; private final int replicaNumber; private final int identityHashCode; private static ConsistentHash consistentHash; public ConsistentHash(List<String> nodes) { this.virtualNodes = new TreeMap<>(); this.identityHashCode = identityHashCode(nodes); this.replicaNumber = 160; for (String node : nodes) { for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(node.toString() + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualNodes.put(m, node); } } } } private static int identityHashCode(List<String> nodes){ Collections.sort(nodes); StringBuilder sb = new StringBuilder(); for (String s: nodes ) { sb.append(s); } return sb.toString().hashCode(); } public static String select(Trigger trigger, List<String> nodes) { int _identityHashCode = identityHashCode(nodes); if (consistentHash == null || consistentHash.identityHashCode != _identityHashCode) { synchronized (ConsistentHash.class) { if (consistentHash == null || consistentHash.identityHashCode != _identityHashCode) { consistentHash = new ConsistentHash(nodes); } } } return consistentHash.select(trigger); } public String select(Trigger trigger) { String key = trigger.toString(); byte[] digest = md5(key); String node = sekectForKey(hash(digest, 0)); return node; } private String sekectForKey(long hash) { String node; Long key = hash; if (!virtualNodes.containsKey(key)) { SortedMap<Long, String> tailMap = virtualNodes.tailMap(key); if (tailMap.isEmpty()) { key = virtualNodes.firstKey(); } else { key = tailMap.firstKey(); } } node = virtualNodes.get(key); return node; } private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[0 + number * 4] & 0xFF)) & 0xFFFFFFFFL; } private byte[] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } md5.reset(); byte[] bytes; try { bytes = value.getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new IllegalStateException(e.getMessage(), e); } md5.update(bytes); return md5.digest(); } }