添加依賴
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
建立會話
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
connectString |
形如ip:port,ip:port/path1/path2,表示zk服務器列表,帶path路徑的表示基於此path操作 |
sessionTimeout |
客戶端會話超時時間,毫秒值,在sessionTimeout時間內沒有進行有效的心跳檢測,則認爲會話超時 |
watcher |
默認監聽器,可以不設置,傳null即可 |
canBeReadOnly |
boolean值,true表示當服務器因網絡分區進入只讀模式時,允許客戶端以只讀方式繼續連接,默認爲false |
sessionId和sessionPasswd |
會話id和會話的祕鑰,當一個會話建立後,會自動生成對應的id和祕鑰,主要用來恢復會話 |
package com.banary.base;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
public class ZookeeperFactory {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
createZk2();
}
public static void createZk1() throws Exception{
ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
System.out.println(zooKeeper.getState());
countDownLatch.await();
System.out.println("zk實例建立成功");
}
public static void createZk2() throws Exception{
ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
System.out.println(zooKeeper.getState());
countDownLatch.await();
long sessionId = zooKeeper.getSessionId();
byte[] sessionPasswd = zooKeeper.getSessionPasswd();
//使用錯的sessionId和sessionPasswd
zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher(), 1l, "test".getBytes());
//使用對的sessionId和sessionPasswd
zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher(), sessionId, sessionPasswd);
Thread.sleep(Integer.MAX_VALUE);
}
private static class DemoWatcher implements Watcher{
public void process(WatchedEvent watchedEvent) {
System.out.println("收到zk event:" + watchedEvent);
countDownLatch.countDown();
}
}
}
- 注意
建立zk對象的方法是異步的,此處採用CountDownLatch實現同步
建立節點
- 方法(不支持遞歸建立,如果該節點已存在,會拋出異常NodeExistsException)
#同步方法
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException
#異步方法
public void create(String path, byte[] data, List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)
path |
要建立的數據節點的路徑 |
data |
字節數組,該節點的初始內容,需要自己序列化成字節數組 |
acl |
節點的安全策略,詳見ZooDefs.Ids |
createMode |
枚舉類型,詳見CreateMode |
cb |
異步建立時的回調函數,需要開發者自己實現對應的AsyncCallback子接口,如StringCallback,重寫void processResult(int rc, String path, Object ctx, String name)方法,當zk服務器建立完節點,客戶端自動調用該方法 |
ctx |
回調函數上下文,和回調函數一起使用 |
package com.banary.base;
import org.apache.zookeeper.*;
import java.util.concurrent.CountDownLatch;
/**
 * Shows the synchronous and asynchronous variants of {@code ZooKeeper.create}
 * for an ephemeral node and an ephemeral-sequential node.
 */
