1.導入jarjava
package com.toov5.zookeeper; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class JavaZKTest { private static final String CONNECTSTRING ="192.168.91.5"; private static int SESSIONTIMEOUT=5000; //超時時間 //使用Java併發包的 信號量 控制zk鏈接成功以後 開始建立 private static final CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, KeeperException, InterruptedException { ZooKeeper zooKeeper = null; try { //1zk建立了一個鏈接 zooKeeper = new ZooKeeper(CONNECTSTRING, SESSIONTIMEOUT, new Watcher() { public void process(WatchedEvent event) { //監聽節點是否發生變化 鏈接成功 (代碼從上往下執行,建立節點 直接copy) // 獲取事件狀態 KeeperState keeperState = event.getState(); // 獲取事件類型 EventType eventType = event.getType(); if (KeeperState.SyncConnected == keeperState) { //狀態判斷 if (EventType.None == eventType) { countDownLatch.countDown(); //--操做 到0時候 await啓動了哦 System.out.println("zk 啓動鏈接..."); //才能夠去建立節點的邏輯執行 須要用到信號量 } } } }); countDownLatch.await(); //不爲0 一直等待~~ //建立持久節點 //Ids.OPEN_ACL_UNSAFE鏈接權限 //// CreateMode對應好多模式 關於 SEQENTIAL 重名狀況下 加了個id 保證惟一性 String createNode = zooKeeper.create("/test666", "toov5".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("節點名稱"+createNode); } catch (Exception e) { }finally { if (zooKeeper != null) { //關閉鏈接 zooKeeper.close(); } } } }
2.能夠經過圖形化界面進行操做使用的工具是 zookeeper-dev-ZooInspector.jarnode
建立節點(znode) 方法:web
create:
提供了兩套建立節點的方法,同步和異步建立節點方式。
同步方式:
參數1,節點路徑《名稱) : InodeName (不容許遞歸建立節點,也就是說在父節點不存在
的狀況下,不容許建立子節點)
參數2,節點內容: 要求類型是字節數組(也就是說,不支持序列化方式,若是須要實現序
列化,可以使用java相關序列化框架,如Hessian、Kryo框架)
參數3,節點權限: 使用Ids.OPEN_ACL_UNSAFE開放權限便可。(這個參數通常在權展
沒有過高要求的場景下,不必關注)
參數4,節點類型: 建立節點的類型: CreateMode,提供四種首點象型apache
在ZooKeeper中,接口類Watcher用於表示一個標準的事件處理器,其定義了事件通知相關的邏輯,包含KeeperState和EventType兩個枚舉類,分別表明了通知狀態和事件類型,同時定義了事件的回調方法:process(WatchedEvent event)。數組
同一個事件類型在不一樣的通知狀態中表明的含義有所不一樣,表列舉了常見的通知狀態和事件類型。服務器
表7-3中列舉了ZooKeeper中最多見的幾個通知狀態和事件類型。websocket
回調方法process()網絡
process方法是Watcher接口中的一個回調方法,當ZooKeeper向客戶端發送一個Watcher事件通知時,客戶端就會對相應的process方法進行回調,從而實現對事件的處理。process方法的定義以下:session
abstract public void process(WatchedEvent event);數據結構
這個回調方法的定義很是簡單,咱們重點看下方法的參數定義:WatchedEvent。
WatchedEvent包含了每個事件的三個基本屬性:通知狀態(keeperState),事件類型(EventType)和節點路徑(path),其數據結構如圖7-5所示。ZooKeeper使用WatchedEvent對象來封裝服務端事件並傳遞給Watcher,從而方便回調方法process對服務端事件進行處理。
提到WatchedEvent,不得不講下WatcherEvent實體。籠統地講,二者表示的是同一個事物,都是對一個服務端事件的封裝。不一樣的是,WatchedEvent是一個邏輯事件,用於服務端和客戶端程序執行過程當中所需的邏輯對象,而WatcherEvent由於實現了序列化接口,所以能夠用於網絡傳輸。
服務端在生成WatchedEvent事件以後,會調用getWrapper方法將本身包裝成一個可序列化的WatcherEvent事件,以便經過網絡傳輸到客戶端。客戶端在接收到服務端的這個事件對象後,首先會將WatcherEvent還原成一個WatchedEvent事件,並傳遞給process方法處理,回調方法process根據入參就可以解析出完整的服務端事件了。
須要注意的一點是,不管是WatchedEvent仍是WatcherEvent,其對ZooKeeper服務端事件的封裝都是機及其簡單的。舉個例子來講,當/zk-book這個節點的數據發生變動時,服務端會發送給客戶端一個「ZNode數據內容變動」事件,客戶端只可以接收到以下信
public class ZkClientWatcher implements Watcher { // 集羣鏈接地址 private static final String CONNECT_ADDRES = "192.168.110.159:2181,192.168.110.160:2181,192.168.110.162:2181"; // 會話超時時間 private static final int SESSIONTIME = 2000; // 信號量,讓zk在鏈接以前等待,鏈接成功後才能往下走. private static final CountDownLatch countDownLatch = new CountDownLatch(1); private static String LOG_MAIN = "【main】 "; private ZooKeeper zk; public void createConnection(String connectAddres, int sessionTimeOut) { try { zk = new ZooKeeper(connectAddres, sessionTimeOut, this); System.out.println(LOG_MAIN + "zk 開始啓動鏈接服務器...."); countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } } public boolean createPath(String path, String data) { try { this.exists(path, true); this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(LOG_MAIN + "節點建立成功, Path:" + path + ",data:" + data); } catch (Exception e) { e.printStackTrace(); return false; } return true; } /** * 判斷指定節點是否存在 * * @param path * 節點路徑 */ public Stat exists(String path, boolean needWatch) { try { return this.zk.exists(path, needWatch); } catch (Exception e) { e.printStackTrace(); return null; } } public boolean updateNode(String path,String data) throws KeeperException, InterruptedException { exists(path, true); this.zk.setData(path, data.getBytes(), -1); return false; } public void process(WatchedEvent watchedEvent) { // 獲取事件狀態 KeeperState keeperState = watchedEvent.getState(); // 獲取事件類型 EventType eventType = watchedEvent.getType(); // zk 路徑 String path = watchedEvent.getPath(); System.out.println("進入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path); // 判斷是否創建鏈接 if (KeeperState.SyncConnected == keeperState) { if (EventType.None == eventType) { // 若是創建創建成功,讓後程序往下走 System.out.println(LOG_MAIN + "zk 創建鏈接成功!"); countDownLatch.countDown(); } else if (EventType.NodeCreated == eventType) { System.out.println(LOG_MAIN + "事件通知,新增node節點" + path); } else if (EventType.NodeDataChanged == eventType) { System.out.println(LOG_MAIN + "事件通知,當前node節點" + path + "被修改...."); } else if (EventType.NodeDeleted == eventType) { System.out.println(LOG_MAIN + "事件通知,當前node節點" + path + "被刪除...."); } } System.out.println("--------------------------------------------------------"); } public static void main(String[] args) throws KeeperException, InterruptedException { ZkClientWatcher zkClientWatcher = new ZkClientWatcher(); zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME); // boolean createResult = zkClientWatcher.createPath("/p15", "pa-644064"); zkClientWatcher.updateNode("/pa2","7894561"); } }
注意在建立節點時候,必定要寫 「/」 好比「/test」 !!!!!!
下面的例子頗有意義的,監聽的:
package com.toov5.zookeeper; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.jboss.netty.handler.codec.http.websocketx.WebSocketHandshakeException; import org.apache.zookeeper.ZooKeeper; public class JavaZKTest { private static final String CONNECTSTRING ="192.168.91.5"; private static int SESSIONTIMEOUT=5000; //超時時間 //使用Java併發包的 信號量 控制zk鏈接成功以後 開始建立 private static final CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, KeeperException, InterruptedException { ZooKeeper zooKeeper = null; try { //1zk建立了一個鏈接 zooKeeper = new ZooKeeper(CONNECTSTRING, SESSIONTIMEOUT, new Watcher() { public void process(WatchedEvent event) { //監聽節點是否發生變化 鏈接成功 (代碼從上往下執行,建立節點 直接copy) // 獲取事件狀態 KeeperState keeperState = event.getState(); // 獲取事件類型 EventType eventType = event.getType(); if (KeeperState.SyncConnected == keeperState) { //狀態判斷 if (EventType.None == eventType) { countDownLatch.countDown(); //--操做 到0時候 await啓動了哦 System.out.println("zk 啓動鏈接..."); //才能夠去建立節點的邏輯執行 須要用到信號量 } if (EventType.NodeCreated==eventType) { System.out.println("zk時間通知,獲取當前在建立節點.."); } } } }); countDownLatch.await(); //不爲0 一直等待~~ String path="/tooov5"; zooKeeper.exists(path, true); //true時候有事件通知 //建立持久節點 //Ids.OPEN_ACL_UNSAFE鏈接權限 //// CreateMode對應好多模式 關於 SEQENTIAL 重名狀況下 加了個id 保證惟一性 String createNode = zooKeeper.create(path, "toov5".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("節點名稱"+createNode); } catch (Exception e) { }finally { if (zooKeeper != null) { //關閉鏈接 zooKeeper.close(); } } } }
節點的 增長 刪除 修改 均可以監聽到!!!!!!!!!
package com.toov5.zookeeper;
import java.io.IOException;import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.Watcher.Event.EventType;import org.apache.zookeeper.Watcher.Event.KeeperState;import org.apache.zookeeper.ZooDefs.Ids;import org.jboss.netty.handler.codec.http.websocketx.WebSocketHandshakeException;import org.apache.zookeeper.ZooKeeper;
public class JavaZKTest { private static final String CONNECTSTRING ="192.168.91.5"; private static int SESSIONTIMEOUT=5000; //超時時間 //使用Java併發包的 信號量 控制zk鏈接成功以後 開始建立 private static final CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, KeeperException, InterruptedException { ZooKeeper zooKeeper = null; try { //1zk建立了一個鏈接 zooKeeper = new ZooKeeper(CONNECTSTRING, SESSIONTIMEOUT, new Watcher() { public void process(WatchedEvent event) { //監聽節點是否發生變化 鏈接成功 (代碼從上往下執行,建立節點 直接copy) // 獲取事件狀態 KeeperState keeperState = event.getState(); // 獲取事件類型 EventType eventType = event.getType(); if (KeeperState.SyncConnected == keeperState) { //狀態判斷 if (EventType.None == eventType) { countDownLatch.countDown(); //--操做 到0時候 await啓動了哦 System.out.println("zk 啓動鏈接..."); //才能夠去建立節點的邏輯執行 須要用到信號量 } if (EventType.NodeCreated==eventType) { System.out.println("zk時間通知,獲取當前在建立節點.."); } } } }); countDownLatch.await(); //不爲0 一直等待~~ String path="/tooov5"; zooKeeper.exists(path, true); //true時候有事件通知 //建立持久節點 //Ids.OPEN_ACL_UNSAFE鏈接權限 //// CreateMode對應好多模式 關於 SEQENTIAL 重名狀況下 加了個id 保證惟一性 String createNode = zooKeeper.create(path, "toov5".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("節點名稱"+createNode); } catch (Exception e) { }finally { if (zooKeeper != null) { //關閉鏈接 zooKeeper.close(); } } }}