本身動手實現nacos客戶端一致性hash負載

        最近接到一個需求,因爲文件服務器上傳文件後,不一樣節點之間共享文件須要延遲,上游上傳文件後馬上去下載,若是負載到其餘節點上可能會找不到文件,因此使用文件服務器接入nacos根據相同的trace_id路由到一個節點上,這樣保證上傳後馬上下載的請求都能路由到同一個節點上,研究了兩天nacos原生的client發現並無提供相關功能,因而便產生了一個想法,手擼一個客戶端負載。java

        要想同一個trace_id須要路由到相同的節點上,首先想到的方法就是利用hash算法,目前經常使用於分佈式系統中負載的哈希算法分爲兩種:node

        1.普通hash取模算法

        2.一致性hash緩存

普通hash雖然開發起來快,看起來也知足需求,可是當集羣擴容或者縮容的時候,就會形成trace_id的hash結果與以前不一樣,可能不會路由到一個節點上。安全

而一致性hash在擴容縮容時只會影響哈希環中順時針方向的相鄰的節點, 對其餘節點無影響。可是缺點時數據的分佈和節點的位置有關,由於這些節點不是均勻的分佈在哈希環上的,不過根據選擇適當的hash算法也能夠避免這個缺點,讓數據相對均勻的在hash環上分佈。服務器

網上關於一致性hash的討論已經有不少了,在這裏就放一張圖便於你們理解。負載均衡

其餘的不說,先上代碼dom

1.首先建立NacosClient,監聽對應的服務:分佈式

public class NacosClient {

    //nacos監聽器,處理初始化hash環等邏輯
    private Nodelistener nodelistener;

    //初始化nacosClient,而且監聽服務
    public void init() {
        NamingService naming = null;
        try {
            System.out.println(System.getProperty("serveAddr"));
            naming = NamingFactory.createNamingService(System.getProperty("serveAddr"));
            //註冊監聽器,當集羣節點變化的時候調用nodelistener處理節點信息
            naming.subscribe("test", event -> {
                if (event instanceof NamingEvent) {
                    nodelistener.handlerChange((NamingEvent)event);
                }
            });
        } catch (NacosException e) {
            e.printStackTrace();
        }
    }
}

2.建立Nodelistener,主要處理構建hash環等邏輯:post

public class Nodelistener {

    private List<Instance> servers;
    //利用treeMap構建hash環
    private volatile SortedMap<Long, Instance> sortedMap = new TreeMap<Long, Instance>();
    //虛擬節點
    private int virtualNodeCount = 100;

    public synchronized void handlerChange(NamingEvent event) {
            List<Instance> servers = new ArrayList<Instance>();
            event.getInstances().stream().filter(instance -> {
                return instance.isEnabled() && instance.isHealthy();
            }).forEach(instance -> servers.add(instance));
            this.onChange(servers);
    }


    //每次集羣節點變化時,從新構建hash環
    public void onChange(List<Instance> servers) {
        //只有一個節點的時候這裏暫不考慮,讀者能夠自行處理
        if(servers.size() != 1) {
            SortedMap<Long, Instance> newSortedMap = new TreeMap<Long, Instance>();
            for (int i = 0; i < servers.size(); i ++) {
                for (int j = 0; j < this.virtualNodeCount; j++) {
                    //計算虛擬節點的hash,這裏用到的是MurMurHash,網上還有不少其餘hash實現,
                    //有興趣能夠自行查閱,具體實現細節就不列出了
                    Long hash = HashUtil.getHash(servers.get(i).getIp() + ":" + 
                                                                     servers.get(i).getPort() + j);
                    //把虛擬節點加入hash環
                    sortedMap.put(hash, servers.get(i));
                }
            }
            sortedMap = newSortedMap;
        }
        this.servers = servers;
    }

    //根據傳入的key獲取hash環上順時針到hash環尾部部分全部節點
    public Instance getInstance(String str) {
        Long hash = HashUtil.getHash(str);
        SortedMap<Long, Instance> map = sortedMap.tailMap(hash);
        //這裏證實恰好獲取的是尾部,因此返回全部的節點,實際上是獲取第一個節點
        if (map.isEmpty()) {
            map = sortedMap;
        }
        return map.get(map.firstKey());
    }
}

其中,虛擬節點是一致性hash常常用到的,主要是用於解決hash傾斜問題,即節點數比較少時,數據落在hash環上會形成不均衡,下圖即沒有虛擬節點的狀況:

有虛擬節點的狀況,這樣hash環就均勻分割,相應數據落入的區間也會平衡:

 

3.負載均衡器:

public class LoadBalance {

    private Nodelistener nodelistener;

    //只須要簡單的從hash環中獲取第一個節點
    public Instance doSelect(String key) {
        return nodelistener.getInstance(key);
    }
}

 

測試下結果:

