zookeeper節點Watch機制實例展現

znode以某種方式發生變化時,「觀察」(watch)機制可讓客戶端獲得通知.能夠針對ZooKeeper服務的「操做」來設置觀察,該服務的其餘 操做能夠觸發觀察. html

實現Watcher,複寫process方法,處理收到的變動
node

    /** * Watcher Server,處理收到的變動 * @param watchedEvent */ @Override public void process(WatchedEvent watchedEvent) { LOG.info("收到事件通知:" + watchedEvent.getState() ); if ( Event.KeeperState.SyncConnected == watchedEvent.getState() ) { connectedSemaphore.countDown(); } }

以下實例展現操做節點變化:
apache

public class ZkWatchAPI implements Watcher { public static final Logger LOG = LoggerFactory.getLogger(ZkWatchAPI.class); private static final int SESSION_TIMEOUT = 10000; private ZooKeeper zk = null; private CountDownLatch connectedSemaphore = new CountDownLatch( 1 ); /** * 鏈接Zookeeper * @param connectString Zookeeper服務地址 */
    public void connectionZookeeper(String connectString){ connectionZookeeper(connectString,SESSION_TIMEOUT); } /** * <p>鏈接Zookeeper</p> * <pre> * [關於connectString服務器地址配置] * 格式: 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 * 這個地址配置有多個ip:port之間逗號分隔,底層操做 * ConnectStringParser connectStringParser = new ConnectStringParser(「192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181」); * 這個類主要就是解析傳入地址列表字符串,將其它保存在一個ArrayList中 * ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>(); * 接下去,這個地址列表會被進一步封裝成StaticHostProvider對象,而且在運行過程當中,一直是這個對象來維護整個地址列表。 * ZK客戶端將全部Server保存在一個List中,而後隨機打亂(這個隨機過程是一次性的),而且造成一個環,具體使用的時候,從0號位開始一個一個使用。 * 所以,Server地址可以重複配置,這樣可以彌補客戶端沒法設置Server權重的缺陷,可是也會加大風險。 * * [客戶端和服務端會話說明] * ZooKeeper中,客戶端和服務端創建鏈接後,會話隨之創建,生成一個全局惟一的會話ID(Session ID)。 * 服務器和客戶端之間維持的是一個長鏈接,在SESSION_TIMEOUT時間內,服務器會肯定客戶端是否正常鏈接(客戶端會定時向服務器發送heart_beat,服務器重置下次SESSION_TIMEOUT時間)。 * 所以,在正常狀況下,Session一直有效,而且ZK集羣全部機器上都保存這個Session信息。 * 在出現網絡或其它問題狀況下(例如客戶端所鏈接的那臺ZK機器掛了,或是其它緣由的網絡閃斷),客戶端與當前鏈接的那臺服務器之間鏈接斷了, * 這個時候客戶端會主動在地址列表(實例化ZK對象的時候傳入構造方法的那個參數connectString)中選擇新的地址進行鏈接。 * * [會話時間] * 客戶端並非能夠隨意設置這個會話超時時間,在ZK服務器端對會話超時時間是有限制的,主要是minSessionTimeout和maxSessionTimeout這兩個參數設置的。 * 若是客戶端設置的超時時間不在這個範圍,那麼會被強制設置爲最大或最小時間。 默認的Session超時時間是在2 * tickTime ~ 20 * tickTime * </pre> * @param connectString Zookeeper服務地址 * @param sessionTimeout Zookeeper鏈接超時時間 */
    public void connectionZookeeper(String connectString, int sessionTimeout){ this.releaseConnection(); try { // ZK客戶端容許咱們將ZK服務器的全部地址都配置在這裏
            zk = new ZooKeeper(connectString, sessionTimeout, this ); // 使用CountDownLatch.await()的線程(當前線程)阻塞直到全部其它擁有CountDownLatch的線程執行完畢(countDown()結果爲0)
 connectedSemaphore.await(); } catch ( InterruptedException e ) { LOG.error("鏈接建立失敗,發生 InterruptedException , e " + e.getMessage(), e); } catch ( IOException e ) { LOG.error( "鏈接建立失敗,發生 IOException , e " + e.getMessage(), e ); } } /** * <p>建立zNode節點, String create(path<節點路徑>, data[]<節點內容>, List(ACL訪問控制列表), CreateMode<zNode建立類型>) </p><br/> * <pre> * 節點建立類型(CreateMode) * 一、PERSISTENT:持久化節點 * 二、PERSISTENT_SEQUENTIAL:順序自動編號持久化節點,這種節點會根據當前已存在的節點數自動加 1 * 三、EPHEMERAL:臨時節點客戶端,session超時這類節點就會被自動刪除 * 四、EPHEMERAL_SEQUENTIAL:臨時自動編號節點 * </pre> * @param path zNode節點路徑 * @param data zNode數據內容 * @return 建立成功返回true, 反之返回false. */
    public boolean createPath( String path, String data ) { try { String zkPath =  this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); LOG.info( "節點建立成功, Path: " + zkPath + ", content: " + data ); return true; } catch ( KeeperException e ) { LOG.error( "節點建立失敗, 發生KeeperException! path: " + path + ", data:" + data + ", errMsg:" + e.getMessage(), e ); } catch ( InterruptedException e ) { LOG.error( "節點建立失敗, 發生 InterruptedException! path: " + path + ", data:" + data + ", errMsg:" + e.getMessage(), e ); } return false; } /** * <p>刪除一個zMode節點, void delete(path<節點路徑>, stat<數據版本號>)</p><br/> * <pre> * 說明 * 一、版本號不一致,沒法進行數據刪除操做. * 二、若是版本號與znode的版本號不一致,將沒法刪除,是一種樂觀加鎖機制;若是將版本號設置爲-1,不會去檢測版本,直接刪除. * </pre> * @param path zNode節點路徑 * @return 刪除成功返回true,反之返回false. */
    public boolean deletePath( String path ){ try { this.zk.delete(path,-1); LOG.info( "節點刪除成功, Path: " + path); return true; } catch ( KeeperException e ) { LOG.error( "節點刪除失敗, 發生KeeperException! path: " + path + ", errMsg:" + e.getMessage(), e ); } catch ( InterruptedException e ) { LOG.error( "節點刪除失敗, 發生 InterruptedException! path: " + path + ", errMsg:" + e.getMessage(), e ); } return false; } /** * <p>更新指定節點數據內容, Stat setData(path<節點路徑>, data[]<節點內容>, stat<數據版本號>)</p> * <pre> * 設置某個znode上的數據時若是爲-1,跳過版本檢查 * </pre> * @param path zNode節點路徑 * @param data zNode數據內容 * @return 更新成功返回true,返回返回false */
    public boolean writeData( String path, String data){ try { Stat stat = this.zk.setData(path, data.getBytes(), -1); LOG.info( "更新數據成功, path:" + path + ", stat: " + stat ); return true; } catch (KeeperException e) { LOG.error( "更新數據失敗, 發生KeeperException! path: " + path + ", data:" + data + ", errMsg:" + e.getMessage(), e ); } catch (InterruptedException e) { LOG.error( "更新數據失敗, 發生InterruptedException! path: " + path + ", data:" + data + ", errMsg:" + e.getMessage(), e ); } return false; } /** * <p>讀取指定節點數據內容,byte[] getData(path<節點路徑>, watcher<監視器>, stat<數據版本號>)</p> * @param path zNode節點路徑 * @return 節點存儲的值,有值返回,無值返回null */
    public String readData( String path ){ String data = null; try { data = new String( this.zk.getData( path, false, null ) ); LOG.info( "讀取數據成功, path:" + path + ", content:" + data); } catch (KeeperException e) { LOG.error( "讀取數據失敗,發生KeeperException! path: " + path + ", errMsg:" + e.getMessage(), e ); } catch (InterruptedException e) { LOG.error( "讀取數據失敗,發生InterruptedException! path: " + path + ", errMsg:" + e.getMessage(), e ); } return data; } /** * <p>獲取某個節點下的全部子節點,List getChildren(path<節點路徑>, watcher<監視器>)該方法有多個重載</p> * @param path zNode節點路徑 * @return 子節點路徑集合 說明,這裏返回的值爲節點名 * <pre> * eg. * /node * /node/child1 * /node/child2 * getChild( "node" )戶的集合中的值爲["child1","child2"] * </pre> * * * * @throws KeeperException * @throws InterruptedException */
    public List<String> getChild( String path ){ try{ List<String> list=this.zk.getChildren( path, false ); if(list.isEmpty()){ LOG.info( "中沒有節點" + path ); } return list; }catch (KeeperException e) { LOG.error( "讀取子節點數據失敗,發生KeeperException! path: " + path + ", errMsg:" + e.getMessage(), e ); } catch (InterruptedException e) { LOG.error( "讀取子節點數據失敗,發生InterruptedException! path: " + path + ", errMsg:" + e.getMessage(), e ); } return null; } /** * <p>判斷某個zNode節點是否存在, Stat exists(path<節點路徑>, watch<並設置是否監控這個目錄節點,這裏的 watcher 是在建立 ZooKeeper 實例時指定的 watcher>)</p> * @param path zNode節點路徑 * @return 存在返回true,反之返回false */
    public boolean isExists( String path ){ try { Stat stat = this.zk.exists( path, false ); return null != stat; } catch (KeeperException e) { LOG.error( "讀取數據失敗,發生KeeperException! path: " + path + ", errMsg:" + e.getMessage(), e ); } catch (InterruptedException e) { LOG.error( "讀取數據失敗,發生InterruptedException! path: " + path + ", errMsg:" + e.getMessage(), e ); } return false; } /** * Watcher Server,處理收到的變動 * @param watchedEvent */ @Override public void process(WatchedEvent watchedEvent) { LOG.info("收到事件通知:" + watchedEvent.getState() ); if ( Event.KeeperState.SyncConnected == watchedEvent.getState() ) { connectedSemaphore.countDown(); } } /** * 關閉ZK鏈接 */
    public void releaseConnection() { if ( null != zk ) { try { this.zk.close(); } catch ( InterruptedException e ) { LOG.error("release connection error ," + e.getMessage() ,e); } } } public static void main(String [] args){ // 定義父子類節點路徑
        String rootPath = "/nodeRoot"; String child1Path = rootPath + "/nodeChildren1"; String child2Path = rootPath + "/nodeChildren2"; ZkWatchAPI zkWatchAPI = new ZkWatchAPI(); // 鏈接zk服務器
        zkWatchAPI.connectionZookeeper("192.168.155.47:2181"); // 建立節點數據
        if ( zkWatchAPI.createPath( rootPath, "<父>節點數據" ) ) { System.out.println( "節點[" + rootPath + "]數據內容[" + zkWatchAPI.readData( rootPath ) + "]" ); } // 建立子節點, 讀取 + 刪除
        if ( zkWatchAPI.createPath( child1Path, "<父-子(1)>節點數據" ) ) { System.out.println( "節點[" + child1Path + "]數據內容[" + zkWatchAPI.readData( child1Path ) + "]" ); zkWatchAPI.deletePath(child1Path); System.out.println( "節點[" + child1Path + "]刪除值後[" + zkWatchAPI.readData( child1Path ) + "]" ); } // 建立子節點, 讀取 + 修改
        if ( zkWatchAPI.createPath( child2Path, "<父-子(2)>節點數據" ) ) { System.out.println( "節點[" + child2Path + "]數據內容[" + zkWatchAPI.readData( child2Path ) + "]" ); zkWatchAPI.writeData( child2Path, "<父-子(2)>節點數據,更新後的數據" ); System.out.println( "節點[" + child2Path+ "]數據內容更新後[" + zkWatchAPI.readData( child2Path ) + "]" ); } // 獲取子節點
        List<String> childPaths = zkWatchAPI.getChild(rootPath); if(null != childPaths){ System.out.println( "節點[" + rootPath + "]下的子節點數[" + childPaths.size() + "]" ); for(String childPath : childPaths){ System.out.println(" |--節點名[" +  childPath +  "]"); } } // 判斷節點是否存在
        System.out.println( "檢測節點[" + rootPath + "]是否存在:" + zkWatchAPI.isExists(rootPath) ); System.out.println( "檢測節點[" + child1Path + "]是否存在:" + zkWatchAPI.isExists(child1Path) ); System.out.println( "檢測節點[" + child2Path + "]是否存在:" + zkWatchAPI.isExists(child2Path) ); zkWatchAPI.releaseConnection(); } }
