Zookeeper簡單操做使用(一)

 1.導入jarjava

package com.toov5.zookeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class JavaZKTest {
    private static final String CONNECTSTRING ="192.168.91.5";
    private static int   SESSIONTIMEOUT=5000;  //超時時間
    //使用Java併發包的 信號量 控制zk鏈接成功以後 開始建立
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        ZooKeeper zooKeeper = null;
        try {
            //1zk建立了一個鏈接
             zooKeeper =  new ZooKeeper(CONNECTSTRING, SESSIONTIMEOUT, new Watcher() {
                
                public void process(WatchedEvent event) {
                    //監聽節點是否發生變化  鏈接成功 (代碼從上往下執行,建立節點 直接copy)
                    // 獲取事件狀態
                    KeeperState keeperState = event.getState();
                    // 獲取事件類型
                    EventType eventType = event.getType();
                    if (KeeperState.SyncConnected == keeperState) {  //狀態判斷
                        if (EventType.None == eventType) {
                            countDownLatch.countDown(); //--操做 到0時候 await啓動了哦
                            System.out.println("zk 啓動鏈接..."); //才能夠去建立節點的邏輯執行  須要用到信號量
                        }
                    }    
                }
            });    
             countDownLatch.await();  //不爲0  一直等待~~
               //建立持久節點  
               //Ids.OPEN_ACL_UNSAFE鏈接權限
              //// CreateMode對應好多模式  關於 SEQENTIAL 重名狀況下 加了個id 保證惟一性
            String createNode = zooKeeper.create("/test666", "toov5".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("節點名稱"+createNode);
        } catch (Exception e) {
            
        }finally {
            if (zooKeeper != null) {  //關閉鏈接
                zooKeeper.close();          
            }
        }
    
 }
}
View Code

2.能夠經過圖形化界面進行操做使用的工具是 zookeeper-dev-ZooInspector.jarnode

建立節點(znode) 方法:web

create:
    提供了兩套建立節點的方法,同步和異步建立節點方式。
   同步方式:
   參數1,節點路徑《名稱) : InodeName (不容許遞歸建立節點,也就是說在父節點不存在
  的狀況下,不容許建立子節點)
  參數2,節點內容: 要求類型是字節數組(也就是說,不支持序列化方式,若是須要實現序
  列化,可以使用java相關序列化框架,如Hessian、Kryo框架)
  參數3,節點權限: 使用Ids.OPEN_ACL_UNSAFE開放權限便可。(這個參數通常在權展
  沒有過高要求的場景下,不必關注)
  參數4,節點類型: 建立節點的類型: CreateMode,提供四種首點象型apache

  • PERSISTENT                                    持久化節點
  • PERSISTENT_SEQUENTIAL          順序自動編號持久化節點,這種節點會根據當前已存在的節點數自動加 1
  • EPHEMERAL                                   臨時節點, 客戶端session超時這類節點就會被自動刪除
  • EPHEMERAL_SEQUENTIAL          臨時自動編號節點

 


Watcher

在ZooKeeper中,接口類Watcher用於表示一個標準的事件處理器,其定義了事件通知相關的邏輯,包含KeeperState和EventType兩個枚舉類,分別表明了通知狀態和事件類型,同時定義了事件的回調方法:process(WatchedEvent event)。數組

什麼是Watcher接口

同一個事件類型在不一樣的通知狀態中表明的含義有所不一樣,表列舉了常見的通知狀態和事件類型。服務器

 

表7-3中列舉了ZooKeeper中最多見的幾個通知狀態和事件類型。websocket

回調方法process()網絡

process方法是Watcher接口中的一個回調方法,當ZooKeeper向客戶端發送一個Watcher事件通知時,客戶端就會對相應的process方法進行回調,從而實現對事件的處理。process方法的定義以下:session

abstract public void process(WatchedEvent event);數據結構

這個回調方法的定義很是簡單,咱們重點看下方法的參數定義:WatchedEvent。

WatchedEvent包含了每個事件的三個基本屬性:通知狀態(keeperState),事件類型(EventType)和節點路徑(path),其數據結構如圖7-5所示。ZooKeeper使用WatchedEvent對象來封裝服務端事件並傳遞給Watcher,從而方便回調方法process對服務端事件進行處理。

