模擬Dubbo的zookeeper一致性Hash發現

接以前一篇<手寫zookeeper來模擬dubbo的註冊/發現>,使用一致性Hash來進行查找須要尋找的服務.node

Hash處理接口算法

public interface HashFunc {
    public Long hash(Object key);
}

一致性Hash類緩存

public class ConsistentHash<T> {
    /**
     * Hash計算對象,用於自定義hash算法
     */
    HashFunc hashFunc;
    /**
     * 複製的節點個數
     */
    private int numberOfReplicas;
    /**
     * 一致性Hash環
     */
    private SortedMap<Long, T> circle = new TreeMap();

    /**
     * 構造,使用Java默認的Hash算法
     * @param numberOfReplicas 複製的節點個數,增長每一個節點的複製節點有利於負載均衡
     * @param nodes            節點對象
     */
    public ConsistentHash(int numberOfReplicas, Collection<T> nodes) {
        this.numberOfReplicas = numberOfReplicas;
        this.hashFunc = new HashFunc() {
            public Long hash(Object key) {
//          return fnv1HashingAlg(key.toString());
                return md5HashingAlg(key.toString());
            }
        };
        //初始化節點
        for (T node : nodes) {
            add(node);
        }
    }

    /**
     * 構造
     * @param hashFunc         hash算法對象
     * @param numberOfReplicas 複製的節點個數,增長每一個節點的複製節點有利於負載均衡
     * @param nodes            節點對象
     */
    public ConsistentHash(HashFunc hashFunc, int numberOfReplicas, Collection<T> nodes) {
        this.numberOfReplicas = numberOfReplicas;
        this.hashFunc = hashFunc;
        //初始化節點
        for (T node : nodes) {
            add(node);
        }
    }

    /**
     * 增長節點<br>
     * 每增長一個節點,就會在閉環上增長給定複製節點數<br>
     * 例如複製節點數是2,則每調用此方法一次,增長兩個虛擬節點,這兩個節點指向同一Node
     * 因爲hash算法會調用node的toString方法,故按照toString去重
     *
     * @param node 節點對象
     */
    public void add(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.put(hashFunc.hash(node.toString() + i), node);
        }
    }

    /**
     * 移除節點的同時移除相應的虛擬節點
     *
     * @param node 節點對象
     */
    public void remove(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.remove(hashFunc.hash(node.toString() + i));
        }
    }

    /**
     * 得到一個最近的順時針節點
     *
     * @param key 爲給定鍵取Hash,取得順時針方向上最近的一個虛擬節點對應的實際節點
     * @return 節點對象
     */
    public T get(Object key) {
        if (circle.isEmpty()) {
            return null;
        }
        long hash = hashFunc.hash(key);
        if (!circle.containsKey(hash)) {
            SortedMap<Long, T> tailMap = circle.tailMap(hash); //返回此映射的部分視圖,其鍵大於等於 hash
            hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
        }
        //正好命中
        return circle.get(hash);
    }

    /**
     * 使用MD5算法
     * @param key
     * @return
     */
    private static long md5HashingAlg(String key) {
        MessageDigest md5 = null;
        try {
            md5 = MessageDigest.getInstance("MD5");
            md5.reset();
            md5.update(key.getBytes());
            byte[] bKey = md5.digest();
            long res = ((long) (bKey[3] & 0xFF) << 24) | ((long) (bKey[2] & 0xFF) << 16) | ((long) (bKey[1] & 0xFF) << 8)| (long) (bKey[0] & 0xFF);
            return res;
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        }
        return 0l;
    }

    /**
     * 使用FNV1hash算法
     * @param key
     * @return
     */
    private static long fnv1HashingAlg(String key) {
        final int p = 16777619;
        int hash = (int) 2166136261L;
        for (int i = 0; i < key.length(); i++)
            hash = (hash ^ key.charAt(i)) * p;
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        return hash;
    }
}

發現代碼負載均衡

@Component
public class ClientComsumer implements Watcher {

    //本地緩存服務列表
    private static Map<String, List<String>> servermap;
    @Autowired
    private ZookeeperServer zkServer ;
    private ZooKeeper zk;
    private ConsistentHash consistentHash;
    @Autowired
    Environment env;

    @PostConstruct
    private void init() throws IOException {
        String address = env.getProperty("zookeeper.address");
        this.zk = zkServer.getConnection(address,this);
    }

    private List<String> getNodeList(String serverName) throws KeeperException, InterruptedException, IOException {
        if (servermap == null) {
            servermap = new HashMap<>();
        }
        Stat exists = null;
        try {
            String s = "/guanjian/" + serverName;
            exists = zk.exists(s,this);
        } catch (Exception e) {
        }

        //判斷是否存在該服務
        if (exists == null) return null;
        List<String> serverList = servermap.get(serverName);
        if (serverList != null && serverList.size() > 0) {
            //將已存在的serverList放入一致性Hash環
            this.consistentHash = new ConsistentHash(serverList.size(),serverList);
            return serverList;
        }
        List<String> children = zk.getChildren("/guanjian/" + serverName,this);
        List<String> list = new ArrayList<>();
        for (String s : children) {
            byte[] data = zk.getData("/guanjian/" + serverName + "/" + s, this, null);
            String datas = new String(data);
            NodeStat nodeStat = JSONObject.parseObject(datas, NodeStat.class);
            if (!Status.stop.equals(nodeStat.getStatus())) {
                list.add(datas);
            }
        }
        //將list放入一致性Hash環
        this.consistentHash = new ConsistentHash(list.size(),list);
        servermap.put(serverName, list);
        return list;
    }

    public String getServerinfo(String serverName) throws KeeperException, InterruptedException, IOException {
        try {
            List<String> nodeList = getNodeList(serverName);
            if (nodeList == null|| nodeList.size()< 1) {
                return null;
            }
            //這裏使用得隨機負載策略,如需須要本身能夠實現其餘得負載策略
            //String snode = nodeList.get((int) (Math.random() * nodeList.size()));
            //從一致性Hash環的第一個服務開始查找
            String snode = (String) this.consistentHash.get(nodeList.get(0));
            NodeStat nodeStat = JSONObject.parseObject(snode, NodeStat.class);
            List<String> children = zk.getChildren("/guanjian/" + serverName,this);
            //隨機負載後,將隨機取得節點後的狀態更新爲run
            for (String s : children) {
                byte[] data = zk.getData("/guanjian/" + serverName + "/" + s, this, null);
                String datas = new String(data);
                if (snode.equals(datas)) {
                    nodeStat.setStatus(Status.run);
                    zk.setData("/guanjian/" + serverName + "/" + s,JSONObject.toJSONString(nodeStat).getBytes(),0);
                    break;
                }
            }
            return JSONObject.toJSONString(nodeStat);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        //若是服務節點數據發生變化則清空本地緩存
        if (watchedEvent.getType().equals(Event.EventType.NodeChildrenChanged)) {
            servermap = null;
        }
    }
}
相關文章
相關標籤/搜索