ZooKeeper Java Api 使用樣例

package com.pa.zookeeper.test1;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

 
/** 
 * ZooKeeper Java Api 使用樣例<br> 
 * ZK Api Version: 3.4.3  
 */ 
public class ZookeeperTest2 implements Watcher { 
 
    private static final int SESSION_TIMEOUT = 10000; 
    private static final String CONNECTION_STRING = "192.168.56.103:2181"; 
//    private static final String CONNECTION_STRING = "192.168.56.103:3000,192.168.56.103:3001,192.168.56.103:3002"; 
    private static final String ZK_PATH = "/nileader"; 
    private ZooKeeper zk = null; 
    private static int i = 0;
     
    private CountDownLatch connectedSemaphore = new CountDownLatch( 1 ); 
 
    static void p(String msg){
    	System.out.println(++i + ",msg=" + msg);
    }
    static ZookeeperTest2 sample = new ZookeeperTest2(); 
    public static void main( String[] args ) { 
        p("main-1");
        sample.createConnection( CONNECTION_STRING, SESSION_TIMEOUT ); 
        if ( sample.createPath( ZK_PATH, "我是節點初始內容" ) ) {
//            System.out.println(); 
            System.out.println( "數據內容: " + sample.readData( ZK_PATH ) + "\n" ); 
            sample.writeData( ZK_PATH, "我是更新後的數據" ); 
            System.out.println( "更新後的數據內容: " + sample.readData( ZK_PATH ) + "\n" ); 
            sample.deleteNode( ZK_PATH ); 
        } 
        sample.releaseConnection(); 
    } 
    
    /** 
     * 建立ZK鏈接 
     * @param connectString  ZK服務器地址列表 
     * @param sessionTimeout   Session超時時間 
     */ 
    public void createConnection( String connectString, int sessionTimeout ) { 
        this.releaseConnection(); 
        try { 
        	p("開始建立鏈接");
            zk = new ZooKeeper( connectString, sessionTimeout, this ); 
            connectedSemaphore.await(); 
            p("鏈接建立完畢");
        } catch ( InterruptedException e ) { 
            System.out.println( "鏈接建立失敗,發生 InterruptedException" ); 
            e.printStackTrace(); 
        } catch ( IOException e ) { 
            System.out.println( "鏈接建立失敗,發生 IOException" ); 
            e.printStackTrace(); 
        } 
    } 
 
    /** 
     * 關閉ZK鏈接 
     */ 
    public void releaseConnection() { 
        if ( this.zk != null ) { 
            try { 
            	p("關閉鏈接");
                this.zk.close(); 
            } catch ( InterruptedException e ) { 
                // ignore 
                e.printStackTrace(); 
            } 
        } 
    } 
    
    /** 
     * 收到來自Server的Watcher通知後的處理。 
     */ 
    @Override 
    public void process( WatchedEvent event ) {
       p("process收到事件通知:[state=" + event.getState() + ",type=" + event.getType() + ",path=" + event.getPath() + "]"); 
        if ( KeeperState.SyncConnected == event.getState() ) { 
            connectedSemaphore.countDown(); 
        } 
        if ( KeeperState.SyncConnected == event.getState() && !event.getPath().equals("") ) { 
        	System.out.println( "process------中獲取數據內容: "); 
        	System.out.println( "process------" + sample.readData( ZK_PATH ) + "\n" ); 
        } 
    } 
 
    /** 
     *  建立節點 
     * @param path 節點path 
     * @param data 初始數據內容 
     * @return 
     */ 
    public boolean createPath( String path, String data ) { 
        try { 
            p( "開始建立節點, Path: " 
                    + this.zk.create( path, // 
                                              data.getBytes(), // 
                                              Ids.OPEN_ACL_UNSAFE, // 
                                              CreateMode.EPHEMERAL ) 
                    + ", content: " + data );
            p("節點建立完畢");
        } catch ( KeeperException e ) { 
            System.out.println( "節點建立失敗,發生KeeperException" ); 
            e.printStackTrace(); 
        } catch ( InterruptedException e ) { 
            System.out.println( "節點建立失敗,發生 InterruptedException" ); 
            e.printStackTrace(); 
        } 
        return true; 
    } 
 
