主要介紹下原生zookeeper客戶端API使用、zkClient工具包和分佈式鎖的簡單實現。java
導入依賴jarapache
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
</dependency>複製代碼
實現對zookeeper的基本操做vim
複製代碼
private static String connection = "127.0.0.1:2181";//鏈接信息
private static String rootPath ="/study";//目錄
/*建立一個Watcher*/
private static Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("path:"+event.getPath()+",type:"+event.getType());
if(event.getType().equals(Event.EventType.NodeChildrenChanged)){//子節點變化觸發
System.out.println("子節點變化觸發");
}else if(event.getType().equals(Event.EventType.NodeCreated)){//建立節點觸發
System.out.println("建立節點觸發");
}else if(event.getType().equals(Event.EventType.NodeDataChanged)){//節點數據變化
System.out.println("節點數據變化");
}else if(event.getType().equals(Event.EventType.NodeDeleted)){//節點刪除
System.out.println("節點刪除");
}else {
System.out.println("其餘");
}
}
};
public static void main(String[] args) throws Exception {
ZooKeeper zooKeeper = new ZooKeeper(connection,2000,watcher);
//沒有建立父節點時拋出異常 KeeperException$NoNodeException
/*zooKeeper.create(firstPath+"/test","this is my first create".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);*/
String firstPath = rootPath;
//判斷節點是否存在,兩個參數:一、節點路徑;二、是否監控(Watcher即初始化ZooKeeper時傳入的Watcher)
if(zooKeeper.exists(firstPath,true) != null){
//刪除節點
zooKeeper.delete(firstPath,-1);
}
if(zooKeeper.exists(firstPath,true) == null){
//建立一個持久節點節點 ,四個參數:一、節點路徑;二、節點數據;三、節點權限;四、建立模式
String path = zooKeeper.create(firstPath,"this is my first create".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
Stat stat = new Stat();
//獲取該節點下的數據,三個參數:一、節點路徑;二、書否監控該節點;三、版本等信息能夠經過一個Stat對象來指定
String data = new String(zooKeeper.getData(firstPath,false,stat));
System.out.println("data:"+data);
System.out.println(stat.toString());
//註冊watcher
zooKeeper.register(watcher);
//修改節點數據 ,version -1 匹配全部版本
zooKeeper.setData(firstPath,"update date".getBytes(),-1);
System.out.println("修改後:"+new String(zooKeeper.getData(firstPath,watcher,null)));
//建立子節點,並設置watcher
String childrenPath = firstPath+"/first";
//獲取孩子節點,並註冊watcher監聽Event.EventType.NodeChildrenChanged事件
zooKeeper.getChildren(firstPath,true);
zooKeeper.exists(childrenPath,watcher);
zooKeeper.create(childrenPath,"this is a child".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);//觸發連個watcher
//獲取孩子節點,並註冊watcher監聽Event.EventType.NodeChildrenChanged事件
List<String> children = zooKeeper.getChildren(firstPath,true);
System.out.println(children);
zooKeeper.exists(childrenPath,watcher);
zooKeeper.setData(childrenPath,"modify child data".getBytes(),-1);
//判斷子節點是否存在,並在子節點註冊watcher
zooKeeper.exists(childrenPath,watcher);
zooKeeper.delete(childrenPath,-1);//觸發了了兩個watcher,
//關閉鏈接
zooKeeper.close();
}複製代碼
須要注意的是原生的API建立節點只能在父目錄存在時才能建立,刪除時也不能遞歸刪除。因爲watch機制只觸發一次的特性,若是想一直監聽節點變化就要不斷的註冊watch,這對於開發者而言是不太友好的。
bash
導入依賴jar
分佈式
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>複製代碼
要使用zkClient必須實現它的序列化接口ZkSerializeride
public class MyZkSerializer implements ZkSerializer {
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
String d = (String) data;
try {
return d.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
} 複製代碼
代碼示例:工具
public static void main(String[] args) {
// 建立一個zk客戶端
ZkClient client = new ZkClient("localhost:2181");
String path = "/zkClient";
client.setZkSerializer(new MyZkSerializer());
//建立節點
client.create(path, "123", CreateMode.PERSISTENT);
// 遞歸建立節點(持久節點)
client.createPersistent("/key1/key2/key3",true);
//建立對子節點的監聽
IZkChildListener iZkChildListener = new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println(parentPath+"子節點發生變化:"+currentChilds);
}
};
//註冊watcher,每次觸發了watcher事件以後會自動註冊,
client.subscribeChildChanges(path,iZkChildListener);
//取消註冊
// client.unsubscribeChildChanges(path,iZkChildListener);
IZkDataListener iZkDataListener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println(dataPath+"節點被刪除");
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println(dataPath+"發生變化:"+data);
}
};
client.subscribeDataChanges(path, iZkDataListener);
// if(client.exists(path)){//當前節點是否存在
// client.deleteRecursive(path);//刪除節點,能夠遞歸刪除目錄下全部子節點
// }
//獲取數據
Object data = client.readData(path);
System.out.println(data);
//獲取子節點列表
List<String> children = client.getChildren(path);
System.out.println(children);
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}複製代碼
分佈式鎖實現的基於客戶端斷開鏈接臨時節點自動刪除這一特性上實現。ui
實現思路有兩種:this
1.建立同一個臨時節點,建立成功的獲取鎖,其餘服務對這一個臨時節點註冊watcher,一旦節點被刪除全部註冊了watcher的服務都將受到通知進行搶鎖。spa
2.利用臨時順序節點,每個服務建立一個臨時順序節點,搶鎖時由當前排序最小的獲取鎖。其餘的註冊前一個節點的watcher事件,監聽節點刪除事件,前一個節點刪除則再次發起搶鎖操做。
下面一段代碼利用臨時順序節點實現的鎖
package com.top.learn;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class ZKDistributed implements Lock {
private String lockPath;
private ZkClient client;
/*
* 利用臨時順序節點來實現分佈式鎖
* 獲取鎖:取排隊號(建立本身的臨時順序節點),而後判斷本身是不是最小號,如是,則得到鎖;不是,則註冊前一節點的watcher,阻塞等待
* 釋放鎖:刪除本身建立的臨時順序節點
*/
//存放當前節點路徑
ThreadLocal<String> currentPath = new ThreadLocal<>();
//存放前一個節點路勁
ThreadLocal<String> beforePath = new ThreadLocal<>();
public ZKDistributed(String connection,String path){
//這裏能夠作一些參數校驗
this.lockPath = path;
client = new ZkClient(connection);
if(!client.exists(path)){
client.createPersistent(path,true);
}
}
@Override
public boolean tryLock() {
String path = currentPath.get();
if(path == null || !client.exists(path)){//未建立過節點
//建立一個臨時順序節點
path = client.createEphemeralSequential(lockPath + "/","locked");
currentPath.set(path);
}
//獲取全部的自節點
List<String> children = client.getChildren(lockPath);
//排序
Collections.sort(children);
if(currentPath.get().equals(lockPath + "/" + children.get(0))){
//當前是最小節點,獲取鎖
return true;
}else {
int index = children.indexOf(currentPath);
beforePath.set(children.get(index-1)); //獲取上一個節點路徑
}
return false;
}
@Override
public void lock() {
while(!tryLock()){
// 阻塞等待
waitForLock();
// 再次嘗試加鎖
lock();
}
}
private void waitForLock() {
CountDownLatch countDownLatch = new CountDownLatch(1);
IZkDataListener dataListener = new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
//節點刪除,取消阻塞
countDownLatch.countDown();
}
};
//註冊對前一個節點的監聽
client.subscribeDataChanges(beforePath.get(),dataListener);
try {
if(client.exists(beforePath.get())){//前一個節點存在纔等待
countDownLatch.await();
}
}catch (Exception e){
}
//取消watcher
client.unsubscribeDataChanges(beforePath.get(),dataListener);
}
@Override
public void unlock() {
client.deleteRecursive(currentPath.get());
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
} 複製代碼
上面一段代碼沒有對鎖進行可重入實現。