ZooKeeper watch機制核心講解

ZooKeeper watch機制核心講解

https://blog.csdn.net/Future_LL/article/details/87700481java

 

Zookeeper中全部讀操做(getData(),getChildren(),exists())均可以設置Watch選項。Watch事件具備one-time trigger(一次性觸發)的特性,若是Watch監視的Znode有變化,那麼就會通知設置該Watch的客戶端。node

客戶端在Znode設置了Watch時,若是Znode內容發生改變,那麼客戶端就會得到Watch事件。例如:客戶端設置getData("/znode1", true)後,若是/znode1發生改變或者刪除,那麼客戶端就會獲得一個/znode1的Watch事件,可是/znode1再次發生變化,那客戶端是沒法收到Watch事件的,除非客戶端設置了新的Watch。apache

Watch
ZooKeeper有watch事件,是一次性觸發的【每次數據要發生變化以前都要手動建立watch】,當watch監視的數據發生時,通知設置了該watch的client,客戶端即watcher。一樣,其watcher是監聽數據發送了某些變化,那就必定會有對應的事件類型和狀態類型,一個客戶端能夠監控多個節點,在代碼中體如今new了幾個就產生幾個watcher,只要節點變化都要執行一邊process
事件類型:(znode節點相關的)【針對的是你所觀察的一個節點而言的】
EventType.NodeCreated 【節點建立】
EventType.NodeDataChanged 【節點數據發生變化】
EventType.NodeChildrenChanged 【這個節點的子節點發生變化】
EventType.NodeDeleted 【刪除當前節點】
狀態類型:(是跟客戶端實例相關的)【ZooKeeper集羣跟應用服務之間的狀態的變動】
KeeperState.Disconnected 【沒有鏈接上】
KeeperState.SyncConnected 【鏈接上】
KeeperState.AuthFailed 【認證失敗】
KeeperState.Expired 【過時】
watcher的特性:一次性,客戶端串行執行,輕量
一次性:對於ZooKeeper的watcher,你只須要記住一點,ZooKeeper有watch事件,是一次性觸發的,當watch監視的數據發生變化時,通知設置該watch的client,即watcher,因爲ZooKeeper的監控都是一次性的,因此每次必須設置監控
客戶端串行執行:客戶端watcher回調的過程是一個串行同步的過程,這爲咱們保證了順序,同時須要開發人員注意一點,千萬不要由於一個watcher的處理邏輯影響了整個客戶端的watcher回調
輕量:WatchedEvent是ZooKeeper整個Watcher通知機制的最小通知單元,整個結構只包含三個部分:通知狀態、事件類型和節點路徑。也就是說Watcher通知很是的簡單,只會告知客戶端發生了事件而不會告知其具體內容,須要客戶端本身去進行獲取,好比NodeDataChanged事件,ZooKeeper只會通知客戶端指定節點的數據發生了變動,而不會直接提供具體的數據內容
package bhz.zookeeper.watcher;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import javafx.scene.shape.Path;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
* Zookeeper Wathcher
* 本類就是一個Watcher類(實現了org.apache.zookeeper.Watcher類)
* @author(alienware)
* @since 2019-2-19
*/
public class ZooKeeperWatcher implements Watcher {

/** 定義原子變量 */
AtomicInteger seq = new AtomicInteger();
/** 定義session失效時間 */
private static final int SESSION_TIMEOUT = 10000;
/** zookeeper服務器地址 */
private static final String CONNECTION_ADDR = "192.168.1.24:2181";
/** zk父路徑設置 */
private static final String PARENT_PATH = "/p";
/** zk子路徑設置 */
private static final String CHILDREN_PATH = "/p/c";
/** 進入標識 */
private static final String LOG_PREFIX_OF_MAIN = "【Main】";
/** zk變量 */
private ZooKeeper zk = null;
/** 用於等待zookeeper鏈接創建以後 通知阻塞程序繼續向下執行 */
private CountDownLatch connectedSemaphore = new CountDownLatch(1);

/**
* 建立ZK鏈接
* @param connectAddr ZK服務器地址列表
* @param sessionTimeout Session超時時間
*/
public void createConnection(String connectAddr, int sessionTimeout) {
this.releaseConnection();
try {
//this表示把當前對象進行傳遞到其中去(也就是在主函數裏實例化的new ZooKeeperWatcher()實例對象)
zk = new ZooKeeper(connectAddr, sessionTimeout, this);
System.out.println(LOG_PREFIX_OF_MAIN + "開始鏈接ZK服務器");
connectedSemaphore.await();
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 關閉ZK鏈接
*/
public void releaseConnection() {
if (this.zk != null) {
try {
this.zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* 建立節點
* @param path 節點路徑
* @param data 數據內容
* @return
*/
public boolean createPath(String path, String data, boolean needWatch) {
try {
//設置監控(因爲zookeeper的監控都是一次性的因此 每次必須設置監控)
this.zk.exists(path, needWatch);
System.out.println(LOG_PREFIX_OF_MAIN + "節點建立成功, Path: " +
this.zk.create( /**路徑*/
path,
/**數據*/
data.getBytes(),
/**全部可見*/
Ids.OPEN_ACL_UNSAFE,
/**永久存儲*/
CreateMode.PERSISTENT ) +
", content: " + data);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}

/**
* 讀取指定節點數據內容
* @param path 節點路徑
* @return
*/
public String readData(String path, boolean needWatch) {
try {
System.out.println("讀取數據操做...");
return new String(this.zk.getData(path, needWatch, null));
} catch (Exception e) {
e.printStackTrace();
return "";
}
}

/**
* 更新指定節點數據內容
* @param path 節點路徑
* @param data 數據內容
* @return
*/
public boolean writeData(String path, String data) {
try {
System.out.println(LOG_PREFIX_OF_MAIN + "更新數據成功,path:" + path + ", stat: " +
this.zk.setData(path, data.getBytes(), -1));
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}

/**
* 刪除指定節點
*
* @param path
* 節點path
*/
public void deleteNode(String path) {
try {
this.zk.delete(path, -1);
System.out.println(LOG_PREFIX_OF_MAIN + "刪除節點成功,path:" + path);
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 判斷指定節點是否存在
* @param path 節點路徑
*/
public Stat exists(String path, boolean needWatch) {
try {
return this.zk.exists(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}

/**
* 獲取子節點
* @param path 節點路徑
*/
private List<String> getChildren(String path, boolean needWatch) {
try {
System.out.println("讀取子節點操做...");
return this.zk.getChildren(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}

/**
* 刪除全部節點
*/
public void deleteAllTestPath(boolean needWatch) {
if(this.exists(CHILDREN_PATH, needWatch) != null){
this.deleteNode(CHILDREN_PATH);
}
if(this.exists(PARENT_PATH, needWatch) != null){
this.deleteNode(PARENT_PATH);
}
}

/**
* 收到來自Server的Watcher通知後的處理。
*/
@Override
public void process(WatchedEvent event) {

System.out.println("進入 process 。。。。。event = " + event);

try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}

if (event == null) {
return;
}

// 鏈接狀態
KeeperState keeperState = event.getState();
// 事件類型
EventType eventType = event.getType();
// 受影響的path
String path = event.getPath();
// 原子對象seq 記錄進入process的次數
String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";

System.out.println(logPrefix + "收到Watcher通知");
System.out.println(logPrefix + "鏈接狀態:\t" + keeperState.toString());
System.out.println(logPrefix + "事件類型:\t" + eventType.toString());

if (KeeperState.SyncConnected == keeperState) {
// 第一次成功鏈接上ZK服務器
if (EventType.None == eventType) {
System.out.println(logPrefix + "成功鏈接上ZK服務器");
connectedSemaphore.countDown();
}
//建立節點
else if (EventType.NodeCreated == eventType) {
System.out.println(logPrefix + "節點建立");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//更新節點
else if (EventType.NodeDataChanged == eventType) {
System.out.println(logPrefix + "節點數據更新");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//更新子節點
else if (EventType.NodeChildrenChanged == eventType) {
System.out.println(logPrefix + "子節點變動");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//刪除節點
else if (EventType.NodeDeleted == eventType) {
System.out.println(logPrefix + "節點 " + path + " 被刪除");
}
else ;
}
else if (KeeperState.Disconnected == keeperState) {
System.out.println(logPrefix + "與ZK服務器斷開鏈接");
}
else if (KeeperState.AuthFailed == keeperState) {
System.out.println(logPrefix + "權限檢查失敗");
}
else if (KeeperState.Expired == keeperState) {
System.out.println(logPrefix + "會話失效");
}
else ;

System.out.println("--------------------------------------------");

}

/**
* <B>方法名稱:</B>測試zookeeper監控<BR>
* <B>概要說明:</B>主要測試watch功能<BR>
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {

//創建watcher //當前客戶端能夠稱爲一個watcher 觀察者角色
ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();
//建立鏈接
zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
//System.out.println(zkWatch.zk.toString());

Thread.sleep(1000);

// 清理節點
// zkWatch.deleteAllTestPath(false);

//-----------------第一步: 建立父節點 /p ------------------------//
// 建立父節點
/**
* data:節點的內容是當前的時間毫秒值,needWATCH:是否須要監控
*/
if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "", true)) {

Thread.sleep(1000);

//-----------------第二步: 讀取節點 /p 和 讀取/p節點下的子節點(getChildren)的區別 --------------//
// 讀取數據
// zkWatch.readData(PARENT_PATH, true);
// 也能夠這樣作,表示當前的節點須要繼續watch,更新數據操做就會被監控到
zkWatch.exists(PARENT_PATH,true);

// 讀取子節點(監控childNodeChange事件)
// 這裏的監聽仍是監聽PARENT_PATH,只是這裏多了一個關於子節點的觸發,這裏的觸發仍是父節點
// 只要子節點發生變化【刪除、修改、增長】那麼就觸發,只會觸發:NodeChildrenChanged
zkWatch.getChildren(PARENT_PATH, true);

// 更新數據,由於上邊watch,因此更新操做會被監聽到,當更新結束,上述的watch關係會消失
zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");

Thread.sleep(1000);
// 建立子節點
// 若是needWatch爲false,只是單純的建立了一個子節點,沒有觸發任何watch:NodeCreated
zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "", true);


//-----------------第三步: 創建子節點的觸發 --------------//
// zkWatch.getChildren(CHILDREN_PATH, true);
// zkWatch.createPath(CHILDREN_PATH + "/c1", System.currentTimeMillis() + "", true);
// zkWatch.getChildren(CHILDREN_PATH + "/c1", true);
// zkWatch.createPath(CHILDREN_PATH + "/c1/c2", System.currentTimeMillis() + "", true);

//-----------------第四步: 更新子節點數據的觸發 --------------//
//在進行修改以前,咱們須要watch一下這個節點:
Thread.sleep(1000);
zkWatch.readData(CHILDREN_PATH, true);
zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");

}

Thread.sleep(10000);
// 清理節點
zkWatch.deleteAllTestPath(true);

Thread.sleep(10000);
zkWatch.releaseConnection();

}服務器

---------------------
做者:Future_LL
來源:CSDN
原文:https://blog.csdn.net/Future_LL/article/details/87700481
版權聲明:本文爲博主原創文章,轉載請附上博文連接!session

相關文章
相關標籤/搜索