提到WatchedEvent,不得不講下WatcherEvent實體。籠統地講,二者表示的是同一個事物,都是對一個服務端事件的封裝。不一樣的是,WatchedEvent是一個邏輯事件,用於服務端和客戶端程序執行過程當中所需的邏輯對象,而WatcherEvent由於實現了序列化接口,所以能夠用於網絡傳輸。

服務端在生成WatchedEvent事件以後,會調用getWrapper方法將本身包裝成一個可序列化的WatcherEvent事件,以便經過網絡傳輸到客戶端。客戶端在接收到服務端的這個事件對象後,首先會將WatcherEvent還原成一個WatchedEvent事件,並傳遞給process方法處理,回調方法process根據入參就可以解析出完整的服務端事件了。

須要注意的一點是,不管是WatchedEvent仍是WatcherEvent,其對ZooKeeper服務端事件的封裝都是機及其簡單的。舉個例子來講,當/zk-book這個節點的數據發生變動時,服務端會發送給客戶端一個「ZNode數據內容變動」事件,客戶端只可以接收到以下信

public class ZkClientWatcher implements Watcher {
    // 集羣鏈接地址
    private static final String CONNECT_ADDRES = "192.168.110.159:2181,192.168.110.160:2181,192.168.110.162:2181";
    // 會話超時時間
    private static final int SESSIONTIME = 2000;
    // 信號量,讓zk在鏈接以前等待,鏈接成功後才能往下走.
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static String LOG_MAIN = "【main】 ";
    private ZooKeeper zk;

