經過以前的3篇博文, 講述了ZooKeeper的基礎知識點. 能夠看出, ZooKeeper提供的核心功能是很是簡單, 且易於學習的. 可能會給人留下ZooKeeper並不強大的印象, 事實並不是如此, 基於ZooKeeper的核心功能, 咱們能夠擴展出不少很是有意思的應用. 接下來的幾篇博文, 將陸續介紹ZooKeeper的典型應用場景.java
在分佈式應用中, 咱們常常同時啓動多個server, 調用方(client)選擇其中之一發起請求.
分佈式應用必須考慮高可用性和可擴展性: server的應用進程可能會崩潰, 或者server自己也可能會宕機. 當server不夠時, 也有可能增長server的數量. 總而言之, server列表並不是一成不變, 而是一直處於動態的增減中.
那麼client如何才能實時的更新server列表呢? 解決的方案不少, 本文將講述利用ZooKeeper的解決方案.node
啓動server時, 在zookeeper的某個znode(假設爲/sgroup)下建立一個子節點. 所建立的子節點的類型應該爲ephemeral, 這樣一來, 若是server進程崩潰, 或者server宕機, 與zookeeper鏈接的session就結束了, 那麼其所建立的子節點會被zookeeper自動刪除. 當崩潰的server恢復後, 或者新增server時, 一樣須要在/sgroup節點下建立新的子節點.
對於client, 只需註冊/sgroup子節點的監聽, 當/sgroup下的子節點增長或減小時, zookeeper會通知client, 此時client更新server列表.git
AppServer的邏輯很是簡單, 只須在啓動時, 在zookeeper的"/sgroup"節點下新增一個子節點便可.github
public class AppServer { web
private String groupNode = "sgroup"; session
private String subNode = "sub"; app
/** 分佈式
* 鏈接zookeeper 函數
* @param address server的地址 學習
*/
public void connectZookeeper(String address) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:4180,localhost:4181,localhost:4182", 5000, new Watcher() {
public void process(WatchedEvent event) {
// 不作處理
}
});
// 在"/sgroup"下建立子節點
// 子節點的類型設置爲EPHEMERAL_SEQUENTIAL, 代表這是一個臨時節點, 且在子節點的名稱後面加上一串數字後綴
// 將server的地址數據關聯到新建立的子節點上
String createdPath = zk.create("/" + groupNode + "/" + subNode, address.getBytes("utf-8"),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("create: " + createdPath);
}
/**
* server的工做邏輯寫在這個方法中
* 此處不作任何處理, 只讓server sleep
*/
public void handle() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 在參數中指定server的地址
if (args.length == 0) {
System.err.println("The first argument must be server address");
System.exit(1);
}
AppServer as = new AppServer();
as.connectZookeeper(args[0]);
as.handle();
}
}
將其打成appserver.jar後待用, 生成jar時別忘了指定入口函數. 具體的教程請自行搜索.
AppClient的邏輯比AppServer稍微複雜一些, 須要監聽"/sgroup"下子節點的變化事件, 當事件發生時, 須要更新server列表.
註冊監聽"/sgroup"下子節點的變化事件, 可在getChildren方法中完成. 當zookeeper回調監聽器的process方法時, 判斷該事件是不是"/sgroup"下子節點的變化事件, 若是是, 則調用更新邏輯, 並再次註冊該事件的監聽.
public class AppClient {
private String groupNode = "sgroup";
private ZooKeeper zk;
private Stat stat = new Stat();
private volatile List<String> serverList;
/**
* 鏈接zookeeper
*/
public void connectZookeeper() throws Exception {
zk = new ZooKeeper("localhost:4180,localhost:4181,localhost:4182", 5000, new Watcher() {
public void process(WatchedEvent event) {
// 若是發生了"/sgroup"節點下的子節點變化事件, 更新server列表, 並從新註冊監聽
if (event.getType() == EventType.NodeChildrenChanged
&& ("/" + groupNode).equals(event.getPath())) {
try {
updateServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
updateServerList();
}
/**
* 更新server列表
*/
private void updateServerList() throws Exception {
List<String> newServerList = new ArrayList<String>();
// 獲取並監聽groupNode的子節點變化
// watch參數爲true, 表示監聽子節點變化事件.
// 每次都須要從新註冊監聽, 由於一次註冊, 只能監聽一次事件, 若是還想繼續保持監聽, 必須從新註冊
List<String> subList = zk.getChildren("/" + groupNode, true);
for (String subNode : subList) {
// 獲取每一個子節點下關聯的server地址
byte[] data = zk.getData("/" + groupNode + "/" + subNode, false, stat);
newServerList.add(new String(data, "utf-8"));
}
// 替換server列表
serverList = newServerList;
System.out.println("server list updated: " + serverList);
}
/**
* client的工做邏輯寫在這個方法中
* 此處不作任何處理, 只讓client sleep
*/
public void handle() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
AppClient ac = new AppClient();
ac.connectZookeeper();
ac.handle();
}
}
將其打包成appclient.jar後待用, 別忘了指定入口函數.
在運行jar包以前, 須要確認zookeeper中是否已經存在"/sgroup"節點了, 沒有不存在, 則建立該節點. 若是存在, 最好先將其刪除, 而後再從新建立. ZooKeeper的相關命令可參考個人另外一篇博文.
運行appclient.jar: java -jar appclient.jar
開啓多個命令行窗口, 每一個窗口運行appserver.jar進程:java -jar appserver.jar server0000
. "server0000"表示server的地址, 別忘了給每一個server設定一個不一樣的地址. 觀察appclient的輸出.
依次結束appserver的進程, 觀察appclient的輸出.
appclient的輸出相似於:
server list updated: []
server list updated: [server0000]
server list updated: [server0000, server0001]
server list updated: [server0000, server0001, server0002]
server list updated: [server0000, server0001, server0002, server0003]
server list updated: [server0000, server0001, server0002]
server list updated: [server0000, server0001]
server list updated: [server0000]
server list updated: []