【Zookeeper系列三】ZooKeeper Java API使用

#0 系列目錄#node

ZooKeeper提供了Java和C的binding. 本文關注Java相關的API.

#1 準備工做# 拷貝ZooKeeper安裝目錄下的zookeeper.x.x.x.jar文件到項目的classpath路徑下.

#2 建立鏈接和回調接口# 首先須要建立ZooKeeper對象, 後續的一切操做都是基於該對象進行的.

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException

如下爲各個參數的詳細說明:

  1. connectString:zookeeper server列表, 以逗號隔開. ZooKeeper對象初始化後, 將從server列表中選擇一個server, 並嘗試與其創建鏈接. 若是鏈接創建失敗, 則會從列表的剩餘項中選擇一個server, 並再次嘗試創建鏈接.

  2. sessionTimeout:指定鏈接的超時時間.

  3. watcher:事件回調接口.

注意, 建立ZooKeeper對象時, 只要對象完成初始化便馬上返回. 創建鏈接是以異步的形式進行的, 當鏈接成功創建後, 會回調watcher的process方法. 若是想要同步創建與server的鏈接, 須要本身進一步封裝.

public class ZKConnection {
    /**
     * server列表, 以逗號分割
     */
    protected String hosts = "localhost:4180,localhost:4181,localhost:4182";
    /**
     * 鏈接的超時時間, 毫秒
     */
    private static final int SESSION_TIMEOUT = 5000;
    private CountDownLatch connectedSignal = new CountDownLatch(1);
    protected ZooKeeper zk;

    /**
     * 鏈接zookeeper server
     */
    public void connect() throws Exception {
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new ConnWatcher());
        // 等待鏈接完成
        connectedSignal.await();
    }

    public class ConnWatcher implements Watcher {
        public void process(WatchedEvent event) {
            // 鏈接創建, 回調process接口時, 其event.getState()爲KeeperState.SyncConnected
            if (event.getState() == KeeperState.SyncConnected) {
                // 放開閘門, wait在connect方法上的線程將被喚醒
                connectedSignal.countDown();
            }
        }
    }
}

#3 建立znode# ZooKeeper對象的create方法用於建立znode.

String create(String path, byte[] data, List acl, CreateMode createMode);

如下爲各個參數的詳細說明:

  1. path:znode的路徑.
  2. data:與znode關聯的數據.
  3. acl:指定權限信息, 若是不想指定權限, 能夠傳入Ids.OPEN_ACL_UNSAFE.
  4. createMode:指定znode類型. CreateMode是一個枚舉類, 從中選擇一個成員傳入便可;
/**
 * 建立臨時節點
 */
