上一篇 從0開始的高併發(一)--- zookeeper的基礎概念 咱們在結尾留下了一個分佈式鎖的坑,它保證了咱們在多節點應用的一次調度還有解決分佈式環境下的數據一致性的問題html
好比咱們如今擁有這麼一個集羣,集羣裏面有個緩存服務,集羣中每一個程序都會用到這個緩存,若是此時緩存中有一項緩存過時了,在大併發環境下,同一時刻中許許多多的服務都過來訪問緩存,獲取緩存中的數據,發現緩存過時,就要再去數據庫取,而後更新到緩存服務中去。可是其實咱們僅僅只須要一個請求過來數據庫去更新緩存便可,而後這個場景,咱們該怎麼去作java
咱們參考多線程的場景下會使用到鎖的這個方法,放到如今的併發場景下,咱們也是須要經過一種鎖來實現。node
排他(互斥)性:只有一個線程能獲取到
文件系統(同一個文件不支持多我的去修改)
數據庫:主鍵惟一約束 for update
緩存:redis setnx命令
zookeeper:相似文件系統
阻塞性:其餘未搶到的線程阻塞,直到鎖被釋放再進行搶這個行爲
可重入性:線程獲取鎖後,後續是否可重複得到該鎖
複製代碼
同一個父目錄下面不能有相同的子節點,這就是zookeeper的排他性
經過JDK的柵欄來實現阻塞性
可重入性咱們能夠經過計數器來實現
複製代碼
1.接口難以使用
2.鏈接zookeeper超時不支持自動重連
3.watch註冊一次會失效,須要反覆註冊
4.不支持遞歸建立節點(遞歸建立的話,比方說我要建立一個文件,假如我在idea建立,那我能夠連帶着包一塊兒建立,可是在window我就作不到,這種整一個路徑一併建立下來的就能夠視爲遞歸建立)
5.須要手動設置序列化的問題
複製代碼
org.apache.zookeeper
org.apache.zookeeper.data
connect---鏈接到zookeeper集合
create---建立znode
exist---檢查znode是否存在及其信息
getData---從特定的znode獲取數據
setData---從特定的znode設置數據
getChildren---獲取特定znode中的全部子節點
delete===刪除特定znode及其全部子項
close---關閉鏈接
複製代碼
MyZkSerializer.javaredis
public class MyZkSerializer implements ZkSerializer {
//正常來講咱們還須要進行一個非空判斷,這裏爲了省事沒作,不過嚴格來講是須要作的
//就是簡單的轉換
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
String d = (String) data;
try {
return d.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
}
複製代碼
ZkClientDemo.java數據庫
public class ZkClientDemo {
public static void main(String[] args) {
// 建立一個zk客戶端
ZkClient client = new ZkClient("localhost:2181");
//實現序列化接口
client.setZkSerializer(new MyZkSerializer());
//建立一個節點zk,在zk節點下再建立一個子節點app6,賦值123
//在以前也已經提到了,zookeeper中的節點既是文件夾也是文件
//源碼中CreateMode是一個枚舉,CreateMode.PERSISTENT---當客戶端斷開鏈接時,znode不會自動刪除
client.create("/zk/app6", "123", CreateMode.PERSISTENT);
client.subscribeChildChanges("/zk/app6", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println(parentPath+"子節點發生變化:"+currentChilds);
}
});
//這裏開始是建立一個watch,可是爲何這個方法會命名爲subscribeDataChanges()呢,緣由是:
//本來watch的設置而後獲取是僅一次性的,如今咱們使用subscribe這個英文,表明訂閱,表明這個watch一直存在
//使用這個方法咱們能夠輕易實現持續監聽的效果,比原生zookeeper方便
client.subscribeDataChanges("/zk/app6", new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println(dataPath+"節點被刪除");
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println(dataPath+"發生變化:"+data);
}
});
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
複製代碼
調用ls /zk---能夠發現app6已經被建立,apache
經過get /zk/app6---可獲取到咱們設置的123這個值緩存
說明咱們的程序沒有問題,能夠成功執行網絡
這裏測試監聽事件多線程
create /zk/app6/tellYourDream時---控制檯打印/zk/app6子節點發生變化:[tellYourDream]併發
delete /zk/app6/tellYourDream---控制檯打印/zk/app6子節點發生變化:[],此時已經不存在任何節點,因此爲空
set /zk/app6 123456---/zk/app6發生變化:123456
delete /zk/app6---同時觸發了兩個監聽事件,/zk/app6子節點發生變化:null 和 /zk/app6節點被刪除
1.持久化節點:不刪除節點永遠存在。且能夠建立子節點
/**
* The znode will not be automatically deleted upon client's disconnect.
* 持久無序
*/
PERSISTENT (0, false, false),
/**
* The znode will not be automatically deleted upon client's disconnect,
* and its name will be appended with a monotonically increasing number.
* 持久有序
*/
PERSISTENT_SEQUENTIAL (2, false, true),
複製代碼
2.非持久節點,換言之就是臨時節點,臨時節點就是客戶端鏈接的時候建立,客戶端掛起的時候,臨時節點自動刪除。不能建立子節點
/**
* The znode will be deleted upon the client's disconnect.
* 臨時無序
*/
EPHEMERAL (1, true, false),
/**
* The znode will be deleted upon the client's disconnect, and its name
* will be appended with a monotonically increasing number.
* 臨時有序
*/
EPHEMERAL_SEQUENTIAL (3, true, true);
複製代碼
還有更多的一些監聽方法,咱們能夠本身去嘗試一下。
咱們以前有提到,zookeeper中同一個子節點下面的節點名稱是不能相同的,咱們能夠利用這個互斥性,就能夠實現分佈式鎖的工具
臨時節點就是建立的時候存在,消失的時候,節點自動刪除,當客戶端失聯,網絡不穩定或者崩潰的時候,這個經過臨時節點所建立的鎖就會自行消除。這樣就能夠完美避免死鎖的問題。因此咱們利用這個特性,實現咱們的需求。
原理其實就是節點不可重名+watch機制。
好比說咱們的程序有多個服務實例,哪一個服務實例都去建立一個lock節點,誰建立了,誰就得到了鎖,剩下咱們沒有建立的應用,就去監聽這個lock節點,若是這個lock節點被刪除掉,這時可能出現兩種狀況,一就是客戶端連不上了,另外一種就是客戶端釋放鎖,將lock節點給刪除掉了。
public class ZkDistributeLock implements Lock {
//咱們須要一個鎖的目錄
private String lockPath;
//咱們須要一個客戶端
private ZkClient client;
//剛剛咱們的客戶端和鎖的目錄,這兩個參數怎麼傳進來?
//那就須要咱們的構造函數來進行傳值
public ZkDistributeLock(String lockPath) {
if(lockPath ==null || lockPath.trim().equals("")) {
throw new IllegalArgumentException("patch不能爲空字符串");
}
this.lockPath = lockPath;
client = new ZkClient("localhost:2181");
client.setZkSerializer(new MyZkSerializer());
}
複製代碼
實現Lock接口要重寫的方法(包括嘗試建立臨時節點tryLock(),解鎖unlock(),上鎖lock(),waitForLock()實現阻塞和喚醒的功能方法)
// trylock方法咱們是會嘗試建立一個臨時節點
@Override
public boolean tryLock() { // 不會阻塞
// 建立節點
try {
client.createEphemeral(lockPath);
} catch (ZkNodeExistsException e) {
return false;
}
return true;
}
@Override
public void unlock() {
client.delete(lockPath);
}
@Override
public void lock() {
// 若是獲取不到鎖,阻塞等待
if (!tryLock()) {
// 沒得到鎖,阻塞本身
waitForLock();
// 從等待中喚醒,再次嘗試得到鎖
lock();
}
}
private void waitForLock() {
final CountDownLatch cdl = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("----收到節點被刪除了-------------");
//喚醒阻塞線程
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data)
throws Exception {
}
};
client.subscribeDataChanges(lockPath, listener);
// 阻塞本身
if (this.client.exists(lockPath)) {
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取消註冊
client.unsubscribeDataChanges(lockPath, listener);
}
}
複製代碼
ZkDistributeLock 如今咱們再總結一下流程
獲取鎖,建立節點後
1.成功獲取到的---執行業務---而後釋放鎖
|
|
|
2.獲取失敗,註冊節點的watch---阻塞等待---取消watch---再回到獲取鎖,建立節點的判斷
複製代碼
這個設計會有一個缺點,好比個人實例如今有無數個,此時咱們的lock每次被建立,有人獲取了鎖以後,其餘的人都要被通知阻塞,此時咱們就浪費了不少的網絡資源,也就是驚羣效應。
此時咱們必須進行優化
咱們的Lock做爲一個znode,也能夠建立屬於它的子節點,咱們使用lock建立臨時順序節點,咱們在從0開始的高併發(一)--- zookeeper的基礎概念中已經提到了,zookeeper是有序的,臨時順序節點會自動進行由小到大的自動排序,此時咱們把實例分配至這些順序子節點上,而後編號最小的獲取鎖便可。這很是相似於咱們的公平鎖的概念,也是遵循FIFO原則的
原理:取號 + 最小號取lock + watch
一樣是基於Lock接口的實現
public class ZkDistributeImproveLock implements Lock {
/*
* 利用臨時順序節點來實現分佈式鎖
* 獲取鎖:取排隊號(建立本身的臨時順序節點),而後判斷本身是不是最小號,如是,則得到鎖;不是,則註冊前一節點的watcher,阻塞等待
* 釋放鎖:刪除本身建立的臨時順序節點
*/
//一樣的鎖目錄
private String lockPath;
//一樣的客戶端
private ZkClient client;
private ThreadLocal<String> currentPath = new ThreadLocal<String>();
private ThreadLocal<String> beforePath = new ThreadLocal<String>();
// 鎖重入計數器
private ThreadLocal<Integer> reenterCount = ThreadLocal.withInitial(()->0);
public ZkDistributeImproveLock(String lockPath) {
if(lockPath == null || lockPath.trim().equals("")) {
throw new IllegalArgumentException("patch不能爲空字符串");
}
this.lockPath = lockPath;
client = new ZkClient("localhost:2181");
client.setZkSerializer(new MyZkSerializer());
if (!this.client.exists(lockPath)) {
try {
this.client.createPersistent(lockPath, true);
} catch (ZkNodeExistsException e) {
}
}
}
@Override
public boolean tryLock() {
System.out.println(Thread.currentThread().getName() + "-----嘗試獲取分佈式鎖");
if (this.currentPath.get() == null || !client.exists(this.currentPath.get())) {
//這裏就是先去建立了一個臨時順序節點,在lockpath那裏建立
//用銀行取號來表示這個行爲吧,至關於每一個實例程序先去取號,而後排隊等着叫號的場景
String node = this.client.createEphemeralSequential(lockPath + "/", "locked");
//記錄第一個節點編號
currentPath.set(node);
reenterCount.set(0);
}
// 得到全部的號
List<String> children = this.client.getChildren(lockPath);
// 把這些號進行排序
Collections.sort(children);
// 判斷當前節點是不是最小的,和第一個節點編號作對比
if (currentPath.get().equals(lockPath + "/" + children.get(0))) {
// 鎖重入計數
reenterCount.set(reenterCount.get() + 1);
System.out.println(Thread.currentThread().getName() + "-----得到分佈式鎖");
return true;
} else {
// 取到前一個
// 獲得字節的索引號
int curIndex = children.indexOf(currentPath.get().substring(lockPath.length() + 1));
String node = lockPath + "/" + children.get(curIndex - 1);
beforePath.set(node);
}
return false;
}
@Override
public void lock() {
if (!tryLock()) {
// 阻塞等待
waitForLock();
// 再次嘗試加鎖
lock();
}
}
private void waitForLock() {
final CountDownLatch cdl = new CountDownLatch(1);
// 註冊watcher
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println(Thread.currentThread().getName() + "-----監聽到節點被刪除,分佈式鎖被釋放");
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
client.subscribeDataChanges(this.beforePath.get(), listener);
// 怎麼讓本身阻塞
if (this.client.exists(this.beforePath.get())) {
try {
System.out.println(Thread.currentThread().getName() + "-----分佈式鎖沒搶到,進入阻塞狀態");
cdl.await();
System.out.println(Thread.currentThread().getName() + "-----釋放分佈式鎖,被喚醒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 醒來後,取消watcher
client.unsubscribeDataChanges(this.beforePath.get(), listener);
}
@Override
public void unlock() {
System.out.println(Thread.currentThread().getName() + "-----釋放分佈式鎖");
if(reenterCount.get() > 1) {
// 重入次數減1,釋放鎖
reenterCount.set(reenterCount.get() - 1);
return;
}
// 刪除節點
if(this.currentPath.get() != null) {
this.client.delete(this.currentPath.get());
this.currentPath.set(null);
this.reenterCount.set(0);
}
}
複製代碼
ps:不用擔憂內存佔滿的問題,JVM會進行垃圾回收
這裏對於curator就不作展開了,有興趣能夠本身去玩下
地址:curator.apache.org/curator-exa…
對於選舉leader,鎖locking,增刪改查的framework等都有實現
距離上一篇的更新彷佛隔了好一段1時間,也是由於上週比較忙抽不出空子來,以後仍是會進行周更(盡力)
下一篇:從零開始的高併發(三)--- Zookeeper集羣的leader選舉