#0 系列目錄#java
Zookeeper系列node
Zookeeper源碼server
Zookeeper應用
#1 場景描述# 在分佈式應用中, 咱們常常同時啓動多個server, 調用方(client)選擇其中之一發起請求.
分佈式應用必須考慮高可用性和可擴展性
: server的應用進程可能會崩潰, 或者server自己也可能會宕機. 當server不夠時, 也有可能增長server的數量. 總而言之, server列表並不是一成不變, 而是一直處於動態的增減中
.
那麼client如何才能實時的更新server列表呢? 解決的方案不少, 本文將講述利用ZooKeeper的解決方案.
#2 思路# 啓動server時, 在zookeeper的某個znode(假設爲/sgroup)下建立一個子節點
. 所建立的子節點的類型應該爲ephemeral, 這樣一來, 若是server進程崩潰, 或者server宕機, 與zookeeper鏈接的session就結束了, 那麼其所建立的子節點會被zookeeper自動刪除. 當崩潰的server恢復後, 或者新增server時, 一樣須要在/sgroup節點下建立新的子節點.
對於client, 只需註冊/sgroup子節點的監聽
, 當/sgroup下的子節點增長或減小時, zookeeper會通知client, 此時client更新server列表.
#3 實現AppServer# AppServer的邏輯很是簡單, 只須在啓動時, 在zookeeper的"/sgroup"節點下新增一個子節點便可.
AppServer.java源碼:
package com.king.server; import org.apache.zookeeper.*; public class AppServer { private String groupNode = "sgroup"; private String subNode = "sub"; /** * 鏈接zookeeper * * @param address server的地址 */ public void connectZookeeper(String address) throws Exception { ZooKeeper zk = new ZooKeeper("localhost:2180,localhost:2181,localhost:2182", 5000, new Watcher() { public void process(WatchedEvent event) { // 不作處理 } }); // 在"/sgroup"下建立子節點 // 子節點的類型設置爲EPHEMERAL_SEQUENTIAL, 代表這是一個臨時節點, 且在子節點的名稱後面加上一串數字後綴 // 將server的地址數據關聯到新建立的子節點上 String createdPath = zk.create("/" + groupNode + "/" + subNode, address.getBytes("utf-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("create: " + createdPath); } /** * server的工做邏輯寫在這個方法中 * 此處不作任何處理, 只讓server sleep */ public void handle() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { // 在參數中指定server的地址 if (args.length == 0) { System.err.println("The first argument must be server address"); System.exit(1); } AppServer as = new AppServer(); as.connectZookeeper(args[0]); as.handle(); } }
#4 實現AppClient# AppClient的邏輯比AppServer稍微複雜一些, 須要監聽"/sgroup"下子節點的變化事件, 當事件發生時, 須要更新server列表.
註冊監聽"/sgroup"下子節點的變化事件, 可在getChildren方法中完成. 當zookeeper回調監聽器的process方法時, 判斷該事件是不是"/sgroup"下子節點的變化事件, 若是是, 則調用更新邏輯, 並再次註冊該事件的監聽
.
AppClient.java源碼:
package com.king.server; import java.util.ArrayList; import java.util.List; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class AppClient { private String groupNode = "sgroup"; private ZooKeeper zk; private Stat stat = new Stat(); private volatile List<String> serverList; /** * 鏈接zookeeper */ public void connectZookeeper() throws Exception { zk = new ZooKeeper("localhost:2180,localhost:2181,localhost:2182", 5000, new Watcher() { public void process(WatchedEvent event) { // 若是發生了"/sgroup"節點下的子節點變化事件, 更新server列表, 並從新註冊監聽 if (event.getType() == Event.EventType.NodeChildrenChanged && ("/" + groupNode).equals(event.getPath())) { try { updateServerList(); } catch (Exception e) { e.printStackTrace(); } } } }); updateServerList(); } /** * 更新server列表 */ private void updateServerList() throws Exception { List<String> newServerList = new ArrayList<String>(); // 獲取並監聽groupNode的子節點變化 // watch參數爲true, 表示監聽子節點變化事件. // 每次都須要從新註冊監聽, 由於一次註冊, 只能監聽一次事件, 若是還想繼續保持監聽, 必須從新註冊 List<String> subList = zk.getChildren("/" + groupNode, true); for (String subNode : subList) { // 獲取每一個子節點下關聯的server地址 byte[] data = zk.getData("/" + groupNode + "/" + subNode, false, stat); newServerList.add(new String(data, "utf-8")); } // 替換server列表 serverList = newServerList; System.out.println("server list updated: " + serverList); } /** * client的工做邏輯寫在這個方法中 * 此處不作任何處理, 只讓client sleep */ public void handle() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { AppClient ac = new AppClient(); ac.connectZookeeper(); ac.handle(); } }