#0 系列目錄#node
Zookeeper系列服務器
Zookeeper源碼.net
Zookeeper應用
ZooKeeper提供了Java和C的binding. 本文關注Java相關的API.
#1 準備工做# 拷貝ZooKeeper安裝目錄下的zookeeper.x.x.x.jar文件到項目的classpath路徑下.
#2 建立鏈接和回調接口# 首先須要建立ZooKeeper對象, 後續的一切操做都是基於該對象進行的.
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException
如下爲各個參數的詳細說明:
connectString
:zookeeper server列表, 以逗號隔開. ZooKeeper對象初始化後, 將從server列表中選擇一個server, 並嘗試與其創建鏈接
. 若是鏈接創建失敗, 則會從列表的剩餘項中選擇一個server, 並再次嘗試創建鏈接
.
sessionTimeout
:指定鏈接的超時時間.
watcher
:事件回調接口.
注意, 建立ZooKeeper對象時, 只要對象完成初始化便馬上返回. 創建鏈接是以異步的形式進行的
, 當鏈接成功創建後, 會回調watcher的process方法. 若是想要同步創建與server的鏈接
, 須要本身進一步封裝.
public class ZKConnection { /** * server列表, 以逗號分割 */ protected String hosts = "localhost:4180,localhost:4181,localhost:4182"; /** * 鏈接的超時時間, 毫秒 */ private static final int SESSION_TIMEOUT = 5000; private CountDownLatch connectedSignal = new CountDownLatch(1); protected ZooKeeper zk; /** * 鏈接zookeeper server */ public void connect() throws Exception { zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new ConnWatcher()); // 等待鏈接完成 connectedSignal.await(); } public class ConnWatcher implements Watcher { public void process(WatchedEvent event) { // 鏈接創建, 回調process接口時, 其event.getState()爲KeeperState.SyncConnected if (event.getState() == KeeperState.SyncConnected) { // 放開閘門, wait在connect方法上的線程將被喚醒 connectedSignal.countDown(); } } } }
#3 建立znode# ZooKeeper對象的create方法用於建立znode.
String create(String path, byte[] data, List acl, CreateMode createMode);
如下爲各個參數的詳細說明:
path
:znode的路徑.data
:與znode關聯的數據.acl
:指定權限信息, 若是不想指定權限, 能夠傳入Ids.OPEN_ACL_UNSAFE.createMode
:指定znode類型. CreateMode是一個枚舉類, 從中選擇一個成員傳入便可;/** * 建立臨時節點 */ public void create(String nodePath, byte[] data) throws Exception { zk.create(nodePath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); }
#4 獲取子node列表# ZooKeeper對象的getChildren方法用於獲取子node列表.
List getChildren(String path, boolean watch);
watch參數用於指定是否監聽path node的子node的增長和刪除事件
, 以及path node自己的刪除事件
.
#5 判斷znode是否存在# ZooKeeper對象的exists方法用於判斷指定znode是否存在.
Stat exists(String path, boolean watch);
watch參數用於指定是否監聽path node的建立, 刪除事件, 以及數據更新事件
. 若是該node存在, 則返回該node的狀態信息, 不然返回null.
#6 獲取node中關聯的數據# ZooKeeper對象的getData方法用於獲取node關聯的數據.
byte[] getData(String path, boolean watch, Stat stat);
是否監聽path node的刪除事件, 以及數據更新事件
, 注意, 不監聽path node的建立事件
, 由於若是path node不存在, 該方法將拋出KeeperException.NoNodeException異常.getData方法會將path node的狀態信息設置到該參數中
.#7 更新node中關聯的數據# ZooKeeper對象的setData方法用於更新node關聯的數據.
Stat setData(final String path, byte data[], int version);
若是version和真實的版本不一樣, 更新操做將失敗. 指定version爲-1則忽略版本檢查
.#8 刪除znode# ZooKeeper對象的delete方法用於刪除znode.
void delete(final String path, int version);
#9 其他接口# 請查看ZooKeeper對象的API文檔.
#10 代碼實例#
public class JavaApiSample implements Watcher { private static final int SESSION_TIMEOUT = 10000; // private static final String CONNECTION_STRING = "test.zookeeper.connection_string:2181"; private static final String CONNECTION_STRING = "127.0.0.1:2180,127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"; private static final String ZK_PATH = "/nileader"; private ZooKeeper zk = null; private CountDownLatch connectedSemaphore = new CountDownLatch(1); /** * 建立ZK鏈接 * * @param connectString ZK服務器地址列表 * @param sessionTimeout Session超時時間 */ public void createConnection(String connectString, int sessionTimeout) { this.releaseConnection(); try { zk = new ZooKeeper(connectString, sessionTimeout, this); connectedSemaphore.await(); } catch (InterruptedException e) { System.out.println("鏈接建立失敗,發生 InterruptedException"); e.printStackTrace(); } catch (IOException e) { System.out.println("鏈接建立失敗,發生 IOException"); e.printStackTrace(); } } /** * 關閉ZK鏈接 */ public void releaseConnection() { if (null != this.zk) { try { this.zk.close(); } catch (InterruptedException e) { // ignore e.printStackTrace(); } } } /** * 建立節點 * * @param path 節點path * @param data 初始數據內容 * @return */ public boolean createPath(String path, String data) { try { System.out.println("節點建立成功, Path: " + this.zk.create(path, // data.getBytes(), // ZooDefs.Ids.OPEN_ACL_UNSAFE, // CreateMode.EPHEMERAL) + ", content: " + data); } catch (KeeperException e) { System.out.println("節點建立失敗,發生KeeperException"); e.printStackTrace(); } catch (InterruptedException e) { System.out.println("節點建立失敗,發生 InterruptedException"); e.printStackTrace(); } return true; } /** * 讀取指定節點數據內容 * * @param path 節點path * @return */ public String readData(String path) { try { System.out.println("獲取數據成功,path:" + path); return new String(this.zk.getData(path, false, null)); } catch (KeeperException e) { System.out.println("讀取數據失敗,發生KeeperException,path: " + path); e.printStackTrace(); return ""; } catch (InterruptedException e) { System.out.println("讀取數據失敗,發生 InterruptedException,path: " + path); e.printStackTrace(); return ""; } } /** * 更新指定節點數據內容 * * @param path 節點path * @param data 數據內容 * @return */ public boolean writeData(String path, String data) { try { System.out.println("更新數據成功,path:" + path + ", stat: " + this.zk.setData(path, data.getBytes(), -1)); } catch (KeeperException e) { System.out.println("更新數據失敗,發生KeeperException,path: " + path); e.printStackTrace(); } catch (InterruptedException e) { System.out.println("更新數據失敗,發生 InterruptedException,path: " + path); e.printStackTrace(); } return false; } /** * 刪除指定節點 * * @param path 節點path */ public void deleteNode(String path) { try { this.zk.delete(path, -1); System.out.println("刪除節點成功,path:" + path); } catch (KeeperException e) { System.out.println("刪除節點失敗,發生KeeperException,path: " + path); e.printStackTrace(); } catch (InterruptedException e) { System.out.println("刪除節點失敗,發生 InterruptedException,path: " + path); e.printStackTrace(); } } public static void main(String[] args) { JavaApiSample sample = new JavaApiSample(); sample.createConnection(CONNECTION_STRING, SESSION_TIMEOUT); if (sample.createPath(ZK_PATH, "我是節點初始內容")) { System.out.println(); System.out.println("數據內容: " + sample.readData(ZK_PATH) + "\n"); sample.writeData(ZK_PATH, "更新後的數據"); System.out.println("數據內容: " + sample.readData(ZK_PATH) + "\n"); sample.deleteNode(ZK_PATH); } sample.releaseConnection(); } /** * 收到來自Server的Watcher通知後的處理。 */ @Override public void process(WatchedEvent event) { System.out.println("收到事件通知:" + event.getState() + "\n"); if (Event.KeeperState.SyncConnected == event.getState()) { connectedSemaphore.countDown(); } } }
#11 須要注意的幾個地方#
不能超過1M
. zookeeper的使命是分佈式協做, 而不是數據存儲
.是否監聽相應的事件
. 而create, delete, setData方法則會觸發相應的事件的發生
.大多存在其異步的重載方法
, 具體請查看API說明.