上一篇博客《基於ZooKeeper與zkclient的統一配置管理實現(一)》分享了基於ZooKeeper原生api實現的統一配置管理,本篇文章將經過使用zkclient封裝後的api來再次實現該功能。java
實現的效果與上一篇文章相似,這裏再也不贅述。node
系統仍然是由四個組件組成:api
集羣或單機版的ZooKeeper服務端,主要用以存儲IConfigPublisher發佈的配置文件信息服務器
配置文件的發佈器,負責將配置文件信息發佈到ZooKeeperServer中去app
配置文件變動狀況的訂閱器,由客戶端開啓對服務器配置文件信息的訂閱,當配置信息發生變動時,負責將本地的信息更新成最新的狀態框架
配置文件更改器,通常由用戶手動調用,用來更改配置文件的信息異步
啓動ZooKeeper集羣的方法同上一篇博客,這裏再也不贅述,各位能夠自行異步《基於ZooKeeper與zkclient的統一配置管理實現(一)》查看。ide
其中IConfigPublisher和IConfigSubscriber是接口,看一下兩個接口的定義:測試
/** * Config files publisher * @author hwang * */ public interface IConfigPublisher { /** * publish config files under {@link configDir} to {@link configRootNode} of {@link zkServerHost} * @param zkServerHost * @param configRootNode * @param configDir */ public void publish(String zkServerHost,String configRootNode,String configDir); }
IConfigPublisher接口只有一個publish方法,主要的工做就是把configDir目錄下的配置文件發佈到zkServerHost的configRootNode節點中去。ui
/** * Subscribe Config files change * @author hwang * */ public interface IConfigSubscriber { /** * <p>Subscribe config files change event under rootNode {@link configRootNode} of {@link zkServerHost}</p> * <p>include the dataChange and childrenChange </p> * @param zkServerHost * @param configRootNode */ public void subscribe(String zkServerHost,String configRootNode); }
IConfigSubscriber接口也只有一個方法,主要的工做就是訂閱ZkServerHost中的configRootNode節點的變化狀況。
如今來看下這兩個主要的核心接口的實現狀況。
IConfigPublisher接口的實現類是ZkConfigPublisher,看一下ZkConfigPublisher是怎麼實現publish方法的:
public class ZkConfigPublisher implements IConfigPublisher{ private static final Log logger = LogFactory.getLog(ZkConfigSubscriber.class); private ZkClient client; private String configRootNode; @Override public void publish(String zkServerHost,String configRootNode,String configDir){ try{ if(client==null){ client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT); client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET)); } this.configRootNode = configRootNode; String rootNode = "/" + configRootNode; // 建立根節點 ZkClientNodeUtil.createNode(client, rootNode, configDir); // 掃描全部配置文件 this.scanConfigFiles(configDir,ZkConstant.ACCEPT_SUFFIX); }catch(Exception e){ logger.error("",e); } } /** * 掃描指定目錄下的全部配置文件,並將內容寫入到zookeeper節點中 * @param path 掃描的目錄 * @param acceptSuffix 接受的文件後綴 * @throws KeeperException * @throws InterruptedException * @throws IOException */ private void scanConfigFiles(String path,String acceptSuffix) throws KeeperException, InterruptedException, IOException{ File dir = new File(path); if(dir.exists() && dir.isDirectory()){ File[] subFiles = dir.listFiles(); for(File file : subFiles){ String absPath = file.getAbsolutePath(); String fileName = file.getName(); if(file.isDirectory() || (null!=acceptSuffix && !fileName.endsWith(acceptSuffix))){ this.scanConfigFiles(absPath,acceptSuffix); }else{ String parentDir = file.getParentFile().getAbsolutePath(); // 讀取文件內容 String fileContent = FileUtils.readFileToString(file,ZkConstant.CONF_CHAR_SET); // 建立目錄節點 ZkClientNodeUtil.createDirNode(client, configRootNode, parentDir); // 建立該目錄下的文件節點 ZkClientNodeUtil.createFileNode(client, configRootNode, parentDir, fileName, fileContent); } } } } }
實現方法很簡單,先建立了一個ZkClient對象,而後建立了一個根節點,最後掃描了指定目錄下的全部配置文件,並將符合要求的配置文件(及目錄)加入到ZooKeeperServer中去。
IConfigSubscriber接口的實現類是ZkConfigSubscriber,看一下ZkConfigSubscriber是怎麼實現subscribe方法的:
public class ZkConfigSubscriber implements IConfigSubscriber{ private static final Log logger = LogFactory.getLog(ZkConfigSubscriber.class); private ZkClient client; @Override public void subscribe(String zkServerHost, String configRootNode) { try{ if(client==null){ client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT); client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET)); } String rootNode = "/" + configRootNode; this.clearConfigDir(client,rootNode); this.subscribeRootNode(client, rootNode); // 等待配置信息變動 Thread.currentThread().join(); }catch(Exception e){ logger.error("",e); } } /** * 清空本地的配置文件目錄 * @param client * @param rootNode * @throws IOException * @throws InterruptedException * @throws KeeperException */ private void clearConfigDir(ZkClient client,String rootNode) throws IOException{ if(client.exists(rootNode)){ String configDir = client.readData(rootNode); FileUtils.deleteDirectory(new File(configDir)); logger.info("Delete config dir:"+configDir); } } /** * 訂閱根節點和遞歸訂閱全部子節點 * @param client * @param rootNodePath * @throws KeeperException * @throws InterruptedException * @throws IOException */ private void subscribeRootNode(ZkClient client,String rootNodePath) throws IOException{ if(client.exists(rootNodePath)){ logger.debug("subscribe node:"+rootNodePath); ZkConfigSubscriber.subscribePath(client, rootNodePath); List<String> subList = client.getChildren(rootNodePath); if(null!=subList && subList.size()>0){ // 將節點的全部子節點保存起來 NodeChildrenChangedWrapper.addChildren(rootNodePath, subList); } for (String subNode : subList) { this.subscribeSubNode(client,rootNodePath,subNode); } }else{ logger.warn("rootNode:"+rootNodePath+" does not exists!"); } } /** * 訂閱子節點 * @param client * @param currentNode * @param subNode * @throws KeeperException * @throws InterruptedException * @throws IOException */ private void subscribeSubNode(ZkClient client,String currentNode,String subNode) throws IOException{ String nodePath = currentNode+"/"+subNode; if(nodePath.startsWith("/")){ // 訂閱子節點 if(client.exists(nodePath)){ // sync content to client String content = client.readData(nodePath); OnetimeConfigSyncer.syncToClient(content); logger.debug("subscribe node:"+nodePath); ZkConfigSubscriber.subscribePath(client, nodePath); List<String> subList = client.getChildren(nodePath); if(null!=subList && subList.size()>0){ // 將節點的全部子節點保存起來 NodeChildrenChangedWrapper.addChildren(nodePath, subList); } for (String _subNode : subList) { this.subscribeSubNode(client,nodePath,_subNode); } }else{ logger.warn("subNode:"+nodePath+" does not exists!"); } } } }
subscribe方法的具體實現也很簡單,先是清空本地的配置文件目錄,而後訂閱根節點和遞歸訂閱全部子節點。訂閱的時候,會將每一個節點和該節點的子節點的狀況保存到Map中去,具體的緣由在上一篇博客中已經作過說明,這個再也不贅述。具體執行訂閱的方法是ZkConfigSubscriber類中的subscribePath()方法,來看下該方法的內容:
/** * Store the paths that already subscribed */ private static Set<String> subscribedPathSet = new CopyOnWriteArraySet<String>(); public static void subscribePath(ZkClient client,String path){ if(!subscribedPathSet.contains(path)){ subscribedPathSet.add(path); // Subscribe ChildChange and DataChange event at path client.subscribeChildChanges(path, new ChildrenChangeListener(client)); client.subscribeDataChanges(path, new DataChangeListener(client)); logger.info("Subscribe ChildChange and DataChange event at path:"+path); } } public static void unsubscribePath(ZkClient client,String path){ if(subscribedPathSet.contains(path)){ subscribedPathSet.remove(path); // Unsubscribe ChildChange and DataChange event at path client.unsubscribeChildChanges(path, new ChildrenChangeListener(client)); client.unsubscribeDataChanges(path, new DataChangeListener(client)); logger.info("Unsubscribe ChildChange and DataChange event at path:"+path); } }
主要是用一個CopyOnWriteArraySet存儲全部已經訂閱的節點的path,防止重複訂閱。
其中訂閱時使用了兩個Listener類,分別是ChildrenChangeListener和DataChangeListener。
先來看下ChildrenChangeListener的實現:
/** * ChildrenChangeListener * @author hwang * */ public static class ChildrenChangeListener implements IZkChildListener{ private static final Log logger = LogFactory.getLog(ChildrenChangeListener.class); private ZkClient client; public ChildrenChangeListener(ZkClient client){ this.client = client; } @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { if(currentChilds==null || currentChilds.isEmpty()){ logger.warn("No currentChilds get form parentPath:"+parentPath); return; } ChildrenChangeResult changeResult = NodeChildrenChangedWrapper.diff(parentPath, currentChilds); ChildrenChangeType changeType = changeResult.getChangeType(); List<String> changePath = changeResult.getChangePath(); if(changePath==null || changePath.isEmpty()){ logger.warn("No children changePath get form parentPath:"+parentPath); return; } switch(changeType){ case add:{ for(String subPath : changePath){ logger.info("Add children node,path:"+parentPath+"/"+subPath); String path = parentPath+"/"+subPath; RealtimeConfigSyncer.syncToClient(client,path); } }break; case delete:{ for(String subPath : changePath){ ZkConfigSubscriber.unsubscribePath(client, subPath); String filePath = subPath.replaceAll(ZkConstant.SEPRATOR, "/"); FileUtils.deleteQuietly(new File(filePath)); logger.info("Delete children node,file:"+filePath); } }break; case update:{ logger.info("Update children node,will do nothing"); }break; default:{ logger.info("Default children node operate,will do nothing"); }break; } } }
ZkClient會在NodeChildChanged事件發生時主動觸發IZkChildListener接口的handleChildChange方法。因此咱們只須要實現IZkChildListener接口的handleChildChange方法便可,而且同一個path只須要訂閱一次,zkclient會自動爲咱們對path進行續訂。
一樣的還有IZkDataListener接口,咱們只須要實現IZkDataListener接口的handleDataChange和handleDataDeleted方法便可,下面就是該接口的具體實現狀況:
/** * DataChangeListener * @author hwang */ public static class DataChangeListener implements IZkDataListener{ private static final Log logger = LogFactory.getLog(DataChangeListener.class); private ZkClient client; public DataChangeListener(ZkClient client){ this.client = client; } @Override public void handleDataChange(String dataPath, Object data) throws Exception { logger.info("handleDataChange event,dataPath:"+dataPath); RealtimeConfigSyncer.syncToClient(client,dataPath); } @Override public void handleDataDeleted(String dataPath) throws Exception { logger.info("handleDataDeleted event,dataPath:"+dataPath); ZkConfigSubscriber.unsubscribePath(client, dataPath); String filePath = dataPath.substring(dataPath.indexOf(ZkConstant.SEPRATOR)).replaceAll(ZkConstant.SEPRATOR, "/"); FileUtils.deleteQuietly(new File(filePath)); } }
須要注意的是,當出現新增或修改事件時,只須要將最新的配置文件的內容同步到本地便可,可是出現刪除事件時,除了須要刪除本地的相關配置文件,還須要將已經訂閱的事件取消掉,也就是須要執行ZkConfigSubscriber.unsubscribePath()方法。
實現完發佈器和訂閱器以後,最後的一個就是配置文件更改器了。更改器主要的工做就是用來修改ZooKeeperServer端的配置文件的內容,具體的實現以下:
/** * 服務端配置文件更改器 * @author hwang * */ public class ZkConfigChanger { private static final Log logger = LogFactory.getLog(ZkConfigChanger.class); private static ZkClient client; /** * 初始化zkclient */ public static void init(){ if(client==null){ try { client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT); client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET)); } catch (Exception e) { logger.error("",e); } } } /** * 新增目錄節點 * @param configRootNode * @param dirAbsolutePath 目錄的絕對路徑,該目錄必須是/config/開頭的目錄 * @throws KeeperException * @throws InterruptedException * @throws UnsupportedEncodingException */ public static boolean addConfigDir(String configRootNode,String dirAbsolutePath) throws KeeperException, InterruptedException, UnsupportedEncodingException{ if(null==client){ logger.warn("Not connected to ZooKeeper,will return"); return false; } if(StringUtils.isEmpty(dirAbsolutePath)){ logger.error("dirAbsolutePath can't be empty"); return false; } return ZkClientNodeUtil.createDirNode(client, configRootNode, dirAbsolutePath); } /** * 刪除目錄節點 * @param configRootNode * @param dirAbsolutePath * @throws InterruptedException * @throws KeeperException */ public static boolean deleteConfigDir(String configRootNode,String dirAbsolutePath) throws InterruptedException, KeeperException{ if(null==client){ logger.warn("Not connected to ZooKeeper,will return"); return false; } if(StringUtils.isEmpty(dirAbsolutePath)){ logger.error("dirAbsolutePath can't be empty"); return false; } return ZkClientNodeUtil.deleteDirNode(client, configRootNode, dirAbsolutePath); } /** * 新增文件節點 * @param configRootNode * @param fileAbsolutePath 文件的絕對路徑,不包括文件名 * @param fileName 文件名 * @param fileContent 文件內容 * @throws KeeperException * @throws InterruptedException * @throws UnsupportedEncodingException */ public static boolean addConfigFile(String configRootNode,String fileAbsolutePath,String fileName,String fileContent) throws KeeperException, InterruptedException, UnsupportedEncodingException{ if(null==client){ logger.warn("Not connected to ZooKeeper,will return"); return false; } if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName) || StringUtils.isEmpty(fileContent)){ logger.error("fileAbsolutePath,fileName,fileContent can't be empty"); return false; } return ZkClientNodeUtil.createFileNode(client, configRootNode, fileAbsolutePath, fileName, fileContent); } /** * 刪除文件節點 * @param configRootNode * @param fileAbsolutePath 文件的絕對路徑,不包括文件名 * @param fileName 文件名 * @throws InterruptedException * @throws KeeperException */ public static boolean deleteConfigFile(String configRootNode,String fileAbsolutePath,String fileName) throws InterruptedException, KeeperException{ if(null==client){ logger.warn("Not connected to ZooKeeper,will return"); return false; } if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName)){ logger.error("fileAbsolutePath,fileName can't be empty"); return false; } return ZkClientNodeUtil.deleteFileNode(client, configRootNode, fileAbsolutePath, fileName); } /** * 更新配置文件內容 * @param configRootNode * @param fileAbsolutePath * @param fileName * @param fileContent * @throws InterruptedException * @throws KeeperException * @throws UnsupportedEncodingException */ public static boolean updateConfigFile(String configRootNode,String fileAbsolutePath,String fileName,String fileContent) throws InterruptedException, KeeperException, UnsupportedEncodingException{ if(null==client){ logger.warn("Not connected to ZooKeeper,will return"); return false; } if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName) || StringUtils.isEmpty(fileContent)){ logger.error("fileAbsolutePath,fileName,fileContent can't be empty"); return false; } return ZkClientNodeUtil.updateFileNode(client, configRootNode, fileAbsolutePath, fileName, fileContent); } }
至此,經過ZkClient重構的統一配置管理框架就完成了。
通過實際測試,zkclient能夠完美解決上一篇博客中未解決的問題,這得益於zkclient大量正確的使用了retryUntilConnected方法。