    /** 
     * 讀取指定節點數據內容 
     * @param path 節點path 
     * @return 
     */ 
    public String readData( String path ) { 
        try { 
            p("-------------開始獲取數據,path:" + path ); 
            String data = new String( this.zk.getData( path, true, null ) ); 
            p("-------------獲取數據完畢:" + data); 
            return data;
        } catch ( KeeperException e ) { 
            System.out.println( "讀取數據失敗,發生KeeperException,path: " + path  ); 
            e.printStackTrace(); 
            return ""; 
        } catch ( InterruptedException e ) { 
            System.out.println( "讀取數據失敗,發生 InterruptedException,path: " + path  ); 
            e.printStackTrace(); 
            return ""; 
        } 
    } 
 
    /** 
     * 更新指定節點數據內容 
     * @param path 節點path 
     * @param data  數據內容 
     * @return 
     */ 
    public boolean writeData( String path, String data ) { 
        try { 
            p( "++++++++++開始更新數據");
    		p("path:" + path + ", stat: " + 
                                                this.zk.setData( path, data.getBytes(), -1 ) );
            p("+++++++++++數據更新完畢");
        } catch ( KeeperException e ) { 
            System.out.println( "更新數據失敗,發生KeeperException,path: " + path  ); 
            e.printStackTrace(); 
        } catch ( InterruptedException e ) { 
            System.out.println( "更新數據失敗,發生 InterruptedException,path: " + path  ); 
            e.printStackTrace(); 
        } 
        return false; 
    } 
 
    /** 
     * 刪除指定節點 
     * @param path 節點path 
     */ 
    public void deleteNode( String path ) { 
        try { 
        	p("開始刪除節點");
            this.zk.delete( path, -1 ); 
            System.out.println( "刪除節點成功,path:" + path ); 
        } catch ( KeeperException e ) { 
            System.out.println( "刪除節點失敗,發生KeeperException,path: " + path  ); 
            e.printStackTrace(); 
        } catch ( InterruptedException e ) { 
            System.out.println( "刪除節點失敗,發生 InterruptedException,path: " + path  ); 
            e.printStackTrace(); 
        } 
    } 
 
}

執行以後的日誌以下:java

1,msg=main-1
2,msg=開始建立鏈接
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
3,msg=process收到事件通知:[state=SyncConnected,type=None,path=null]
4,msg=鏈接建立完畢
5,msg=開始建立節點, Path: /nileader, content: 我是節點初始內容
6,msg=節點建立完畢
7,msg=-------------開始獲取數據,path:/nileader
8,msg=-------------獲取數據完畢:我是節點初始內容
數據內容: 我是節點初始內容apache

9,msg=++++++++++開始更新數據
10,msg=process收到事件通知:[state=SyncConnected,type=NodeDataChanged,path=/nileader]
process------中獲取數據內容: 
11,msg=-------------開始獲取數據,path:/nileader
12,msg=path:/nileader, stat: 520,521,1484807555448,1484807555462,1,0,0,97307556030054430,24,0,520服務器

13,msg=+++++++++++數據更新完畢
14,msg=-------------開始獲取數據,path:/nileader
15,msg=-------------獲取數據完畢:我是更新後的數據
process------我是更新後的數據session

16,msg=-------------獲取數據完畢:我是更新後的數據
更新後的數據內容: 我是更新後的數據app

17,msg=開始刪除節點
18,msg=process收到事件通知:[state=SyncConnected,type=NodeDeleted,path=/nileader]
process------中獲取數據內容: 
19,msg=-------------開始獲取數據,path:/nileader
刪除節點成功,path:/nileader
20,msg=關閉鏈接
 ide

經過屢次運行,上述代碼出現過NoNodeException,應該是執行代碼的順序沒有保證,偶爾出現執行的前後順序不一樣致使異常,後面再細看。。。this

例子是從其餘地方修改的,出處已忘記了。。。日誌

相關文章
相關標籤/搜索