ZooKeeper與Curator註冊和監控

Curator提供了對zookeeper客戶端的封裝,並監控鏈接狀態和會話session,特別是會話session過時後,curator可以從新鏈接zookeeper,而且建立一個新的session。html

對於zk的使用者來講,session的概念相當重要,若是想了解更多session的說明,請訪問:java

http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.htmlapache

zk客戶端和zk服務器間主要可能存在下面幾種異常狀況:api

  1. 短暫失去鏈接:此時客戶端檢測到與服務端的鏈接已經斷開,可是服務端維護的客戶端session還沒有過時,以後客戶端和服務端從新創建了鏈接;當客戶端從新鏈接後,因爲session沒有過時,zookeeper可以保證鏈接恢復後保持正常服務。
  2. 失去鏈接時間很長:此時服務器相對於客戶端的session已通過期了,與先前session相關的watcher和ephemeral的路徑和數據都會消失;當Curator從新建立了與zk的鏈接後,會獲取到session expired異常,Curator會銷燬先前的session,而且會建立一個新的session,須要注意的是,與以前session相關的watcher和ephemeral類型的路徑和數據在新的session中也不會存在,須要開發者在CuratorFramework.getConnectionStateListenable().addListener()中添加狀態監聽事件,對ConnectionState.LOST事件進行監聽,當session過時後,使得以前的session狀態得以恢復。對於ephemeral類型,在客戶端應該保持數據的狀態,以便及時恢復。
  3. 客戶端從新啓動:不論先前的zk session是否已通過期,都須要從新建立臨時節點、添加數據和watch事件,先前的session也會在稍後的一段時間內過時。
  4. Zk服務器從新啓動:因爲zk將session信息存放到了硬盤上,所以重啓後,先前未過時的session仍然存在,在zk服務器啓動後,客戶端與zk服務器建立新的鏈接,並使用先前的session,與1相同。
  5. 須要注意的是,當session過時了,在session過時期間另外的客戶端修改了zk的值,那麼這個修改在客戶端從新鏈接到zk上時,zk客戶端不會接收到這個修改的watch事件(儘管添加了watch),若是須要嚴格的watch邏輯,就須要在curator的狀態監控中添加邏輯。

特別提示:watcher僅僅是一次性的,zookeeper通知了watcher事件後,就會將這個watcher從session中刪除,所以,若是想繼續監控,就要添加新的watcher。服務器

 

下面提供了對persistent和ephemeral兩種類型節點的監控方法,其中get方法說明了persistent節點如何監控,而register方法說明了ephemeral類型的節點如何監控。session

package demo;

import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.concurrent.ConcurrentSkipListSet;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

public class CuratorTest {
    private CuratorFramework zkTools;
    private ConcurrentSkipListSet<String> watchers = new ConcurrentSkipListSet<String>();
    private static Charset charset = Charset.forName("utf-8");

    public CuratorTest() {
        zkTools = CuratorFrameworkFactory.builder()
                .connectString("192.168.0.216:3306")
                .namespace("zk/test")
                .retryPolicy(new RetryNTimes(2000, 20000))
                .build();
        zkTools.start();
    }

