最近接到一個需求,因爲文件服務器上傳文件後,不一樣節點之間共享文件須要延遲,上游上傳文件後馬上去下載,若是負載到其餘節點上可能會找不到文件,因此使用文件服務器接入nacos根據相同的trace_id路由到一個節點上,這樣保證上傳後馬上下載的請求都能路由到同一個節點上,研究了兩天nacos原生的client發現並無提供相關功能,因而便產生了一個想法,手擼一個客戶端負載。java
要想同一個trace_id須要路由到相同的節點上,首先想到的方法就是利用hash算法,目前經常使用於分佈式系統中負載的哈希算法分爲兩種:node
1.普通hash取模算法
2.一致性hash緩存
普通hash雖然開發起來快,看起來也知足需求,可是當集羣擴容或者縮容的時候,就會形成trace_id的hash結果與以前不一樣,可能不會路由到一個節點上。安全
而一致性hash在擴容縮容時只會影響哈希環中順時針方向的相鄰的節點, 對其餘節點無影響。可是缺點時數據的分佈和節點的位置有關,由於這些節點不是均勻的分佈在哈希環上的,不過根據選擇適當的hash算法也能夠避免這個缺點,讓數據相對均勻的在hash環上分佈。服務器
網上關於一致性hash的討論已經有不少了,在這裏就放一張圖便於你們理解。負載均衡
其餘的不說,先上代碼dom
1.首先建立NacosClient,監聽對應的服務:分佈式
public class NacosClient { //nacos監聽器,處理初始化hash環等邏輯 private Nodelistener nodelistener; //初始化nacosClient,而且監聽服務 public void init() { NamingService naming = null; try { System.out.println(System.getProperty("serveAddr")); naming = NamingFactory.createNamingService(System.getProperty("serveAddr")); //註冊監聽器,當集羣節點變化的時候調用nodelistener處理節點信息 naming.subscribe("test", event -> { if (event instanceof NamingEvent) { nodelistener.handlerChange((NamingEvent)event); } }); } catch (NacosException e) { e.printStackTrace(); } } }
2.建立Nodelistener,主要處理構建hash環等邏輯:post
public class Nodelistener { private List<Instance> servers; //利用treeMap構建hash環 private volatile SortedMap<Long, Instance> sortedMap = new TreeMap<Long, Instance>(); //虛擬節點 private int virtualNodeCount = 100; public synchronized void handlerChange(NamingEvent event) { List<Instance> servers = new ArrayList<Instance>(); event.getInstances().stream().filter(instance -> { return instance.isEnabled() && instance.isHealthy(); }).forEach(instance -> servers.add(instance)); this.onChange(servers); } //每次集羣節點變化時,從新構建hash環 public void onChange(List<Instance> servers) { //只有一個節點的時候這裏暫不考慮,讀者能夠自行處理 if(servers.size() != 1) { SortedMap<Long, Instance> newSortedMap = new TreeMap<Long, Instance>(); for (int i = 0; i < servers.size(); i ++) { for (int j = 0; j < this.virtualNodeCount; j++) { //計算虛擬節點的hash,這裏用到的是MurMurHash,網上還有不少其餘hash實現, //有興趣能夠自行查閱,具體實現細節就不列出了 Long hash = HashUtil.getHash(servers.get(i).getIp() + ":" + servers.get(i).getPort() + j); //把虛擬節點加入hash環 sortedMap.put(hash, servers.get(i)); } } sortedMap = newSortedMap; } this.servers = servers; } //根據傳入的key獲取hash環上順時針到hash環尾部部分全部節點 public Instance getInstance(String str) { Long hash = HashUtil.getHash(str); SortedMap<Long, Instance> map = sortedMap.tailMap(hash); //這裏證實恰好獲取的是尾部,因此返回全部的節點,實際上是獲取第一個節點 if (map.isEmpty()) { map = sortedMap; } return map.get(map.firstKey()); } }
其中,虛擬節點是一致性hash常常用到的,主要是用於解決hash傾斜問題,即節點數比較少時,數據落在hash環上會形成不均衡,下圖即沒有虛擬節點的狀況:
有虛擬節點的狀況,這樣hash環就均勻分割,相應數據落入的區間也會平衡:
3.負載均衡器:
public class LoadBalance { private Nodelistener nodelistener; //只須要簡單的從hash環中獲取第一個節點 public Instance doSelect(String key) { return nodelistener.getInstance(key); } }
測試下結果:
public void test2() { Map<String, Integer> map = new HashMap<String, Integer>(); Random random = new Random(); for (int i = 0; i < 10000; i ++) { String key = String.valueOf(random.nextLong()); Instance instance = loadBalance.doSelect(key); if(!map.containsKey(instance.getIp())) { map.put(instance.getIp(), 0); }else { map.put(instance.getIp(), map.get(instance.getIp()) + 1); } System.out.println("test2 count :" + i); System.out.printf("select IP is :" + instance.getIp()); } System.out.println(map.toString()); }
此處爲了方便就直接用隨機數模擬trace_id,結果以下:
select IP is :127.0.0.0{127.0.0.4=2031, 127.0.0.3=2144, 127.0.0.2=1925, 127.0.0.1=1931, 127.0.0.0=1964}
能夠看到10000次請求被均勻的分佈到了4個節點上。
思考:
1. 本次咱們使用到了treeMap構建hash環,那麼treeMap構建的hash具體的查找效率如何呢?
treeMap是由紅黑樹構成的,其 containsKey(),get(),put(), remove() 方法時間複雜度均爲O(logn),均是對數階,已經算至關不錯了。
2.在Nodelistener 中咱們兩個方法都使用了synchronized 這樣會有什麼影響?
首先由於treeMap是線程不安全的,因此咱們都使用了方法級別的synchronized,因此兩個方法不會同時執行,這樣使用treeMap時,不會形成線程不安全問題,其次能夠保證咱們在獲取hash環中節點的時候,treeMap不會由於節點變化而變化。可是這樣處理的話就會產生一個問題,咱們正在計算trace_id的路由節點時,機器不巧縮容了,treeMap還沒進行更新,恰好路由到的節點時下線的機器,那麼就會訪問失敗,筆者這裏解決這個問題的思路是重試,若是失敗獲取下一個節點,此時文件服務器不一樣節點之間文件已經同步完畢,因此不一樣節點訪問是沒問題的。
public void test(String key) throws InterruptedException { //這裏能夠設置重試次數 for (;;) { Instance instance = loadBalance.doSelect(key); String addr = instance.getIp() + ":" + instance.getPort(); //測試請求 if(post(addr)) { //成功邏輯 ..... break; }else { //等待兩秒,便可以使文件服務器不一樣節點之間同步文件,還能夠等待更新本地hash環 Thread.sleep(2000); //失敗則選取下一個節點 instance = loadBalance.doSelect(key); //此處能夠增長重試次數邏輯和若是重試到hash環上最後一個節點則從新獲取hash環第一個節點邏輯, // 在此就不作論述,讀者能夠自由發揮 continue; } } }
那麼咱們考慮當咱們計算trace_id路由時,正好擴容的狀況,此時treeMap尚未進行更新,狀況以下圖,咱們路由到的節點若是不是圖中標記的受影響區域則不會有影響,若是是圖中受影響的區域計算得出的路由是擴容前的也就是 127.0.0.2-1(真實節點是127.0.0.2),那麼下次相同的trace_id則會路由到新節點,此時會出現同一個trace_id路由到的節點不同的問題,筆者在此處也使用的重試機制。(其實這個地方可使用緩存Key和節點的關係,擴容後關係改變以後再改變圖中受影響的hash環,可是由於trace_id比較特殊,並不適合緩存全部,因此使用了重試機制)
3.每次探知到服務器節點變化的時候都須要從新構建hash環,這樣操做會比較耗時,能夠修改爲每次節點變化只須要改變對應虛擬節點信息,更新本地hash環時間,能夠將onChange方法改造下。
public void onChange(List<Instance> newServers) { //單節點時這裏暫不考慮 if(servers.size() != 1 ) { //TODO .. } Map<String, Instance> oldAddrs = this.servers.stream() .collect(Collectors.toMap(Instance::toInetAddr, instance -> instance)); Map<String, Instance> newAddrs = newServers.stream() .collect(Collectors.toMap(Instance::toInetAddr, instance -> instance)); //remove oldAddrs.forEach((key, value) -> { if (!newAddrs.containsKey(key)) { for(int j = 0; j < virtualNodeCount; j++) { Long hash = HashUtil.getHash( value.toInetAddr() + "&&VM"+ j); sortedMap.remove(hash); } } }); //add newAddrs.forEach((key, value) -> { if (!oldAddrs.containsKey(key)) { for(int j = 0; j < virtualNodeCount; j++) { Long hash = HashUtil.getHash(value.toInetAddr() + "&&VM" + j); sortedMap.put(hash, value); } } }); this.servers = newServers; }