接以前一篇<手寫zookeeper來模擬dubbo的註冊/發現>,使用一致性Hash來進行查找須要尋找的服務.node
Hash處理接口算法
public interface HashFunc { public Long hash(Object key); }
一致性Hash類緩存
public class ConsistentHash<T> { /** * Hash計算對象,用於自定義hash算法 */ HashFunc hashFunc; /** * 複製的節點個數 */ private int numberOfReplicas; /** * 一致性Hash環 */ private SortedMap<Long, T> circle = new TreeMap(); /** * 構造,使用Java默認的Hash算法 * @param numberOfReplicas 複製的節點個數,增長每一個節點的複製節點有利於負載均衡 * @param nodes 節點對象 */ public ConsistentHash(int numberOfReplicas, Collection<T> nodes) { this.numberOfReplicas = numberOfReplicas; this.hashFunc = new HashFunc() { public Long hash(Object key) { // return fnv1HashingAlg(key.toString()); return md5HashingAlg(key.toString()); } }; //初始化節點 for (T node : nodes) { add(node); } } /** * 構造 * @param hashFunc hash算法對象 * @param numberOfReplicas 複製的節點個數,增長每一個節點的複製節點有利於負載均衡 * @param nodes 節點對象 */ public ConsistentHash(HashFunc hashFunc, int numberOfReplicas, Collection<T> nodes) { this.numberOfReplicas = numberOfReplicas; this.hashFunc = hashFunc; //初始化節點 for (T node : nodes) { add(node); } } /** * 增長節點<br> * 每增長一個節點,就會在閉環上增長給定複製節點數<br> * 例如複製節點數是2,則每調用此方法一次,增長兩個虛擬節點,這兩個節點指向同一Node * 因爲hash算法會調用node的toString方法,故按照toString去重 * * @param node 節點對象 */ public void add(T node) { for (int i = 0; i < numberOfReplicas; i++) { circle.put(hashFunc.hash(node.toString() + i), node); } } /** * 移除節點的同時移除相應的虛擬節點 * * @param node 節點對象 */ public void remove(T node) { for (int i = 0; i < numberOfReplicas; i++) { circle.remove(hashFunc.hash(node.toString() + i)); } } /** * 得到一個最近的順時針節點 * * @param key 爲給定鍵取Hash,取得順時針方向上最近的一個虛擬節點對應的實際節點 * @return 節點對象 */ public T get(Object key) { if (circle.isEmpty()) { return null; } long hash = hashFunc.hash(key); if (!circle.containsKey(hash)) { SortedMap<Long, T> tailMap = circle.tailMap(hash); //返回此映射的部分視圖,其鍵大於等於 hash hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey(); } //正好命中 return circle.get(hash); } /** * 使用MD5算法 * @param key * @return */ private static long md5HashingAlg(String key) { MessageDigest md5 = null; try { md5 = MessageDigest.getInstance("MD5"); md5.reset(); md5.update(key.getBytes()); byte[] bKey = md5.digest(); long res = ((long) (bKey[3] & 0xFF) << 24) | ((long) (bKey[2] & 0xFF) << 16) | ((long) (bKey[1] & 0xFF) << 8)| (long) (bKey[0] & 0xFF); return res; } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } return 0l; } /** * 使用FNV1hash算法 * @param key * @return */ private static long fnv1HashingAlg(String key) { final int p = 16777619; int hash = (int) 2166136261L; for (int i = 0; i < key.length(); i++) hash = (hash ^ key.charAt(i)) * p; hash += hash << 13; hash ^= hash >> 7; hash += hash << 3; hash ^= hash >> 17; hash += hash << 5; return hash; } }
發現代碼負載均衡
@Component public class ClientComsumer implements Watcher { //本地緩存服務列表 private static Map<String, List<String>> servermap; @Autowired private ZookeeperServer zkServer ; private ZooKeeper zk; private ConsistentHash consistentHash; @Autowired Environment env; @PostConstruct private void init() throws IOException { String address = env.getProperty("zookeeper.address"); this.zk = zkServer.getConnection(address,this); } private List<String> getNodeList(String serverName) throws KeeperException, InterruptedException, IOException { if (servermap == null) { servermap = new HashMap<>(); } Stat exists = null; try { String s = "/guanjian/" + serverName; exists = zk.exists(s,this); } catch (Exception e) { } //判斷是否存在該服務 if (exists == null) return null; List<String> serverList = servermap.get(serverName); if (serverList != null && serverList.size() > 0) { //將已存在的serverList放入一致性Hash環 this.consistentHash = new ConsistentHash(serverList.size(),serverList); return serverList; } List<String> children = zk.getChildren("/guanjian/" + serverName,this); List<String> list = new ArrayList<>(); for (String s : children) { byte[] data = zk.getData("/guanjian/" + serverName + "/" + s, this, null); String datas = new String(data); NodeStat nodeStat = JSONObject.parseObject(datas, NodeStat.class); if (!Status.stop.equals(nodeStat.getStatus())) { list.add(datas); } } //將list放入一致性Hash環 this.consistentHash = new ConsistentHash(list.size(),list); servermap.put(serverName, list); return list; } public String getServerinfo(String serverName) throws KeeperException, InterruptedException, IOException { try { List<String> nodeList = getNodeList(serverName); if (nodeList == null|| nodeList.size()< 1) { return null; } //這裏使用得隨機負載策略,如需須要本身能夠實現其餘得負載策略 //String snode = nodeList.get((int) (Math.random() * nodeList.size())); //從一致性Hash環的第一個服務開始查找 String snode = (String) this.consistentHash.get(nodeList.get(0)); NodeStat nodeStat = JSONObject.parseObject(snode, NodeStat.class); List<String> children = zk.getChildren("/guanjian/" + serverName,this); //隨機負載後,將隨機取得節點後的狀態更新爲run for (String s : children) { byte[] data = zk.getData("/guanjian/" + serverName + "/" + s, this, null); String datas = new String(data); if (snode.equals(datas)) { nodeStat.setStatus(Status.run); zk.setData("/guanjian/" + serverName + "/" + s,JSONObject.toJSONString(nodeStat).getBytes(),0); break; } } return JSONObject.toJSONString(nodeStat); } catch (Exception e) { e.printStackTrace(); return null; } } @Override public void process(WatchedEvent watchedEvent) { //若是服務節點數據發生變化則清空本地緩存 if (watchedEvent.getType().equals(Event.EventType.NodeChildrenChanged)) { servermap = null; } } }