上一篇的內容是補充了ZAB協議和分佈式隊列的一種實現,ZAB咱們談到了它的一個協議流程和在和follower失聯時的崩潰恢復,還有如何進行數據同步和丟棄事務。分佈式隊列的具體代碼實現中的結構,還有類中定義的每一個方法基本也都有說起了,相信你們也必定可以本身動手完成代碼的補充併成功運行。分佈式隊列的代碼邏輯以下圖,注意使用的虛實線和線的顏色都指代了不一樣的行爲。java
從零開始的高併發(一)--- Zookeeper的基礎概念node
從零開始的高併發(二)--- Zookeeper實現分佈式鎖服務器
從零開始的高併發(三)--- Zookeeper集羣的搭建和leader選舉markdown
從零開始的高併發(四)--- Zookeeper的經典應用場景數據結構
分佈式環境下,咱們的服務不少,配置確是共用一套的。處理起來會十分麻煩,配置中心能夠幫助咱們解決系統參數配置及參數的動態修改問題併發
運維管理人員把配置修改以後進行提交,把配置推送到配置中心,咱們分佈式應用下的每一個應用實例能夠經過對watch事件的監聽來獲取配置中心文件的變動,在不重啓服務的狀況下也能作到把應用中的一些屬性從內存中替換掉app
假設咱們如今擁有這麼一臺zookeeper服務器,咱們須要建立一個配置中心的根目錄distributeConfigure,注意圖中的server是指咱們集羣中的某項服務,server1-file1.cnf是指這個服務下的配置文件1,2,3···,server.port也屬於其中一個配置,從這一層開始對應zookeeper下的一個個節點,也就是說,咱們把每一項配置,好比剛剛的服務端口server.port,都看做是一個znode,而後一個一個往zookeeper中存過去便可。運維
此時咱們不關心服務是否掛掉或者怎樣,也不關心節點的順序(除非是服務功能中有特殊要求),還有就是,咱們的配置通常也會有該配置的名字,可是對於順序通常也是不要求的,因此咱們就會選用持久節點來記錄配置。分佈式
此時應用服務要作的事情就很簡單了,就是對這些個節點進行監控,只要節點存在變化,應用服務就把節點下的數據取過來便可。ide
前面提到的都是一個配置項對應一個znode,那咱們其實也能夠換一種想法,好比我不少個配置項都放在了同一個文件下,那我就換成,一個文件對應一個znode,把文件下的內容都放置在znode的value裏面
剛剛咱們也提到過了,運維管理人員把配置修改以後進行提交推送給配置中心,那咱們如今就得實現一個運維人員須要用到的接口出來,可以對配置文件進行讀寫操做。
public interface ConfigureWriter {
/**
* 建立一個新的配置文件
* @param fileName 文件名稱
* @param items 配置項
* @return 新文件的在zk上的路徑
*/
String createCnfFile(String fileName, Properties items);
/**
* 刪除一個配置文件
* @param fileName
*/
void deleteCnfFile(String fileName);
/**
* 修改一個配置文件
* @param fileName
* @param items
*/
void modifyCnfItem(String fileName, Properties items);
/**
* 加載配置文件
* @param fileName
* @return
*/
Properties loadCnfFile(String fileName);
複製代碼
}
/**
* 配置文件讀取器
* ConfigureReader
*/
public interface ConfigureReader {
/**
* 讀取配置文件
* @param fileName 配置文件名稱
* @param ChangeHandler 配置發生變化的處理器
* @return 若是存在文件配置,則返回Properties對象,不存在返回null
*/
Properties loadCnfFile(String fileName);
/**
* 監聽配置文件變化,此操做只須要調用一次。
* @param fileName
* @param changeHandler
*/
void watchCnfFile(String fileName, ChangeHandler changeHandler);
/**
* 配置文件變化處理器
* ChangeHandler
*/
interface ChangeHandler {
/**
* 配置文件發生變化後給一個完整的屬性對象
* @param newProp
*/
void itemChange(Properties newProp);
}
}
複製代碼
剛剛也提到了,運維人員須要使用到ConfigureWriter這個接口進行配置文件的讀寫操做,中途爲了確保zookeeper上不存在這個節點,先執行了一次writer.deleteCnfFile(fileName),使用了一個線程去讀配置文件
public class ConfigureTest {
public static void main(String[] args) {
// 模擬運維人員建立配置文件,引用ConfigureWriter接口操做
ConfigureWriter writer = new ZkConfigureCenter();
String fileName = "trade-application.properties";
writer.deleteCnfFile(fileName); // 測試,確保配置中心沒有這個問題
Properties items = new Properties();
items.put("abc.gc.a", "123");
items.put("abc.gc.b", "3456");
// 建立配置文件,內容爲 properties items的內容。
String znodePath = writer.createCnfFile(fileName, items);
System.out.println("new file: "+znodePath);
new Thread(()->{
readCnf();
}).start();
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3秒後修改文件內容,有新增、有刪除、有修改
items.put("abc.gc.a", "haha"); // 修改
items.put("abc.gc.c", "xx"); // 新增
items.remove("abc.gc.b"); // 刪除
writer.modifyCnfItem(fileName, items);
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 模擬應用程序加載配置文件,監聽配置文件的變化
*/
public static void readCnf() {
// 應用引用ConfigureReader接口進行操做
System.out.println("讀取並監聽配置文件");
ConfigureReader reader = new ZkConfigureCenter();
String fileName = "trade-application.properties";
Properties p = reader.loadCnfFile(fileName); // 讀取配置文件
System.out.println(p);
// 監聽配置文件
reader.watchCnfFile(fileName, new ChangeHandler() {
@Override
public void itemChange(Properties newProp) {
System.out.println("發現數據發生變化:"+ newProp);
}
});
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
複製代碼
其實就是把咱們的每個配置項都放入zookeeper上
首先,咱們創建了一個這樣的配置文件 /distributeConfigure/cnfFile/trade-application.properties,該文件下的內容是
客戶端這時也讀取到了這個配置文件的相關信息
而後咱們對這個配置文件進行了一些修改,在測試代碼的進程處已經寫得很是清楚作了哪些修改了,新增刪除修改3個操做都進行了一次
此時若是斷開鏈接,就會出現報錯
咱們能夠打開zkClient經過命令來檢查一下程序的可靠性,經過ls /path命令查看一下這些節點都是否正常以後,咱們來手動修改一下節點數據
此時事情還沒完,假設咱們一次性對幾百甚至上千個配置進行了修改,那豈不是一會兒會彈出幾百上千條通知嗎,因此咱們還要考慮一下請求合併,還有就是,咱們可能也不止一個運維人員,也有可能同時有好幾我的對同一個配置文件進行了修改,因此咱們也能夠考慮用鎖來鎖定這個配置文件,只讓一我的來進行修改,不過咱們要注意,由於zookeeper中對寫事務的提交是有原子性的,寫操做都會按順序來進行,不過咱們就會進行模擬,只容許一我的修改的狀況。
瞭解以上問題以後,咱們來講說上面代碼中未說起的配置中心的實現
public class ZkConfigureCenter implements ConfigureWriter, ConfigureReader {}
複製代碼
zkClient已是連着好幾篇文的老油條了,confRootPath是根目錄,confFilePath是配置文件目錄,fileLockPath是模擬鎖的目錄,由於咱們會針對一個文件使用一把鎖,那就確定不止使用一把,因此咱們就創建一個目錄來存放這些鎖。每當發起一次寫操做,那就增長一個節點當文件鎖。
private String confRootPath;
private String confFilePath;
private String fileLockPath;
private static final String default_confRootPath = "/distributeConfigure";
private ZkClient client;
複製代碼
public ZkConfigureCenter() {
this(default_confRootPath);
}
public ZkConfigureCenter(String path) {
if(path == null || path.trim().equals("")) {
throw new IllegalArgumentException("patch不能爲空字符串");
}
confRootPath = path;
confFilePath = confRootPath+"/cnfFile";
fileLockPath = confRootPath+"/writeLock";
client = new ZkClient("localhost:2181");
client.setZkSerializer(new MyZkSerializer());
if (!this.client.exists(confFilePath)) {
try {
this.client.createPersistent(confFilePath, true);
} catch (ZkNodeExistsException e) {
}
}
}
//簡單的參數檢查
private void checkElement(String v) {
if (v == null) throw new NullPointerException();
if("".equals(v.trim())) {
throw new IllegalArgumentException("不能使用空格");
}
if(v.startsWith(" ") || v.endsWith(" ")) {
throw new IllegalArgumentException("先後不能包含空格");
}
}
複製代碼
首先這配置文件總得有個fileName吧,items就是表明這個配置文件下的各項屬性,具體內容請看註釋,裏面使用到了咱們在 從零開始的高併發(二)--- Zookeeper實現分佈式鎖 中的ZkDistributeImproveLock.java,代碼位置在"使用zookeeper來進行開發"的 3 - ② zookeeper實現分佈式鎖方式二,若是想要跑一下前面的測試代碼的話,建議去ctrl+c/+v一下便可,記得要把我刪掉的不須要覆寫的方法補全
@Override
public String createCnfFile(String fileName, Properties items) {
checkElement(fileName);
// 建立配置文件Node
String cfgNode = confFilePath+"/"+fileName;
//若是配置文件已經存在,總不能把別人的給覆寫掉吧
if(client.exists(cfgNode)) {
throw new IllegalArgumentException("["+fileName+"]文件已存在!");
}
//沒問題了,建立持久節點
client.createPersistent(cfgNode, true);
// 建立配置文件中的配置項
if(items == null) {return cfgNode;}
//這裏咱們建立了帶上這個配置文件名字的一把分佈式鎖,不一樣的文件名就意味着不一樣的鎖
// ZkDistributeImproveLock的實現(參考"從零開始的高併發(二)--- Zookeeper實現分佈式鎖")
Lock distributeWriteLock = new ZkDistributeImproveLock(fileLockPath+"/"+fileName);
distributeWriteLock.lock();
try {
//如下就是對properties進行遍歷而後把屬性值一個個寫進去而已
//若是真的沒看懂這個,建議使用IDEA進行debug一下,
items.keySet().iterator();
Set<Map.Entry<Object, Object>> entrySet = items.entrySet();
for (Map.Entry<Object, Object> entry : entrySet) {
System.out.println(entry.getKey() + "=" + entry.getValue());
String cfgItemNode = cfgNode +"/"+ entry.getKey().toString();
client.createPersistent(cfgItemNode, entry.getValue());
}
} finally {
distributeWriteLock.unlock();
}
return cfgNode;
}
複製代碼
刪除和建立須要徵用同一把鎖,否則我建立的時候你就把個人給刪了這不太團結吧,deleteRecursive()方法是一個遞歸刪除方法,若是沒有獲取到鎖,會進行阻塞,也能夠在分佈式鎖實現中指定一個恢復時間,這個時間內沒有獲取到鎖,就把進程給結束掉,或者使用try,沒有獲取到,也就是有人在修改,那這時咱們給個返回值也好,拋出個異常也好,告訴該進程有人在修改便可
@Override
public void deleteCnfFile(String fileName) {
checkElement(fileName);
String cfgNode = confFilePath+"/"+fileName;
Lock distributeWriteLock = new ZkDistributeImproveLock(fileLockPath+"/"+fileName);
//獲取鎖
distributeWriteLock.lock();
try {
client.deleteRecursive(cfgNode);
} finally {
//釋放鎖
distributeWriteLock.unlock();
}
}
複製代碼
下面是deleteRecursive的源碼
它就會先從本身的子目錄,也就是children那裏開始找,而後遍歷刪除
這裏我是把提交過來的properties文件整個放入了znode裏面,提交過來的會默認爲最新的一份配置,主要思路就在於,先獲取原來的,而後再和如今新傳過來的進行比對。
由於若是咱們使用剛剛那個demo中先刪後增的方法,可能我100個配置我就只修改了一個配置,可是仍是把整個文件給從新弄一份,這樣會引起不少的監聽,因此try代碼塊裏面是先獲取到原來的配置信息,Set existentItemSet主要做用是去重,若是這個集合中包含了所修改的配置信息,就再判斷數據是否已經有變更,有變更的狀況下修改,而後把多餘的配置(這裏的多餘指的是沒有進行過對比處理的數據,由於修改或者說是不變都已是通過對比處理的)給刪除便可。若是這個集合中不包含我如今寫入的配置項,就要新增。
@Override
public void modifyCnfItem(String fileName, Properties items) {
checkElement(fileName);
// 獲取子節點信息
String cfgNode = confFilePath+"/"+fileName;
// 簡單粗暴的實現
if(items == null) {throw new NullPointerException("要修改的配置項不能爲空");}
items.keySet().iterator();
Set<Map.Entry<Object, Object>> entrySet = items.entrySet();
Lock distributeWriteLock = new ZkDistributeImproveLock(fileLockPath+"/"+fileName);
distributeWriteLock.lock();
try {
// 獲取zk中已存在的配置信息
List<String> itemNodes = client.getChildren(cfgNode);
Set<String> existentItemSet = itemNodes.stream().collect(Collectors.toSet());
for (Map.Entry<Object, Object> entry : entrySet) {
System.out.println(entry.getKey() + "=" + entry.getValue());
String itemName = entry.getKey().toString();
String itemData = entry.getValue().toString();
String cfgItemNode = cfgNode + "/" + itemName;
if(existentItemSet.contains(itemName)) {// zk中存在的配置項
String itemNodeData = client.readData(cfgItemNode);
if(! eql(itemNodeData, itemData)) { // 數據不一致才須要修改
client.writeData(cfgItemNode, itemData);
}
existentItemSet.remove(itemName); // 剩下的就是須要刪除的配置項
} else { // zk中不存在的配置項,新的配置項
client.createPersistent(cfgItemNode, itemData);
}
}
// existentItemSet中剩下的就是須要刪除的
if(!existentItemSet.isEmpty()) {
for(String itemName : existentItemSet) {
String cfgItemNode = cfgNode + "/" + itemName;
client.delete(cfgItemNode);
}
}
} finally {
distributeWriteLock.unlock();
}
}
複製代碼
比較簡單,沒啥好說
@Override
public Properties loadCnfFile(String fileName) {
if(! fileName.startsWith("/")) {
fileName = confFilePath+"/"+fileName;
}
return loadNodeCnfFile(fileName);
}
private Properties loadNodeCnfFile(String cfgNode) {
checkElement(cfgNode);
if(! client.exists(cfgNode)) {
throw new ZkNoNodeException(cfgNode);
}
// 獲取子節點信息
List<String> itemNodes = client.getChildren(cfgNode);
// 讀取配置信息,並裝載到Properties中
if(itemNodes == null || itemNodes.isEmpty()) {
return new Properties();
}
Properties file = new Properties();
itemNodes.stream().forEach((e)->{
String itemNameNode = cfgNode + "/" + e;
String data = client.readData(itemNameNode, true);
file.put(e, data);
});
return file;
}
複製代碼
這裏子節點的數據讀取(也就是我剛剛打開zookeeper的zkClient而後用命令來新增節點)會有個問題,當咱們須要新增節點的時候,咱們是不會觸發咱們DataChange監聽事件的,那是由於,我新增節點的時候,根本就尚未這個節點,在尚未這個節點的時候,是沒法監聽內容的變動的。因此咱們在仍是須要經過子節點的handleChildChange()來補救這個監聽,這就是爲何代碼最後須要用到client.subscribeChildChanges(···),此時監聽父節點的子節點變動,若是子節點有發生變化了,那就觸發事件便可,fileNodePath是父節點的路徑
triggerHandler請參考 ⑨ 的內容
@Override
public void watchCnfFile(String fileName, ChangeHandler changeHandler) {
if(! fileName.startsWith("/")) {
fileName = confFilePath+"/"+fileName;
}
final String fileNodePath = fileName;
// 讀取文件
Properties p = loadNodeCnfFile(fileNodePath);
if(p != null) {
// 合併5秒配置項變化,5秒內變化只觸發一次處理事件
int waitTime = 5;
final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(1);
scheduled.setRemoveOnCancelPolicy(true);
final List<ScheduledFuture<?>> futureList = new ArrayList<ScheduledFuture<?>>();
Set<Map.Entry<Object, Object>> entrySet = p.entrySet();
for (Map.Entry<Object, Object> entry : entrySet) {
System.out.println("監控:"+fileNodePath+"/"+entry.getKey().toString());
client.subscribeDataChanges(fileNodePath+"/"+entry.getKey().toString(), new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("觸發刪除:"+dataPath);
triggerHandler(futureList, scheduled, waitTime, fileNodePath, changeHandler);
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("觸發修改:"+dataPath);
triggerHandler(futureList, scheduled, waitTime, fileNodePath, changeHandler);
}
});
}
client.subscribeChildChanges(fileNodePath, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("觸發子節點:"+parentPath);
triggerHandler(futureList, scheduled, waitTime, fileNodePath, changeHandler);
}
});
}
}
複製代碼
在 ⑧ 那裏咱們已經看到了一個配置項的修改會觸發這麼多個監聽事件,這種作法不太可取,回到咱們的ConfigureReader中,咱們已經定義好的這麼一個接口,把配置發生變化的配置項在5秒(waitTime)內所進行的修改都合併成一個事件
/**
* 配置文件變化處理器
* ChangeHandler
*/
interface ChangeHandler {
/**
* 配置文件發生變化後給一個完整的屬性對象
* @param newProp
*/
void itemChange(Properties newProp);
}
複製代碼
再回到ZkConfigureCenter.java,比較方便的理解就是,咱們把咱們對提交上來的修改,根據時間劃分爲5秒一塊,此時在這5秒以內最後一個修改任務以前的future,若是仍未執行成功,會進行cancel()取消掉,而後remove掉,咱們只取這5秒內的最後一個事件做爲咱們監聽事件觸發的條件,因此與其說合並事件,不如就是單純認爲,咱們取了5秒內futureList的最後一個future。
/**
* 合併修改變化事件,5秒鐘內發生變化的合併到一個事件進行
* @param futureList 裝有定時觸發任務的列表
* @param scheduled 定時任務執行器
* @param waitTime 延遲時間,單位秒
* @param fileName zk配置文件的節點
* @param changeHandler 事件處理器
*/
private void triggerHandler(List<ScheduledFuture<?>> futureList, ScheduledThreadPoolExecutor scheduled, int waitTime, String fileName, ChangeHandler changeHandler) {
if(futureList != null && !futureList.isEmpty()) {
for(int i = 0 ; i < futureList.size(); i++) {
ScheduledFuture<?> future = futureList.get(i);
if(future != null && !future.isCancelled() && !future.isDone()) {
future.cancel(true);
futureList.remove(future);
i--;
}
}
}
ScheduledFuture<?> future = scheduled.schedule(()->{
Properties p = loadCnfFile(fileName);
changeHandler.itemChange(p);
}, waitTime, TimeUnit.SECONDS);
futureList.add(future);
}
複製代碼
至此配置中心的模擬實現就結束了。整個類的代碼都在 ① ~ ⑨ 中,能夠直接ctrl+c/+v使用
配置中心的知識總結其實就是下面4個知識點
持久節點+watch機制+分佈式鎖+事件合併
複製代碼
master和關於一些zookeeper官網的一些treasure的介紹擱置到下一篇···
next:從零開始的高併發(六)--- Zookeeper的經典應用場景3