public class CreateDemo {
    /** Released by {@link DemoWatcher} when the session reports SyncConnected. */
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        asyncCreate();
    }

    /**
     * Creates both nodes with the blocking API and prints the returned paths.
     *
     * @throws Exception if connecting or creating a node fails
     */
    public static void syncCreate() throws Exception {
        ZooKeeper client = connect();

        // Ephemeral node, open ACL.
        String ephemeralPath = client.create(
                "/zk-test-ephemeral-", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("臨時節點1建立成功:" + ephemeralPath);

        // Ephemeral sequential node, open ACL.
        String sequentialPath = client.create(
                "/zk-test-ephemeral-", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("臨時節點2建立成功:" + sequentialPath);
    }

    /**
     * Creates both nodes with the callback API; results are printed by {@link DemoCallback}.
     *
     * @throws Exception if connecting fails or the thread is interrupted
     */
    public static void asyncCreate() throws Exception {
        ZooKeeper client = connect();

        // Ephemeral node, open ACL.
        client.create(
                "/zk-test-ephemeral-", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                new DemoCallback(), "path1");

        // Ephemeral sequential node, open ACL.
        client.create(
                "/zk-test-ephemeral-", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                new DemoCallback(), "path2");

        // Park the main thread so the callbacks have time to run.
        Thread.sleep(Integer.MAX_VALUE);
    }

    /** Opens a client against localhost:2181 and blocks until the watcher releases the latch. */
    private static ZooKeeper connect() throws Exception {
        ZooKeeper client = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        countDownLatch.await();
        return client;
    }

    /** Releases the latch whenever an event carries the SyncConnected state. */
    private static class DemoWatcher implements Watcher {
        public void process(WatchedEvent watchedEvent) {
            if (Event.KeeperState.SyncConnected != watchedEvent.getState()) {
                return;
            }
            countDownLatch.countDown();
        }
    }

    /** Prints the result code, requested path, context and actual node name of a create call. */
    private static class DemoCallback implements AsyncCallback.StringCallback {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            String context = ctx.toString();
            String message = "建立節點:[" + rc + ", " + path + ", " + context + ", 真實path:" + name + "]";
            System.out.println(message);
        }
    }
}
rc |
服務端響應碼:0 建立成功;-4 客戶端和服務端的連接斷開;-110 節點已存在;-112 會話過期 |
path |
對應create方法中節點路徑參數值 |
ctx |
接口調用時傳入API的ctx,即對應異步create方法中的ctx參數 |
name |
建立成功後,對應節點的真實路徑 |
- 同步方法會阻塞線程,異步方法不會
- 同步會拋出異常,異步方法不會,異常信息是通過狀態碼(rc)的方式傳遞到回調函數中
刪除
- 方法(刪除時不支持遞歸刪除,也就是說某個節點如果存在子節點,則不能刪除)
#同步刪除
public void delete(String path, int version) throws InterruptedException, KeeperException
#異步刪除
public void delete(String path, int version, VoidCallback cb, Object ctx)
path |
要刪除的節點對應的path |
version |
數據節點的版本號,樂觀鎖 |
cb |
異步刪除時的回調函數 |
ctx |
回調函數的參數 |
package com.banary.base;
import org.apache.zookeeper.*;
import java.util.concurrent.CountDownLatch;
public class DeleteDemo {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws Exception{
asyncDelete();
}
public static void syncDelete() throws Exception{
ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
countDownLatch.await();
String path = zooKeeper.create("/deleteDemo", "deleteDemo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Thread.sleep(10000);
zooKeeper.delete(path, 0);
}
public static void asyncDelete() throws Exception{
ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
countDownLatch.await();
String path = zooKeeper.create("/deleteDemo", "deleteDemo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Thread.sleep(5000);
zooKeeper.delete(path, 0, new DemoCallback(), "asyncDelete");
Thread.sleep(5000);
}
private static class DemoWatcher implements Watcher{
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("建立zk會話:" + watchedEvent.getState());
if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
System.out.println("zk會話建立成功");
countDownLatch.countDown();
}
}
}
public static class DemoCallback implements AsyncCallback.VoidCallback{
@Override
public void processResult(int rc, String path, Object ctx) {
System.out.println("刪除節點成功:[" + rc + ", " + path + ", " + ctx.toString() + "]");
}
}
}
查詢節點數據內容getData
public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException
public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException
public void getData(String path, Watcher watcher, DataCallback cb, Object ctx)
public void getData(String path, boolean watch, DataCallback cb, Object ctx)
path |
要獲取數據的節點的路徑 |
watcher |
監聽器 |
watch |
true表示使用默認的監聽器,即建立zk對象時註冊的監聽器 |
stat |
數據節點的狀態信息,傳入一個舊的stat變量,服務器響應後會用新的stat變量替換 |
cb |
異步方法的回調函數 |
ctx |
回調函數上下文參數 |
package com.banary.base;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
public class GetDataDemo {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws Exception{
asyncGetData();
}
/**
* 同步獲取數據
* @throws Exception
*/
public static void syncGetData() throws Exception{
ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
countDownLatch.await();
String path = zooKeeper.create("/syncGetData", "syncGetData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(new String(zooKeeper.getData(path,true, new Stat())));
}
/**
* 異步獲取數據
*/
public static void asyncGetData() throws Exception{
ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
countDownLatch.await();
String path = zooKeeper.create("/asyncGetData", "asyncGetData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(new String(zooKeeper.getData(path,true, new Stat())));
Thread.sleep(Integer.MAX_VALUE);
}
private static class DemoWatcher implements Watcher{
@Override
public void process(WatchedEvent watchedEvent) {
if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
countDownLatch.countDown();
}
}
}
private static class DemoCallback implements AsyncCallback.DataCallback{
@Override
public void processResult(int rc, String path, Object ctx, byte[] bytes, Stat stat) {
System.out.println("異步獲取的數據內容爲:" + new String(bytes));
}
}
}
查詢子節點getChildren
public List<String> getChildren(String path, Watcher watcher) throws KeeperException, InterruptedException
public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException
public void getChildren(String path, Watcher watcher, ChildrenCallback cb, Object ctx)
public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
public List<String> getChildren(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException
public List<String> getChildren(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException
public void getChildren(String path, Watcher watcher, Children2Callback cb, Object ctx)
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)
path |
要查詢的節點 |
watcher |
註冊一個監聽器 |
watch |
使用默認的監聽器 |
cb |
異步方法的回調函數 |
ctx |
回調函數上下文參數 |
stat |
傳入一個舊的Stat對象,查詢後,會被服務端響應的新Stat對象替換 |
修改
#同步方法
public Stat setData(String path, byte[] data, int version) throws KeeperException, InterruptedException
#異步方法
public void setData(String path, byte[] data, int version, StatCallback cb, Object ctx)
path |
要修改的數據節點的路徑 |
data |
修改後的數據 |
version |
數據修改時基於的版本號,即樂觀鎖 |
cb |
異步方法的回調函數 |
ctx |
回調函數上下文參數 |
package com.banary.base;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/**
 * Shows the synchronous and asynchronous variants of {@code ZooKeeper.setData}
 * on a freshly created ephemeral node, with a data watch registered through getData.
 */
public class UpdateDemo {
    /** Released by {@link DemoWatcher} when the session reports SyncConnected. */
    public static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        asyncUpdate();
    }

    /**
     * Creates {@code /syncUpdate}, prints its data and stat, overwrites the data with
     * the blocking API, then prints data and stat again.
     *
     * @throws Exception if connecting, creating, reading or writing fails
     */
    public static void syncUpdate() throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        countDownLatch.await();
        String path = zooKeeper.create("/syncUpdate", "syncUpdate".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Stat stat = new Stat();
        System.out.println(new String(zooKeeper.getData(path, true, stat)));
        System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," + stat.getVersion());
        // -1 means no optimistic-lock (version) check.
        zooKeeper.setData(path, "32123".getBytes(), -1);
        System.out.println(new String(zooKeeper.getData(path, true, stat)));
        System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," + stat.getVersion());
    }

    /**
     * Creates {@code /asyncUpdate}, prints its data and stat, then overwrites the data
     * with the callback API; the result is printed by {@link DemoCallback}.
     *
     * @throws Exception if connecting, creating or reading fails, or the thread is interrupted
     */
    public static void asyncUpdate() throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        countDownLatch.await();
        String path = zooKeeper.create("/asyncUpdate", "asyncUpdate".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Stat stat = new Stat();
        System.out.println(new String(zooKeeper.getData(path, true, stat)));
        System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," + stat.getVersion());
        zooKeeper.setData(path, "dsadsa".getBytes(), -1, new DemoCallback(), "回調");
        // Park the main thread so the callback and watch event can be observed.
        Thread.sleep(Integer.MAX_VALUE);
    }

    /** Releases the latch on connect and reports data-change notifications. */
    private static class DemoWatcher implements Watcher {
        @Override
        public void process(WatchedEvent watchedEvent) {
            Event.EventType type = watchedEvent.getType();
            // Dispatch on the event type first: node events are delivered while the
            // session state is SyncConnected, so testing the state first would hide them.
            if (Event.EventType.None == type) {
                if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                    countDownLatch.countDown();
                }
            } else if (Event.EventType.NodeDataChanged == type) {
                System.out.println("節點數據內容發生了變化");
            }
        }
    }

