ZooKeeper示例 實時更新server列表

經過以前的3篇博文, 講述了ZooKeeper的基礎知識點. 能夠看出, ZooKeeper提供的核心功能是很是簡單, 且易於學習的. 可能會給人留下ZooKeeper並不強大的印象, 事實並不是如此, 基於ZooKeeper的核心功能, 咱們能夠擴展出不少很是有意思的應用. 接下來的幾篇博文, 將陸續介紹ZooKeeper的典型應用場景.java

場景描述

在分佈式應用中, 咱們常常同時啓動多個server, 調用方(client)選擇其中之一發起請求.
分佈式應用必須考慮高可用性和可擴展性: server的應用進程可能會崩潰, 或者server自己也可能會宕機. 當server不夠時, 也有可能增長server的數量. 總而言之, server列表並不是一成不變, 而是一直處於動態的增減中.
那麼client如何才能實時的更新server列表呢? 解決的方案不少, 本文將講述利用ZooKeeper的解決方案.node

思路

啓動server時, 在zookeeper的某個znode(假設爲/sgroup)下建立一個子節點. 所建立的子節點的類型應該爲ephemeral, 這樣一來, 若是server進程崩潰, 或者server宕機, 與zookeeper鏈接的session就結束了, 那麼其所建立的子節點會被zookeeper自動刪除. 當崩潰的server恢復後, 或者新增server時, 一樣須要在/sgroup節點下建立新的子節點.
對於client, 只需註冊/sgroup子節點的監聽, 當/sgroup下的子節點增長或減小時, zookeeper會通知client, 此時client更新server列表.git

實現AppServer

AppServer的邏輯很是簡單, 只須在啓動時, 在zookeeper的"/sgroup"節點下新增一個子節點便可.github

public class AppServer {  web

  1.     private String groupNode = "sgroup";  session

  2.     private String subNode = "sub";  app

  3.   

  4.     /** 分佈式

  5.      * 鏈接zookeeper 函數

  6.      * @param address server的地址 學習

  7.      */  

  8.     public void connectZookeeper(String address) throws Exception {  

  9.         ZooKeeper zk = new ZooKeeper("localhost:4180,localhost:4181,localhost:4182"5000new Watcher() {  

  10.             public void process(WatchedEvent event) {  

  11.                 // 不作處理  

  12.             }  

  13.         });  

  14.         // 在"/sgroup"下建立子節點  

  15.         // 子節點的類型設置爲EPHEMERAL_SEQUENTIAL, 代表這是一個臨時節點, 且在子節點的名稱後面加上一串數字後綴  

  16.         // 將server的地址數據關聯到新建立的子節點上  

  17.         String createdPath = zk.create("/" + groupNode + "/" + subNode, address.getBytes("utf-8"),   

  18.             Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);  

  19.         System.out.println("create: " + createdPath);  

  20.     }  

  21.       

  22.     /** 

  23.      * server的工做邏輯寫在這個方法中 

  24.      * 此處不作任何處理, 只讓server sleep 

  25.      */  

  26.     public void handle() throws InterruptedException {  

  27.         Thread.sleep(Long.MAX_VALUE);  

  28.     }  

  29.       

  30.     public static void main(String[] args) throws Exception {  

  31.         // 在參數中指定server的地址  

  32.         if (args.length == 0) {  

  33.             System.err.println("The first argument must be server address");  

  34.             System.exit(1);  

  35.         }  

  36.           

  37.         AppServer as = new AppServer();  

  38.         as.connectZookeeper(args[0]);  

  39.           

  40.         as.handle();  

  41.     }  

  42. }  

將其打成appserver.jar後待用, 生成jar時別忘了指定入口函數. 具體的教程請自行搜索.

 

實現AppClient

AppClient的邏輯比AppServer稍微複雜一些, 須要監聽"/sgroup"下子節點的變化事件, 當事件發生時, 須要更新server列表.
註冊監聽"/sgroup"下子節點的變化事件, 可在getChildren方法中完成. 當zookeeper回調監聽器的process方法時, 判斷該事件是不是"/sgroup"下子節點的變化事件, 若是是, 則調用更新邏輯, 並再次註冊該事件的監聽.