public void test2() {
        Map<String, Integer> map = new HashMap<String, Integer>();
        Random random = new Random();
        for (int i = 0; i < 10000; i ++) {
            String key = String.valueOf(random.nextLong());
            Instance instance = loadBalance.doSelect(key);
            if(!map.containsKey(instance.getIp())) {
                map.put(instance.getIp(), 0);
            }else {
                map.put(instance.getIp(), map.get(instance.getIp()) + 1);
            }
            System.out.println("test2 count :" + i);
            System.out.printf("select IP is :" + instance.getIp());
        }
        System.out.println(map.toString());
    }

此處爲了方便就直接用隨機數模擬trace_id,結果以下:

select IP is :127.0.0.0{127.0.0.4=2031, 127.0.0.3=2144, 127.0.0.2=1925, 127.0.0.1=1931, 127.0.0.0=1964}

能夠看到10000次請求被均勻的分佈到了4個節點上。

思考:

1. 本次咱們使用到了treeMap構建hash環,那麼treeMap構建的hash具體的查找效率如何呢?

          treeMap是由紅黑樹構成的,其 containsKey(),get(),put(), remove() 方法時間複雜度均爲O(logn),均是對數階,已經算至關不錯了。

2.在Nodelistener 中咱們兩個方法都使用了synchronized 這樣會有什麼影響?

          首先由於treeMap是線程不安全的,因此咱們都使用了方法級別的synchronized,因此兩個方法不會同時執行,這樣使用treeMap時,不會形成線程不安全問題,其次能夠保證咱們在獲取hash環中節點的時候,treeMap不會由於節點變化而變化。可是這樣處理的話就會產生一個問題,咱們正在計算trace_id的路由節點時,機器不巧縮容了,treeMap還沒進行更新,恰好路由到的節點時下線的機器,那麼就會訪問失敗,筆者這裏解決這個問題的思路是重試,若是失敗獲取下一個節點,此時文件服務器不一樣節點之間文件已經同步完畢,因此不一樣節點訪問是沒問題的。

public void test(String key) throws InterruptedException {
        //這裏能夠設置重試次數
        for (;;) {
            Instance instance = loadBalance.doSelect(key);
            String addr = instance.getIp() + ":" + instance.getPort();
            //測試請求
            if(post(addr)) {
                //成功邏輯
                .....
                break;
            }else {
                //等待兩秒,便可以使文件服務器不一樣節點之間同步文件,還能夠等待更新本地hash環
                Thread.sleep(2000);
                //失敗則選取下一個節點
                instance = loadBalance.doSelect(key);
                //此處能夠增長重試次數邏輯和若是重試到hash環上最後一個節點則從新獲取hash環第一個節點邏輯,
                // 在此就不作論述,讀者能夠自由發揮
                continue;
            }
        }

    }

         那麼咱們考慮當咱們計算trace_id路由時,正好擴容的狀況,此時treeMap尚未進行更新,狀況以下圖,咱們路由到的節點若是不是圖中標記的受影響區域則不會有影響,若是是圖中受影響的區域計算得出的路由是擴容前的也就是 127.0.0.2-1(真實節點是127.0.0.2),那麼下次相同的trace_id則會路由到新節點,此時會出現同一個trace_id路由到的節點不同的問題,筆者在此處也使用的重試機制。(其實這個地方可使用緩存Key和節點的關係,擴容後關係改變以後再改變圖中受影響的hash環,可是由於trace_id比較特殊,並不適合緩存全部,因此使用了重試機制)

3.每次探知到服務器節點變化的時候都須要從新構建hash環,這樣操做會比較耗時,能夠修改爲每次節點變化只須要改變對應虛擬節點信息,更新本地hash環時間,能夠將onChange方法改造下。

public void onChange(List<Instance> newServers) {
        //單節點時這裏暫不考慮
        if(servers.size() != 1 ) {
            //TODO ..
        }
        Map<String, Instance> oldAddrs = 
               this.servers.stream()
                           .collect(Collectors.toMap(Instance::toInetAddr, instance -> instance));
        Map<String, Instance> newAddrs = 
               newServers.stream()
                           .collect(Collectors.toMap(Instance::toInetAddr, instance -> instance));
        //remove
        oldAddrs.forEach((key, value) -> {
            if (!newAddrs.containsKey(key)) {
                for(int j = 0; j < virtualNodeCount; j++) {
                    Long hash = HashUtil.getHash( value.toInetAddr() + "&&VM"+ j);
                    sortedMap.remove(hash);
                }
            }
        });
        //add
        newAddrs.forEach((key, value) -> {
            if (!oldAddrs.containsKey(key)) {
                for(int j = 0; j < virtualNodeCount; j++) {
                    Long hash = HashUtil.getHash(value.toInetAddr() + "&&VM" + j);
                    sortedMap.put(hash, value);
                }
            }
        });
        this.servers = newServers;
    }
相關文章
相關標籤/搜索