View Code

代碼運行結果:服務器

   [     74]   INFO - rg.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.155.47/192.168.155.47:2181, initiating session [ 97]   INFO - rg.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.155.47/192.168.155.47:2181, sessionid = 0x24c11eded7f000b, negotiated timeout = 10000 [ 99]   INFO -       com.test.zk.ZkWatchAPI - 收到事件通知:SyncConnected [ 119]   INFO -       com.test.zk.ZkWatchAPI - 節點建立成功, Path: /nodeRoot, content: <父>節點數據 [ 130]   INFO -       com.test.zk.ZkWatchAPI - 讀取數據成功, path:/nodeRoot, content:<父>節點數據 節點[/nodeRoot]數據內容[<父>節點數據] [ 140]   INFO -       com.test.zk.ZkWatchAPI - 節點建立成功, Path: /nodeRoot/nodeChildren1, content: <父-子(1)>節點數據 [ 145]   INFO -       com.test.zk.ZkWatchAPI - 讀取數據成功, path:/nodeRoot/nodeChildren1, content:<父-子(1)>節點數據 節點[/nodeRoot/nodeChildren1]數據內容[<父-子(1)>節點數據] [ 156]   INFO -       com.test.zk.ZkWatchAPI - 節點刪除成功, Path: /nodeRoot/nodeChildren1 [ 171]  ERROR -       com.test.zk.ZkWatchAPI - 讀取數據失敗,發生KeeperException! path: /nodeRoot/nodeChildren1, errMsg:KeeperErrorCode = NoNode for /nodeRoot/nodeChildren1 ... 節點[/nodeRoot/nodeChildren1]刪除值後[null] [ 185]   INFO -       com.test.zk.ZkWatchAPI - 節點建立成功, Path: /nodeRoot/nodeChildren2, content: <父-子(2)>節點數據 [ 200]   INFO -       com.test.zk.ZkWatchAPI - 讀取數據成功, path:/nodeRoot/nodeChildren2, content:<父-子(2)>節點數據 節點[/nodeRoot/nodeChildren2]數據內容[<父-子(2)>節點數據] [ 213]   INFO -       com.test.zk.ZkWatchAPI - 更新數據成功, path:/nodeRoot/nodeChildren2, stat: 21474836549,21474836550,1426235422098,1426235422123,1,0,0,0,43,0,21474836549 [ 228]   INFO -       com.test.zk.ZkWatchAPI - 讀取數據成功, path:/nodeRoot/nodeChildren2, content:<父-子(2)>節點數據,更新後的數據 節點[/nodeRoot/nodeChildren2]數據內容更新後[<父-子(2)>節點數據,更新後的數據] 節點[/nodeRoot]下的子節點數[1] |--節點名[nodeChildren2] 檢測節點[/nodeRoot]是否存在:true 檢測節點[/nodeRoot/nodeChildren1]是否存在:false 檢測節點[/nodeRoot/nodeChildren2]是否存在:true [ 319]   INFO - rg.apache.zookeeper.ClientCnxn - EventThread shut down [ 319]   INFO - org.apache.zookeeper.ZooKeeper - Session: 0x24c11eded7f000b closed

客戶端命令行查看數據:
網絡

轉載請註明出處:[http://www.cnblogs.com/dennisit/p/4340746.html]session

相關文章
相關標籤/搜索