public class AppClient {  

  1.     private String groupNode = "sgroup";  

  2.     private ZooKeeper zk;  

  3.     private Stat stat = new Stat();  

  4.     private volatile List<String> serverList;  

  5.   

  6.     /** 

  7.      * 鏈接zookeeper 

  8.      */  

  9.     public void connectZookeeper() throws Exception {  

  10.         zk = new ZooKeeper("localhost:4180,localhost:4181,localhost:4182"5000new Watcher() {  

  11.             public void process(WatchedEvent event) {  

  12.                 // 若是發生了"/sgroup"節點下的子節點變化事件, 更新server列表, 並從新註冊監聽  

  13.                 if (event.getType() == EventType.NodeChildrenChanged   

  14.                     && ("/" + groupNode).equals(event.getPath())) {  

  15.                     try {  

  16.                         updateServerList();  

  17.                     } catch (Exception e) {  

  18.                         e.printStackTrace();  

  19.                     }  

  20.                 }  

  21.             }  

  22.         });  

  23.   

  24.         updateServerList();  

  25.     }  

  26.   

  27.     /** 

  28.      * 更新server列表 

  29.      */  

  30.     private void updateServerList() throws Exception {  

  31.         List<String> newServerList = new ArrayList<String>();  

  32.   

  33.         // 獲取並監聽groupNode的子節點變化  

  34.         // watch參數爲true, 表示監聽子節點變化事件.   

  35.         // 每次都須要從新註冊監聽, 由於一次註冊, 只能監聽一次事件, 若是還想繼續保持監聽, 必須從新註冊  

  36.         List<String> subList = zk.getChildren("/" + groupNode, true);  

  37.         for (String subNode : subList) {  

  38.             // 獲取每一個子節點下關聯的server地址  

  39.             byte[] data = zk.getData("/" + groupNode + "/" + subNode, false, stat);  

  40.             newServerList.add(new String(data, "utf-8"));  

  41.         }  

  42.   

  43.         // 替換server列表  

  44.         serverList = newServerList;  

  45.   

  46.         System.out.println("server list updated: " + serverList);  

  47.     }  

  48.   

  49.     /** 

  50.      * client的工做邏輯寫在這個方法中 

  51.      * 此處不作任何處理, 只讓client sleep 

  52.      */  

  53.     public void handle() throws InterruptedException {  

  54.         Thread.sleep(Long.MAX_VALUE);  

  55.     }  

  56.   

  57.     public static void main(String[] args) throws Exception {  

  58.         AppClient ac = new AppClient();  

  59.         ac.connectZookeeper();  

  60.   

  61.         ac.handle();  

  62.     }  

  63. }  

將其打包成appclient.jar後待用, 別忘了指定入口函數.

 

運行

在運行jar包以前, 須要確認zookeeper中是否已經存在"/sgroup"節點了, 沒有不存在, 則建立該節點. 若是存在, 最好先將其刪除, 而後再從新建立. ZooKeeper的相關命令可參考個人另外一篇博文.
運行appclient.jar: java -jar appclient.jar 開啓多個命令行窗口, 每一個窗口運行appserver.jar進程:java -jar appserver.jar server0000. "server0000"表示server的地址, 別忘了給每一個server設定一個不一樣的地址. 觀察appclient的輸出.
依次結束appserver的進程, 觀察appclient的輸出.
appclient的輸出相似於:

 

  1. server list updated: []  

  2. server list updated: [server0000]  

  3. server list updated: [server0000, server0001]  

  4. server list updated: [server0000, server0001, server0002]  

  5. server list updated: [server0000, server0001, server0002, server0003]  

  6. server list updated: [server0000, server0001, server0002]  

  7. server list updated: [server0000, server0001]  

  8. server list updated: [server0000]  

  9. server list updated: []  

相關文章
相關標籤/搜索