一個zk的節點能夠被監控,包括這個目錄中存儲的數據的修改,子節點目錄的變化,一旦變化能夠通知設置監控的客戶端,這個功能是zookeeper對於應用最重要的特性,經過這個特性能夠實現的功能包括配置的集中管理,集羣管理,分佈式鎖等等。java
watch機制官方說明:一個Watch事件是一個一次性的觸發器,當被設置了Watch的數據發生了改變的時候,則服務器將這個改變發送給設置了Watch的客戶端,以便通知它們
。
node
1) 一次性的觸發器(one-time trigger) 服務器
當數據改變的時候,那麼一個Watch事件會產生而且被髮送到客戶端中。可是客戶端只會收到一次這樣的通知,若是之後這個數據再次發生改變的時候,以前設置Watch的客戶端將不會再次收到改變的通知,由於Watch機制規定了它是一個一次性的觸發器。 網絡
當設置監視的數據發生改變時,該監視事件會被髮送到客戶端,例如,若是客戶端調用了 getData("/znode1", true) 而且稍後 /znode1 節點上的數據發生了改變或者被刪除了,客戶端將會獲取到 /znode1 發生變化的監視事件,而若是 /znode1 再一次發生了變化,除非客戶端再次對 /znode1 設置監視,不然客戶端不會收到事件通知。異步
2)發送給客戶端(Sent to the client) 分佈式
這個代表了Watch的通知事件是從服務器發送給客戶端的,是異步的,這就代表不一樣的客戶端收到的Watch的時間可能不一樣,可是ZooKeeper有保證:當一個客戶端在看到Watch事件以前是不會看到結點數據的變化的。例如:A=3,此時在上面設置了一次Watch,若是A忽然變成4了,那麼客戶端會先收到Watch事件的通知,而後纔會看到A=4。函數
Zookeeper 客戶端和服務端是經過 Socket 進行通訊的,因爲網絡存在故障,因此監視事件頗有可能不會成功地到達客戶端,監視事件是異步發送至監視者的,Zookeeper 自己提供了保序性(ordering guarantee):即客戶端只有首先看到了監視事件後,纔會感知到它所設置監視的 znode 發生了變化(a client will never see a change for which it has set a watch until it first sees the watch event). 網絡延遲或者其餘因素可能致使不一樣的客戶端在不一樣的時刻感知某一監視事件,可是不一樣的客戶端所看到的一切具備一致的順序。this
3)被設置Watch的數據(The data for which the watch was set)spa
這意味着 znode 節點自己具備不一樣的改變方式。你也能夠想象 Zookeeper 維護了兩條監視鏈表:code
數據監視和子節點監視(data watches and child watches)
getData() and exists() 設置數據監視,getChildren() 設置子節點監視。 或者,你也能夠想象 Zookeeper 設置的不一樣監視返回不一樣的數據,getData() 和 exists() 返回 znode 節點的相關信息,而 getChildren() 返回子節點列表。
所以, setData() 會觸發設置在某一節點上所設置的數據監視(假定數據設置成功),而一次成功的 create() 操做則會出發當前節點上所設置的數據監視以及父節點的子節點監視。一次成功的 delete() 操做將會觸發當前節點的數據監視和子節點監視事件,同時也會觸發該節點父節點的child watch。
能夠註冊watcher的方法:getData、exists、getChildren。
能夠觸發watcher的方法:create、delete、setData。鏈接斷開的狀況下觸發的watcher會丟失。
一個Watcher實例是一個回調函數,被回調一次後就被移除了。若是還須要關注數據的變化,須要再次註冊watcher。
New ZooKeeper時註冊的watcher叫default watcher,它不是一次性的,只對client的鏈接狀態變化做出反應。
什麼樣的操做會產生什麼類型的事件:
event For 「/path」 | event For 「/path/child」 | |
create(「/path」) | EventType.NodeCreated | 無 |
delete(「/path」) | EventType.NodeDeleted | 無 |
setData(「/path」) | EventType.NodeDataChanged | 無 |
create(「/path/child」) | EventType.NodeChildrenChanged(getChild) | EventType.NodeCreated |
delete(「/path/child」) | EventType.NodeChildrenChanged(getChild) | EventType.NodeDeleted |
setData(「/path/child」) | 無 | EventType.NodeDataChanged |
事件類型與watcher的對應關係:
event For 「/path」 | Default Watcher |
exists(「/path」) |
getData(「/path」) |
getChildren(「/path」) |
EventType.None | √ | √ | √ | √ |
EventType.NodeCreated | √ | √ | ||
EventType.NodeDeleted | √ | √ | ||
EventType.NodeDataChanged | √ | √ | ||
EventType.NodeChildrenChanged | √ |
本表總結:exits和getData設置數據監視,而getChildren設置子節點監視
操做與watcher的對應關係:
exits("/path") | getData(「/path」) | getChildren(「/path」) | exits("/path/child") | getData(「/path/child」) | getChildren(「/path/child」) | |
create(「/path」) | √ | √ | 會報錯 | |||
delete(「/path」) | √ | √ | √(這個要注意) | |||
setData(「/path」) | √ | √ | ||||
create(「/path/child」) | √ | √ | √ | |||
delete(「/path/child」) | √ | √ | √ | √ | ||
setData(「/path/child」) | √ | √ | ||||
值得注意的是:getChildren("/path")監視/path的子節點,若是(/path)本身刪了,也會觸發NodeDeleted事件。
因爲zookeeper是一次性監聽,因此咱們必須在wather的process方法裏面再設置監聽。一個方法以下:
如下邏輯是實現的是生產者和消費者模型,消費者監聽某一路徑下面子節點的變化,當生產者有消息發送過來的時候,在該節點下面建立一個子節點,而後把消息放到該子節點裏面,這會觸發消費者的process方法被調用,而後消費者取到該節點下面的子節點(順便設置一個再監聽該節點的子節點),而後取出子節點的內容,作處理,而後刪除該子節點。
public void process(WatchedEvent event) { // TODO Auto-generated method stub if (event.getState() == KeeperState.SyncConnected) { System.out.println("watcher received event"); countDownLatch.countDown(); } System.out.println("回調watcher1實例: 路徑" + event.getPath() + " 類型:"+ event.getType()); // 事件類型,狀態,和檢測的路徑 EventType eventType = event.getType(); KeeperState state = event.getState(); String watchPath = event.getPath(); switch (eventType) { case NodeCreated: break; case NodeDataChanged: break; case NodeChildrenChanged: try { //處理收到的消息 handleMessage(watchPath); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (KeeperException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } break; default: break; } } public void handleMessage(String watchPath) throws KeeperException,InterruptedException, UnsupportedEncodingException { System.out.println("收到消息"); //再監聽該子節點 List<String> Children = this.getChildren(watchPath); for (String a : Children) { String childrenpath = watchPath + "/" + a; byte[] recivedata = this.getData(childrenpath); String recString = new String(recivedata, "UTF-8"); System.out.println("receive the path:" + childrenpath + ":data:"+ recString); //作完了以後,刪除該節點 this.deletNode(childrenpath, -1); } } public List<String> getChildren(String path) throws KeeperException,InterruptedException { //監聽該節點子節點的變化狀況 return this.zooKeeper.getChildren(path, this); } public Stat setData(String path, byte[] data, int version)throws KeeperException, InterruptedException { return this.zooKeeper.setData(path, data, version); }