    public void createConnection(String connectAddres, int sessionTimeOut) {
        try {
            zk = new ZooKeeper(connectAddres, sessionTimeOut, this);
            System.out.println(LOG_MAIN + "zk 開始啓動鏈接服務器....");
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public boolean createPath(String path, String data) {
        try {
            this.exists(path, true);
            this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println(LOG_MAIN + "節點建立成功, Path:" + path + ",data:" + data);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * 判斷指定節點是否存在
     * 
     * @param path
     *            節點路徑
     */
    public Stat exists(String path, boolean needWatch) {
        try {
            return this.zk.exists(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    public boolean updateNode(String path,String data) throws KeeperException, InterruptedException {
        exists(path, true);
        this.zk.setData(path, data.getBytes(), -1);
        return false;
    }

    public void process(WatchedEvent watchedEvent) {

        // 獲取事件狀態
        KeeperState keeperState = watchedEvent.getState();
        // 獲取事件類型
        EventType eventType = watchedEvent.getType();
        // zk 路徑
        String path = watchedEvent.getPath();
        System.out.println("進入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path);
        // 判斷是否創建鏈接
        if (KeeperState.SyncConnected == keeperState) {
            if (EventType.None == eventType) {
                // 若是創建創建成功,讓後程序往下走
                System.out.println(LOG_MAIN + "zk 創建鏈接成功!");
                countDownLatch.countDown();
            } else if (EventType.NodeCreated == eventType) {
                System.out.println(LOG_MAIN + "事件通知,新增node節點" + path);
            } else if (EventType.NodeDataChanged == eventType) {
                System.out.println(LOG_MAIN + "事件通知,當前node節點" + path + "被修改....");
            }
            else if (EventType.NodeDeleted == eventType) {
                System.out.println(LOG_MAIN + "事件通知,當前node節點" + path + "被刪除....");
            }

        }
        System.out.println("--------------------------------------------------------");
    }

    public static void main(String[] args) throws KeeperException, InterruptedException {
        ZkClientWatcher zkClientWatcher = new ZkClientWatcher();
        zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME);
//        boolean createResult = zkClientWatcher.createPath("/p15", "pa-644064");
        zkClientWatcher.updateNode("/pa2","7894561");
    }

}
View Code

 

注意在建立節點時候,必定要寫 「/」  好比「/test」 !!!!!!

下面的例子頗有意義的,監聽的:

package com.toov5.zookeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import org.apache.zookeeper.ZooKeeper;

public class JavaZKTest {
    private static final String CONNECTSTRING ="192.168.91.5";
    private static int   SESSIONTIMEOUT=5000;  //超時時間
    //使用Java併發包的 信號量 控制zk鏈接成功以後 開始建立
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        ZooKeeper zooKeeper = null;
        try {
            //1zk建立了一個鏈接
             zooKeeper =  new ZooKeeper(CONNECTSTRING, SESSIONTIMEOUT, new Watcher() {
                
                public void process(WatchedEvent event) {
                    //監聽節點是否發生變化  鏈接成功 (代碼從上往下執行,建立節點 直接copy)
                    // 獲取事件狀態
                    KeeperState keeperState = event.getState();
                    // 獲取事件類型
                    EventType eventType = event.getType();
                    if (KeeperState.SyncConnected == keeperState) {  //狀態判斷
                        if (EventType.None == eventType) {
                            countDownLatch.countDown(); //--操做 到0時候 await啓動了哦
                            System.out.println("zk 啓動鏈接..."); //才能夠去建立節點的邏輯執行  須要用到信號量
                        }
                        if (EventType.NodeCreated==eventType) {
                            System.out.println("zk時間通知,獲取當前在建立節點..");
                        }
                    }    
                }
            });    
             countDownLatch.await();  //不爲0  一直等待~~
             String path="/tooov5";
             zooKeeper.exists(path, true); //true時候有事件通知
               //建立持久節點  
               //Ids.OPEN_ACL_UNSAFE鏈接權限
              //// CreateMode對應好多模式  關於 SEQENTIAL 重名狀況下 加了個id 保證惟一性
            String createNode = zooKeeper.create(path, "toov5".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("節點名稱"+createNode);
        } catch (Exception e) {
            
        }finally {
            if (zooKeeper != null) {  //關閉鏈接
                zooKeeper.close();          
            }
        }
    
 }
}
View Code

 

節點的 增長 刪除 修改 均可以監聽到!!!!!!!!!

package com.toov5.zookeeper;
import java.io.IOException;import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.Watcher.Event.EventType;import org.apache.zookeeper.Watcher.Event.KeeperState;import org.apache.zookeeper.ZooDefs.Ids;import org.jboss.netty.handler.codec.http.websocketx.WebSocketHandshakeException;import org.apache.zookeeper.ZooKeeper;
public class JavaZKTest {    private static final String CONNECTSTRING ="192.168.91.5";    private static int   SESSIONTIMEOUT=5000;  //超時時間    //使用Java併發包的 信號量 控制zk鏈接成功以後 開始建立    private static final CountDownLatch countDownLatch = new CountDownLatch(1);    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {        ZooKeeper zooKeeper = null;        try {            //1zk建立了一個鏈接             zooKeeper =  new ZooKeeper(CONNECTSTRING, SESSIONTIMEOUT, new Watcher() {                                public void process(WatchedEvent event) {                    //監聽節點是否發生變化  鏈接成功 (代碼從上往下執行,建立節點 直接copy)                    // 獲取事件狀態                    KeeperState keeperState = event.getState();                    // 獲取事件類型                    EventType eventType = event.getType();                    if (KeeperState.SyncConnected == keeperState) {  //狀態判斷                        if (EventType.None == eventType) {                            countDownLatch.countDown(); //--操做 到0時候 await啓動了哦                            System.out.println("zk 啓動鏈接..."); //才能夠去建立節點的邏輯執行  須要用到信號量                        }                        if (EventType.NodeCreated==eventType) {                            System.out.println("zk時間通知,獲取當前在建立節點..");                        }                    }                    }            });                 countDownLatch.await();  //不爲0  一直等待~~             String path="/tooov5";             zooKeeper.exists(path, true); //true時候有事件通知               //建立持久節點                 //Ids.OPEN_ACL_UNSAFE鏈接權限              //// CreateMode對應好多模式  關於 SEQENTIAL 重名狀況下 加了個id 保證惟一性            String createNode = zooKeeper.create(path, "toov5".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);            System.out.println("節點名稱"+createNode);        } catch (Exception e) {                    }finally {            if (zooKeeper != null) {  //關閉鏈接                zooKeeper.close();                      }        }     }}

相關文章
相關標籤/搜索