public void create(String nodePath, byte[] data) throws Exception {
    zk.create(nodePath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}

#4 獲取子node列表# ZooKeeper對象的getChildren方法用於獲取子node列表.

List getChildren(String path, boolean watch);

watch參數用於指定是否監聽path node的子node的增長和刪除事件, 以及path node自己的刪除事件.

#5 判斷znode是否存在# ZooKeeper對象的exists方法用於判斷指定znode是否存在.

Stat exists(String path, boolean watch);

watch參數用於指定是否監聽path node的建立, 刪除事件, 以及數據更新事件. 若是該node存在, 則返回該node的狀態信息, 不然返回null.

#6 獲取node中關聯的數據# ZooKeeper對象的getData方法用於獲取node關聯的數據.

byte[] getData(String path, boolean watch, Stat stat);
  1. watch參數用於指定是否監聽path node的刪除事件, 以及數據更新事件, 注意, 不監聽path node的建立事件, 由於若是path node不存在, 該方法將拋出KeeperException.NoNodeException異常.
  2. stat參數是個傳出參數, getData方法會將path node的狀態信息設置到該參數中.

#7 更新node中關聯的數據# ZooKeeper對象的setData方法用於更新node關聯的數據.

Stat setData(final String path, byte data[], int version);
  1. data爲待更新的數據.
  2. version參數指定要更新的數據的版本, 若是version和真實的版本不一樣, 更新操做將失敗. 指定version爲-1則忽略版本檢查.
  3. 返回path node的狀態信息.

#8 刪除znode# ZooKeeper對象的delete方法用於刪除znode.

void delete(final String path, int version);

#9 其他接口# 請查看ZooKeeper對象的API文檔.

#10 代碼實例#

public class JavaApiSample implements Watcher {

	private static final int SESSION_TIMEOUT = 10000;
	//    private static final String CONNECTION_STRING = "test.zookeeper.connection_string:2181";
	private static final String CONNECTION_STRING = "127.0.0.1:2180,127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
	private static final String ZK_PATH = "/nileader";
	private ZooKeeper zk = null;

	private CountDownLatch connectedSemaphore = new CountDownLatch(1);

	/**
	 * 建立ZK鏈接
	 *
	 * @param connectString  ZK服務器地址列表
	 * @param sessionTimeout Session超時時間
	 */
	public void createConnection(String connectString, int sessionTimeout) {
		this.releaseConnection();
		try {
			zk = new ZooKeeper(connectString, sessionTimeout, this);
			connectedSemaphore.await();
		} catch (InterruptedException e) {
			System.out.println("鏈接建立失敗,發生 InterruptedException");
			e.printStackTrace();
		} catch (IOException e) {
			System.out.println("鏈接建立失敗,發生 IOException");
			e.printStackTrace();
		}
	}

	/**
	 * 關閉ZK鏈接
	 */
	public void releaseConnection() {
		if (null != this.zk) {
			try {
				this.zk.close();
			} catch (InterruptedException e) {
				// ignore
				e.printStackTrace();
			}
		}
	}

	/**
	 * 建立節點
	 *
	 * @param path 節點path
	 * @param data 初始數據內容
	 * @return
	 */
	public boolean createPath(String path, String data) {
		try {
			System.out.println("節點建立成功, Path: "
					+ this.zk.create(path, //
					data.getBytes(), //
					ZooDefs.Ids.OPEN_ACL_UNSAFE, //
					CreateMode.EPHEMERAL)
					+ ", content: " + data);
		} catch (KeeperException e) {
			System.out.println("節點建立失敗,發生KeeperException");
			e.printStackTrace();
		} catch (InterruptedException e) {
			System.out.println("節點建立失敗,發生 InterruptedException");
			e.printStackTrace();
		}
		return true;
	}

	/**
	 * 讀取指定節點數據內容
	 *
	 * @param path 節點path
	 * @return
	 */
	public String readData(String path) {
		try {
			System.out.println("獲取數據成功,path:" + path);
			return new String(this.zk.getData(path, false, null));
		} catch (KeeperException e) {
			System.out.println("讀取數據失敗,發生KeeperException,path: " + path);
			e.printStackTrace();
			return "";
		} catch (InterruptedException e) {
			System.out.println("讀取數據失敗,發生 InterruptedException,path: " + path);
			e.printStackTrace();
			return "";
		}
	}

	/**
	 * 更新指定節點數據內容
	 *
	 * @param path 節點path
	 * @param data 數據內容
	 * @return
	 */
	public boolean writeData(String path, String data) {
		try {
			System.out.println("更新數據成功,path:" + path + ", stat: " +
					this.zk.setData(path, data.getBytes(), -1));
		} catch (KeeperException e) {
			System.out.println("更新數據失敗,發生KeeperException,path: " + path);
			e.printStackTrace();
		} catch (InterruptedException e) {
			System.out.println("更新數據失敗,發生 InterruptedException,path: " + path);
			e.printStackTrace();
		}
		return false;
	}

	/**
	 * 刪除指定節點
	 *
	 * @param path 節點path
	 */
	public void deleteNode(String path) {
		try {
			this.zk.delete(path, -1);
			System.out.println("刪除節點成功,path:" + path);
		} catch (KeeperException e) {
			System.out.println("刪除節點失敗,發生KeeperException,path: " + path);
			e.printStackTrace();
		} catch (InterruptedException e) {
			System.out.println("刪除節點失敗,發生 InterruptedException,path: " + path);
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {

		JavaApiSample sample = new JavaApiSample();
		sample.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);
		if (sample.createPath(ZK_PATH, "我是節點初始內容")) {
			System.out.println();
			System.out.println("數據內容: " + sample.readData(ZK_PATH) + "\n");
			sample.writeData(ZK_PATH, "更新後的數據");
			System.out.println("數據內容: " + sample.readData(ZK_PATH) + "\n");
			sample.deleteNode(ZK_PATH);
		}

		sample.releaseConnection();
	}

	/**
	 * 收到來自Server的Watcher通知後的處理。
	 */
	@Override
	public void process(WatchedEvent event) {
		System.out.println("收到事件通知:" + event.getState() + "\n");
		if (Event.KeeperState.SyncConnected == event.getState()) {
			connectedSemaphore.countDown();
		}

	}

}

#11 須要注意的幾個地方#

  1. znode中關聯的數據不能超過1M. zookeeper的使命是分佈式協做, 而不是數據存儲.
  2. getChildren, getData, exists方法可指定是否監聽相應的事件. 而create, delete, setData方法則會觸發相應的事件的發生.
  3. 以上介紹的幾個方法大多存在其異步的重載方法, 具體請查看API說明.
相關文章
相關標籤/搜索