    /** Prints the outcome of an asynchronous setData call. */
    private static class DemoCallback implements AsyncCallback.StatCallback {
        /**
         * @param rc   result code of the update
         * @param path path passed to the update call
         * @param ctx  context object passed to the update call, may be null
         * @param stat node status after the update; may be null when the update failed
         */
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            // Plain concatenation is null-safe; ctx.toString()/stat.toString() are not.
            System.out.println("修改節點:[" + rc + ", " + path + ", " + ctx
                    + ", stat:" + stat + "]");
        }
    }
}
判斷節點是否存在
public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException
public void exists(String path, Watcher watcher, StatCallback cb, Object ctx)
public void exists(String path, boolean watch, StatCallback cb, Object ctx)
path |
要判斷的節點路徑 |
watcher |
註冊一個監聽器,用來監聽節點被建立,刪除,更新 |
watch |
是否使用默認的監聽器 |
cb |
異步方法的回調函數 |
ctx |
回調函數上下文參數 |
package com.banary.base;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/**
 * Shows the synchronous and asynchronous variants of {@code ZooKeeper.exists},
 * with the session's default watcher registered on the checked node.
 */
public class ExistsNodeDemo {
    /** Released by {@link DemoWatcher} when the session reports SyncConnected. */
    public static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        asyncExists();
    }

    /**
     * Creates {@code /syncExists}, checks it with the blocking API, then updates and
     * deletes it.
     *
     * @throws Exception if any ZooKeeper call fails or the thread is interrupted
     */
    public static void syncExists() throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        countDownLatch.await();
        // Create
        String path = zooKeeper.create("/syncExists", "syncExists".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Stat stat = zooKeeper.exists(path, true);
        System.out.println(stat.toString());
        // Update
        stat = zooKeeper.setData(path, "3213".getBytes(), -1);
        System.out.println(stat.toString());
        // Delete
        zooKeeper.delete(path, -1);
        // Park the main thread so watch events can be observed.
        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * Creates {@code /asyncExists} and checks it with the callback API; the result is
     * printed by {@link DemoCallback}.
     *
     * @throws Exception if connecting or creating fails, or the thread is interrupted
     */
    public static void asyncExists() throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        countDownLatch.await();
        // Create
        String path = zooKeeper.create("/asyncExists", "asyncExists".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        zooKeeper.exists(path, true, new DemoCallback(), "asyncExists");
        Thread.sleep(Integer.MAX_VALUE);
    }

    /** Releases the latch on connect and reports node create/update/delete notifications. */
    private static class DemoWatcher implements Watcher {
        @Override
        public void process(WatchedEvent watchedEvent) {
            Event.EventType type = watchedEvent.getType();
            // Dispatch on the event type first: node events are delivered while the
            // session state is SyncConnected, so testing the state first would hide them.
            if (Event.EventType.None == type) {
                if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                    countDownLatch.countDown();
                }
            } else if (Event.EventType.NodeCreated == type) {
                System.out.println("建立節點");
            } else if (Event.EventType.NodeDataChanged == type) {
                System.out.println("修改節點");
            } else if (Event.EventType.NodeDeleted == type) {
                System.out.println("刪除節點");
            }
        }
    }

    /** Prints the outcome of an asynchronous exists call. */
    public static class DemoCallback implements AsyncCallback.StatCallback {
        /**
         * @param rc   result code of the check
         * @param path path passed to the exists call
         * @param ctx  context object passed to the exists call, may be null
         * @param stat node status; may be null, e.g. when the node does not exist
         */
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            // Plain concatenation is null-safe; ctx.toString()/stat.toString() are not.
            System.out.println("[" + rc + ", " + path + ", " + ctx
                    + ", stat:" + stat + "]");
        }
    }
}
權限控制
#zk會話對象的方法
public void addAuthInfo(String scheme, byte[] auth)
scheme |
權限控制模式,枚舉值:world,auth,digest,ip和super |
auth |
權限信息 |
package com.banary.base;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
public class AuthDemo {
private static CountDownLatch countDownLatch = null;
public static void main(String[] args) throws Exception{
authCreate();
}
public static void authCreate() throws Exception{
countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
countDownLatch.await();
zooKeeper.addAuthInfo("digest", "auth".getBytes());
String path = zooKeeper.create("/auth", "auth".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper2 = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
countDownLatch.await();
zooKeeper2.addAuthInfo("digest", "auth".getBytes());
System.out.println(new String(zooKeeper2.getData(path, true, new Stat())));
countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper1 = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
countDownLatch.await();
System.out.println(new String(zooKeeper1.getData(path, true, new Stat())));
}
private static class DemoWatcher implements Watcher {
public void process(WatchedEvent watchedEvent) {
if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
countDownLatch.countDown();
}
}
}
}
總結
- 除了建立會話是異步的,其他操作都存在同步和異步方法,同步會拋出異常
- 建立會話、查詢(包括查詢該節點的數據和子節點和判斷節點存在)都可以註冊監聽器,也可以使用會話的默認監聽器