    public void addReconnectionWatcher(final String path, final ZookeeperWatcherType watcherType,
            final CuratorWatcher watcher) {
        synchronized (this) {
            if (!watchers.contains(watcher.toString()))// 不要添加劇復的監聽事件
            {
                watchers.add(watcher.toString());
                System.out.println("add new watcher " + watcher);
                zkTools.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                    @Override
                    public void stateChanged(CuratorFramework client, ConnectionState newState) {
                        System.out.println(newState);
                        if (newState == ConnectionState.LOST) {// 處理session過時
                            try {
                                if (watcherType == ZookeeperWatcherType.EXITS) {
                                    zkTools.checkExists().usingWatcher(watcher).forPath(path);
                                } else if (watcherType == ZookeeperWatcherType.GET_CHILDREN) {
                                    zkTools.getChildren().usingWatcher(watcher).forPath(path);
                                } else if (watcherType == ZookeeperWatcherType.GET_DATA) {
                                    zkTools.getData().usingWatcher(watcher).forPath(path);
                                } else if (watcherType == ZookeeperWatcherType.CREATE_ON_NO_EXITS) {
                                    // ephemeral類型的節點session過時了,須要從新建立節點,而且註冊監聽事件,以後監聽事件中,
                                    // 會處理create事件,將路徑值恢復到先前狀態
                                    Stat stat = zkTools.checkExists().usingWatcher(watcher)
                                            .forPath(path);
                                    if (stat == null) {
                                        System.err.println("to create");
                                        zkTools.create().creatingParentsIfNeeded()
                                                .withMode(CreateMode.EPHEMERAL)
                                                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
                                    }
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
            }
        }
    }

    public void create() throws Exception {
        zkTools.create()// 建立一個路徑
                .creatingParentsIfNeeded()// 若是指定的節點的父節點不存在,遞歸建立父節點
                .withMode(CreateMode.PERSISTENT)// 存儲類型(臨時的仍是持久的)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)// 訪問權限
                .forPath("zk/test");// 建立的路徑
    }

    public void put() throws Exception {
        // 對路徑節點賦值
        zkTools.setData().forPath("zk/test", "hello world".getBytes(Charset.forName("utf-8")));
    }

    public void get() throws Exception {
        String path = "zk/test";
        ZKWatch watch = new ZKWatch(path);
        byte[] buffer = zkTools.getData().usingWatcher(watch).forPath(path);
        System.out.println(new String(buffer, charset));
        // 添加session過時的監控
        addReconnectionWatcher(path, ZookeeperWatcherType.GET_DATA, watch);
    }

    public void register() throws Exception {
        String ip = InetAddress.getLocalHost().getHostAddress();
        String registeNode = "zk/register/" + ip;// 節點路徑
        byte[] data = "disable".getBytes(charset);// 節點值
        CuratorWatcher watcher = new ZKWatchRegister(registeNode, data); // 建立一個register watcher
        Stat stat = zkTools.checkExists().forPath(registeNode);
        if (stat != null) {
            zkTools.delete().forPath(registeNode);
        }
        zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(registeNode, data);// 建立的路徑和值
        // 添加到session過時監控事件中
        addReconnectionWatcher(registeNode, ZookeeperWatcherType.CREATE_ON_NO_EXITS, watcher);
        data = zkTools.getData().usingWatcher(watcher).forPath(registeNode);
        System.out.println("get path form zk : " + registeNode + ":" + new String(data, charset));
    }

    public static void main(String[] args) throws Exception {
        CuratorTest test = new CuratorTest();
        test.get();
        test.register();
        Thread.sleep(10000000000L);
    }


    public class ZKWatch implements CuratorWatcher {
        private final String path;

        public String getPath() {
            return path;
        }

        public ZKWatch(String path) {
            this.path = path;
        }

        @Override
        public void process(WatchedEvent event) throws Exception {
            System.out.println(event.getType());
            if (event.getType() == EventType.NodeDataChanged) {
                byte[] data = zkTools.getData().usingWatcher(this).forPath(path);
                System.out.println(path + ":" + new String(data, Charset.forName("utf-8")));
            }
        }
    }

    public class ZKWatchRegister implements CuratorWatcher {
        private final String path;
        private byte[] value;

        public String getPath() {
            return path;
        }

        public ZKWatchRegister(String path, byte[] value) {
            this.path = path;
            this.value = value;
        }

        @Override
        public void process(WatchedEvent event) throws Exception {
            System.out.println(event.getType());
            if (event.getType() == EventType.NodeDataChanged) {
                // 節點數據改變了,須要記錄下來,以便session過時後,可以恢復到先前的數據狀態
                byte[] data = zkTools.getData().usingWatcher(this).forPath(path);
                value = data;
                System.out.println(path + ":" + new String(data, charset));
            } else if (event.getType() == EventType.NodeDeleted) {
                // 節點被刪除了,須要建立新的節點
                System.out.println(path + ":" + path + " has been deleted.");
                Stat stat = zkTools.checkExists().usingWatcher(this).forPath(path);
                if (stat == null) {
                    zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
                }
            } else if (event.getType() == EventType.NodeCreated) {
                // 節點被建立時,須要添加監聽事件(建立多是因爲session過時後,curator的狀態監聽部分觸發的)
                System.out.println(path + ":" + " has been created!" + "the current data is "
                        + new String(value));
                zkTools.setData().forPath(path, value);
                zkTools.getData().usingWatcher(this).forPath(path);
            }
        }
    }
    public enum ZookeeperWatcherType {
        GET_DATA, GET_CHILDREN, EXITS, CREATE_ON_NO_EXITS
    }

}
相關文章
相關標籤/搜索