ZooKeeper Watcher

All of the read operations in ZooKeeper - getData(), getChildren(), and exists() - have the option of setting a watch as a side effect. Here is ZooKeeper's definition of a watch: "a watch event is one-time trigger, sent to the client that set the watch, which occurs when the data for which the watch was set changes."

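The three methods getData(), getChildren(), and exists() can each set a watcher on the path passed as an argument. When the node at that path changes in a way the watch covers, the server sends a one-time notification event to the client that set the watcher. After receiving the notification, the client can handle it according to its own business logic.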

Note that a watcher is one-shot: if you want to keep receiving notifications, you must register it again after handling the event.
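
To make the one-shot behaviour concrete, here is a minimal sketch in plain Java. It is a toy in-memory model, not ZooKeeper's implementation: the class OneShotWatchModel and its getData/setData methods are hypothetical stand-ins that only mimic the rule that a watch fires once and must be set again to fire a second time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy in-memory model of one-shot watches. Hypothetical; NOT ZooKeeper's code. */
final class OneShotWatchModel {
    private final Map<String, String> data = new HashMap<>();
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    /** Reads a path and, as a side effect, optionally leaves a watch on it. */
    String getData(String path, Consumer<String> watcher) {
        if (watcher != null) {
            watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
        }
        return data.get(path);
    }

    /** Writes a path; each pending watch fires once and is then forgotten. */
    void setData(String path, String value) {
        data.put(path, value);
        // Removing the list before firing is what makes the watches one-shot.
        List<Consumer<String>> fired = watches.remove(path);
        if (fired != null) {
            fired.forEach(w -> w.accept(path));
        }
    }

    static List<String> demo() {
        OneShotWatchModel store = new OneShotWatchModel();
        List<String> log = new ArrayList<>();

        // Never re-registered: sees only the first change to /a.
        store.getData("/a", path -> log.add("once:" + path));

        // Re-registers itself in the callback: sees every change to /b.
        Consumer<String> repeating = new Consumer<String>() {
            @Override
            public void accept(String path) {
                log.add("again:" + path);
                store.getData(path, this); // set the watch again
            }
        };
        store.getData("/b", repeating);

        store.setData("/a", "1");
        store.setData("/a", "2"); // no watch left on /a, nothing fires
        store.setData("/b", "1");
        store.setData("/b", "2");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running main prints [once:/a, again:/b, again:/b]: the second write to /a goes unnoticed because nobody set the watch again, while the watcher on /b keeps itself alive by re-registering inside its callback.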

Let's work through a demo to see how this behaves in practice.

First, look at WatchedEvent. The event has state, type, and path fields, as follows:

public class WatchedEvent {
    final private KeeperState keeperState;
    final private EventType eventType;
    private String path;
    ........
}

So what does the state mean, and what does the type mean? Here are the Javadoc comments on KeeperState:

public enum KeeperState {
    /** Unused, this state is never generated by the server */
    @Deprecated
    Unknown (-1),

    /** The client is in the disconnected state - it is not connected
     * to any server in the ensemble. */
    Disconnected (0),

    /** Unused, this state is never generated by the server */
    @Deprecated
    NoSyncConnected (1),

    /** The client is in the connected state - it is connected
     * to a server in the ensemble (one of the servers specified
     * in the host connection parameter during ZooKeeper client
     * creation). */
    SyncConnected (3),

    /**
     * Auth failed state
     */
    AuthFailed (4),

    /**
     * The client is connected to a read-only server, that is the
     * server which is not currently connected to the majority.
     * The only operations allowed after receiving this state is
     * read operations.
     * This state is generated for read-only clients only since
     * read/write clients aren't allowed to connect to r/o servers.
     */
    ConnectedReadOnly (5),

    /**
     * SaslAuthenticated: used to notify clients that they are SASL-authenticated,
     * so that they can perform Zookeeper actions with their SASL-authorized permissions.
     */
    SaslAuthenticated(6),

    /** The serving cluster has expired this session. The ZooKeeper
     * client connection (the session) is no longer valid. You must
     * create a new client connection (instantiate a new ZooKeeper
     * instance) if you wish to access the ensemble. */
    Expired (-112);
    ........
}

Now look at EventType:

/**
 * Enumeration of types of events that may occur on the ZooKeeper
 */
public enum EventType {
    None(-1),
    NodeCreated(1),
    NodeDeleted(2),
    NodeDataChanged(3),
    NodeChildrenChanged(4);
}
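
Which of these event types a watch can report depends on the read operation that set it. The sketch below encodes that mapping as described in the ZooKeeper Programmer's Guide. The enums are local mirrors declared only so the example compiles without the ZooKeeper jar; WatchTriggers, ReadOp, and firesOn are hypothetical names and not part of the ZooKeeper API.

```java
import java.util.EnumSet;
import java.util.Set;

/** Illustrative lookup table; names here are hypothetical, not ZooKeeper API. */
final class WatchTriggers {
    /** Local mirror of the node-related EventType constants. */
    enum EventType { NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged }

    /** The three read operations that can leave a watch behind. */
    enum ReadOp {
        // exists(): the node is created, deleted, or has its data changed
        EXISTS(EnumSet.of(EventType.NodeCreated, EventType.NodeDeleted,
                EventType.NodeDataChanged)),
        // getData(): the node is deleted or has its data changed
        GET_DATA(EnumSet.of(EventType.NodeDeleted, EventType.NodeDataChanged)),
        // getChildren(): the node is deleted or its child list changes
        GET_CHILDREN(EnumSet.of(EventType.NodeDeleted, EventType.NodeChildrenChanged));

        private final Set<EventType> triggers;

        ReadOp(Set<EventType> triggers) {
            this.triggers = triggers;
        }

        /** True if a watch set by this operation can report the given event type. */
        boolean firesOn(EventType type) {
            return triggers.contains(type);
        }
    }

    public static void main(String[] args) {
        for (ReadOp op : ReadOp.values()) {
            System.out.println(op + " -> " + op.triggers);
        }
    }
}
```

This is why a watcher set through getData() only needs to check for NodeDataChanged (and, if it cares, NodeDeleted): it will never be handed NodeCreated or NodeChildrenChanged.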

OK, that is easy enough to follow. Here is an example:

package com.usfot;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;

/**
 * Created by liyanxin on 2015/3/17.
 */
public class ZookeeperWathcherDemo {

    // Monitor object used as the mutex for wait/notify
    private static final Object mutex = new Object();

    public static void main(String args[]) throws KeeperException, InterruptedException, IOException {

        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 300000, new Watcher() {
            // Default watcher: logs every event it receives.
            // Output labels: 狀態 = state, 類型 = type.
            public void process(WatchedEvent event) {
                System.out.println("狀態:" + event.getState() + "|類型:" + event.getType() +
                        "|Wrapper:" + event.getWrapper() + "|Path:" + event.getPath());
            }
        });

        // Create a top-level znode
        zk.create("/testRootPath", "testRootData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);
        // Create a child znode under it
        zk.create("/testRootPath/testChildPathOne", "testChildDataOne".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);


        zk.getData("/testRootPath/testChildPathOne", new Watcher() {
            @Override
            public void process(WatchedEvent event) {

                if (event.getState() == Event.KeeperState.SyncConnected
                        && event.getType() == Event.EventType.NodeDataChanged) {
                    System.out.println("znode:/testRootPath/testChildPathOne data change");
                    synchronized (mutex) {
                        mutex.notify();
                    }
                }
            }
        }, null);

        synchronized (mutex) {
            mutex.wait(); // block until notify() is called
        }
        zk.close();
    }
}

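This code registers a watcher on the znode at /testRootPath/testChildPathOne. The watcher reacts only to events whose state is SyncConnected and whose type is NodeDataChanged. When such an event occurs, it prints a message and calls notify() on mutex, which wakes the main thread so the program can exit.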
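
One caveat with plain wait/notify: if the event were delivered before the main thread reached mutex.wait(), the notification would be lost and the program would block. That is unlikely in this demo, because the change is made by hand afterwards, but a java.util.concurrent.CountDownLatch avoids the problem altogether. The sketch below shows only the waiting pattern with a stand-in thread in place of the watcher callback; LatchWaitSketch and awaitEvent are hypothetical names and no ZooKeeper calls are involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Waiting pattern only; the "callback" thread stands in for a watcher. */
final class LatchWaitSketch {

    /** Returns true if the simulated event was observed within the timeout. */
    static boolean awaitEvent(long timeoutMillis) {
        CountDownLatch dataChanged = new CountDownLatch(1);

        // Stand-in for the watcher callback: it signals the event immediately.
        Thread callback = new Thread(dataChanged::countDown);
        callback.start();

        try {
            // Force the awkward ordering: the event fires BEFORE we start waiting.
            callback.join();
            // The latch remembers the countDown, so this returns at once
            // instead of hanging the way a late wait() would.
            return dataChanged.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("event observed: " + awaitEvent(1000));
    }
}
```

In the demo, the equivalent change would be to call countDown() inside process() and await() in main, in place of notify() and wait().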

How do we make the znode change? Open the ZooKeeper command-line client and set the node's data, as follows:

[zk: localhost:2181(CONNECTED) 26] set /testRootPath/testChildPathOne mydata
cZxid = 0x700000044
ctime = Tue Mar 17 17:46:12 CST 2015
mZxid = 0x700000045
mtime = Tue Mar 17 17:47:49 CST 2015
pZxid = 0x700000044
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0
[zk: localhost:2181(CONNECTED) 27]

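At this point the program prints the messages below (狀態 means "state" and 類型 means "type"). The first entry, which spans two lines, comes from the default watcher passed to the ZooKeeper constructor and reports the connection event, which has type None and no path. The next line comes from the watcher set by getData():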

狀態:SyncConnected|類型:None|Wrapper:-1,3,
|Path:null
znode:/testRootPath/testChildPathOne data change

Process finished with exit code 0

Reference: http://luzengyi.blog.163.com/blog/static/529188201064113744373/

==================END==================
