基於ZooKeeper與zkclient的統一配置管理實現(二)

上一篇博客《基於ZooKeeper與zkclient的統一配置管理實現(一)》分享了基於ZooKeeper原生api實現的統一配置管理,本篇文章將經過使用zkclient封裝後的api來再次實現該功能。java

實現的效果與上一篇文章相似,這裏再也不贅述。node

系統的結構

系統仍然是由四個組件組成:api

  • ZooKeeperServer

        集羣或單機版的ZooKeeper服務端,主要用以存儲IConfigPublisher發佈的配置文件信息服務器

  • IConfigPublisher

        配置文件的發佈器,負責將配置文件信息發佈到ZooKeeperServer中去app

  • IConfigSubscriber

        配置文件變動狀況的訂閱器,由客戶端開啓對服務器配置文件信息的訂閱,當配置信息發生變動時,負責將本地的信息更新成最新的狀態框架

  • ZkConfigChanger

        配置文件更改器,通常由用戶手動調用,用來更改配置文件的信息異步

啓動ZooKeeper集羣的方法同上一篇博客,這裏再也不贅述,各位能夠自行異步《基於ZooKeeper與zkclient的統一配置管理實現(一)》查看。ide

其中IConfigPublisher和IConfigSubscriber是接口,看一下兩個接口的定義:測試

/**
 * Config files publisher
 * @author hwang
 *
 */
public interface IConfigPublisher {

	/**
	 * publish config files under {@link configDir} to {@link configRootNode} of {@link zkServerHost}
	 * @param zkServerHost
	 * @param configRootNode
	 * @param configDir
	 */
	public void publish(String zkServerHost,String configRootNode,String configDir);
}

IConfigPublisher接口只有一個publish方法,主要的工做就是把configDir目錄下的配置文件發佈到zkServerHost的configRootNode節點中去。ui

/**
 * Subscribe Config files change
 * @author hwang
 *
 */
public interface IConfigSubscriber {

	/**
	 * <p>Subscribe config files change event under rootNode {@link configRootNode} of {@link zkServerHost}</p>
	 * <p>include the dataChange and childrenChange </p>
	 * @param zkServerHost
	 * @param configRootNode
	 */
	public void subscribe(String zkServerHost,String configRootNode);
}

IConfigSubscriber接口也只有一個方法,主要的工做就是訂閱ZkServerHost中的configRootNode節點的變化狀況。

如今來看下這兩個主要的核心接口的實現狀況。

配置文件發佈器IConfigPublisher

IConfigPublisher接口的實現類是ZkConfigPublisher,看一下ZkConfigPublisher是怎麼實現publish方法的:

public class ZkConfigPublisher implements IConfigPublisher{

	private static final Log logger = LogFactory.getLog(ZkConfigSubscriber.class);
	
	private ZkClient client;
	
	private String configRootNode;
	
	@Override
	public void publish(String zkServerHost,String configRootNode,String configDir){
		try{
			if(client==null){
				client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT);
				client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET));
			}
			this.configRootNode = configRootNode;
			String rootNode = "/" + configRootNode;
			// 建立根節點
			ZkClientNodeUtil.createNode(client, rootNode, configDir);
			// 掃描全部配置文件
			this.scanConfigFiles(configDir,ZkConstant.ACCEPT_SUFFIX);
		}catch(Exception e){
			logger.error("",e);
		}
	}
	
	/**
     * 掃描指定目錄下的全部配置文件,並將內容寫入到zookeeper節點中
     * @param path	掃描的目錄
     * @param acceptSuffix 接受的文件後綴
     * @throws KeeperException
     * @throws InterruptedException
     * @throws IOException 
     */
    private void scanConfigFiles(String path,String acceptSuffix) throws KeeperException, InterruptedException, IOException{
    	File dir = new File(path);
        if(dir.exists() && dir.isDirectory()){
        	File[] subFiles = dir.listFiles();
        	for(File file : subFiles){
        		String absPath = file.getAbsolutePath();
        		String fileName = file.getName();
        		if(file.isDirectory() || (null!=acceptSuffix && !fileName.endsWith(acceptSuffix))){
        			this.scanConfigFiles(absPath,acceptSuffix);
        		}else{
        			String parentDir = file.getParentFile().getAbsolutePath();
        			// 讀取文件內容
        			String fileContent = FileUtils.readFileToString(file,ZkConstant.CONF_CHAR_SET);
        			// 建立目錄節點
        			ZkClientNodeUtil.createDirNode(client, configRootNode, parentDir);
        			// 建立該目錄下的文件節點
        			ZkClientNodeUtil.createFileNode(client, configRootNode, parentDir, fileName, fileContent);
        		}
        	}
        }
    }
    
}

實現方法很簡單,先建立了一個ZkClient對象,而後建立了一個根節點,最後掃描了指定目錄下的全部配置文件,並將符合要求的配置文件(及目錄)加入到ZooKeeperServer中去。

 

配置文件訂閱器IConfigSubscriber

IConfigSubscriber接口的實現類是ZkConfigSubscriber,看一下ZkConfigSubscriber是怎麼實現subscribe方法的:

public class ZkConfigSubscriber implements IConfigSubscriber{

	private static final Log logger = LogFactory.getLog(ZkConfigSubscriber.class);
	
	
	private ZkClient client;
	
	@Override
	public void subscribe(String zkServerHost, String configRootNode) {
		try{
			if(client==null){
				client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT);
				client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET));
			}
			String rootNode = "/" + configRootNode;
			this.clearConfigDir(client,rootNode);
			this.subscribeRootNode(client, rootNode);
			// 等待配置信息變動
			Thread.currentThread().join();
		}catch(Exception e){
			logger.error("",e);
		}
	}

	
	
	/**
     * 清空本地的配置文件目錄
     * @param client
     * @param rootNode
     * @throws IOException
     * @throws InterruptedException 
     * @throws KeeperException 
     */
	private void clearConfigDir(ZkClient client,String rootNode) throws IOException{
    	if(client.exists(rootNode)){
    		String configDir = client.readData(rootNode);
    		FileUtils.deleteDirectory(new File(configDir));
    		logger.info("Delete config dir:"+configDir);
    	}
    }
    
    
    /**
     * 訂閱根節點和遞歸訂閱全部子節點
     * @param client
     * @param rootNodePath
     * @throws KeeperException
     * @throws InterruptedException
     * @throws IOException 
     */
	private void subscribeRootNode(ZkClient client,String rootNodePath) throws IOException{
        if(client.exists(rootNodePath)){
        	logger.debug("subscribe node:"+rootNodePath);
        	ZkConfigSubscriber.subscribePath(client, rootNodePath);
        	List<String> subList = client.getChildren(rootNodePath);
        	if(null!=subList && subList.size()>0){
        		// 將節點的全部子節點保存起來
        		NodeChildrenChangedWrapper.addChildren(rootNodePath, subList);
        	}
        	for (String subNode : subList) { 
        		this.subscribeSubNode(client,rootNodePath,subNode);
        	}
        }else{
        	logger.warn("rootNode:"+rootNodePath+" does not exists!");
        }
    }
    
    /**
     * 訂閱子節點
     * @param client
     * @param currentNode
     * @param subNode
     * @throws KeeperException
     * @throws InterruptedException
     * @throws IOException 
     */
	private void subscribeSubNode(ZkClient client,String currentNode,String subNode) throws IOException{
    	String nodePath = currentNode+"/"+subNode;
    	if(nodePath.startsWith("/")){
    		// 訂閱子節點
    		if(client.exists(nodePath)){
    			// sync content to client
            	String content = client.readData(nodePath);
            	OnetimeConfigSyncer.syncToClient(content);
            	
    			logger.debug("subscribe node:"+nodePath);
    			ZkConfigSubscriber.subscribePath(client, nodePath);
            	
            	List<String> subList = client.getChildren(nodePath); 
            	if(null!=subList && subList.size()>0){
            		// 將節點的全部子節點保存起來
            		NodeChildrenChangedWrapper.addChildren(nodePath, subList);
            	}
            	for (String _subNode : subList) {
            		this.subscribeSubNode(client,nodePath,_subNode);
            	}
            }else{
            	logger.warn("subNode:"+nodePath+" does not exists!");
            }
    	}
    }

}

subscribe方法的具體實現也很簡單,先是清空本地的配置文件目錄,而後訂閱根節點和遞歸訂閱全部子節點。訂閱的時候,會將每一個節點和該節點的子節點的狀況保存到Map中去,具體的緣由在上一篇博客中已經作過說明,這個再也不贅述。具體執行訂閱的方法是ZkConfigSubscriber類中的subscribePath()方法,來看下該方法的內容:

/**
	 * Store the paths that already subscribed
	 */
	private static Set<String> subscribedPathSet = new CopyOnWriteArraySet<String>();
	
	public static void subscribePath(ZkClient client,String path){
		if(!subscribedPathSet.contains(path)){
			subscribedPathSet.add(path);
			// Subscribe ChildChange and DataChange event at path
        	client.subscribeChildChanges(path, new ChildrenChangeListener(client));
        	client.subscribeDataChanges(path, new DataChangeListener(client));
        	logger.info("Subscribe ChildChange and DataChange event at path:"+path);
		}
	}
	
	public static void unsubscribePath(ZkClient client,String path){
		if(subscribedPathSet.contains(path)){
			subscribedPathSet.remove(path);
			// Unsubscribe ChildChange and DataChange event at path
			client.unsubscribeChildChanges(path, new ChildrenChangeListener(client));
			client.unsubscribeDataChanges(path, new DataChangeListener(client));
			logger.info("Unsubscribe ChildChange and DataChange event at path:"+path);
		}
	}

主要是用一個CopyOnWriteArraySet存儲全部已經訂閱的節點的path,防止重複訂閱。

其中訂閱時使用了兩個Listener類,分別是ChildrenChangeListener和DataChangeListener。

ChildrenChangeListener

先來看下ChildrenChangeListener的實現:

/**
	 * ChildrenChangeListener 
	 * @author hwang
	 *
	 */
	public static class ChildrenChangeListener implements IZkChildListener{

		private static final Log logger = LogFactory.getLog(ChildrenChangeListener.class);
		
		private ZkClient client;
		
		public ChildrenChangeListener(ZkClient client){
			this.client = client;
		}
		
		@Override
		public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
			if(currentChilds==null || currentChilds.isEmpty()){
				logger.warn("No currentChilds get form parentPath:"+parentPath);
				return;
			}
			ChildrenChangeResult changeResult = NodeChildrenChangedWrapper.diff(parentPath, currentChilds);
	    	ChildrenChangeType changeType = changeResult.getChangeType();
	    	List<String> changePath = changeResult.getChangePath();
	    	if(changePath==null || changePath.isEmpty()){
	    		logger.warn("No children changePath get form parentPath:"+parentPath);
	    		return;
	    	}
	    	switch(changeType){
	        	case add:{
	        		for(String subPath : changePath){
	        			logger.info("Add children node,path:"+parentPath+"/"+subPath);
	        			String path = parentPath+"/"+subPath;
	        			RealtimeConfigSyncer.syncToClient(client,path);
	        		}
	        	}break;
	        	case delete:{
	        		for(String subPath : changePath){
	        			ZkConfigSubscriber.unsubscribePath(client, subPath);
	        			String filePath = subPath.replaceAll(ZkConstant.SEPRATOR, "/");
	    	        	FileUtils.deleteQuietly(new File(filePath));
	    	        	logger.info("Delete children node,file:"+filePath);
	        		}
	        	}break;
	        	case update:{
	        		logger.info("Update children node,will do nothing");
	        	}break;
	        	default:{
	        		logger.info("Default children node operate,will do nothing");
	        	}break;
	    	}
		}
	}

ZkClient會在NodeChildChanged事件發生時主動觸發IZkChildListener接口的handleChildChange方法。因此咱們只須要實現IZkChildListener接口的handleChildChange方法便可,而且同一個path只須要訂閱一次,zkclient會自動爲咱們對path進行續訂。

DataChangeListener

一樣的還有IZkDataListener接口,咱們只須要實現IZkDataListener接口的handleDataChange和handleDataDeleted方法便可,下面就是該接口的具體實現狀況:

/**
	 * DataChangeListener 
	 * @author hwang
	 */
	public static class DataChangeListener implements IZkDataListener{

		private static final Log logger = LogFactory.getLog(DataChangeListener.class);
		
		private ZkClient client;
		
		public DataChangeListener(ZkClient client){
			this.client = client;
		}
		@Override
		public void handleDataChange(String dataPath, Object data) throws Exception {
			logger.info("handleDataChange event,dataPath:"+dataPath);
			RealtimeConfigSyncer.syncToClient(client,dataPath);
		}

		@Override
		public void handleDataDeleted(String dataPath) throws Exception {
			logger.info("handleDataDeleted event,dataPath:"+dataPath);
			ZkConfigSubscriber.unsubscribePath(client, dataPath);
			String filePath = dataPath.substring(dataPath.indexOf(ZkConstant.SEPRATOR)).replaceAll(ZkConstant.SEPRATOR, "/");
	    	FileUtils.deleteQuietly(new File(filePath));
		}
	}

須要注意的是,當出現新增或修改事件時,只須要將最新的配置文件的內容同步到本地便可,可是出現刪除事件時,除了須要刪除本地的相關配置文件,還須要將已經訂閱的事件取消掉,也就是須要執行ZkConfigSubscriber.unsubscribePath()方法。

 

配置文件更改器ZkConfigChanger

實現完發佈器和訂閱器以後,最後的一個就是配置文件更改器了。更改器主要的工做就是用來修改ZooKeeperServer端的配置文件的內容,具體的實現以下:

/**
 * 服務端配置文件更改器
 * @author hwang
 *
 */
public class ZkConfigChanger {

	private static final Log logger = LogFactory.getLog(ZkConfigChanger.class);
	
	private static ZkClient client;
	
	/**
	 * 初始化zkclient
	 */
    public static void init(){
    	if(client==null){
    		try {
    			client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT);
    			client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET));
    		} catch (Exception e) {
    			logger.error("",e);
    		}	
    	}
    }
    
    
    /**
     * 新增目錄節點
     * @param configRootNode
     * @param dirAbsolutePath 目錄的絕對路徑,該目錄必須是/config/開頭的目錄
     * @throws KeeperException
     * @throws InterruptedException
     * @throws UnsupportedEncodingException
     */
    public static boolean addConfigDir(String configRootNode,String dirAbsolutePath) throws KeeperException, InterruptedException, UnsupportedEncodingException{
    	if(null==client){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return false;
    	}
    	if(StringUtils.isEmpty(dirAbsolutePath)){
    		logger.error("dirAbsolutePath can't be empty");
    		return false;
    	}
    	return ZkClientNodeUtil.createDirNode(client, configRootNode, dirAbsolutePath);
    }
  
    /**
     * 刪除目錄節點
     * @param configRootNode
     * @param dirAbsolutePath
     * @throws InterruptedException
     * @throws KeeperException
     */
    public static boolean deleteConfigDir(String configRootNode,String dirAbsolutePath) throws InterruptedException, KeeperException{
    	if(null==client){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return false;
    	}
    	if(StringUtils.isEmpty(dirAbsolutePath)){
    		logger.error("dirAbsolutePath can't be empty");
    		return false;
    	}
    	return ZkClientNodeUtil.deleteDirNode(client, configRootNode,  dirAbsolutePath);
    }
    

    
    /**
     * 新增文件節點
     * @param configRootNode
     * @param fileAbsolutePath 文件的絕對路徑,不包括文件名
     * @param fileName 文件名
     * @param fileContent 文件內容
     * @throws KeeperException
     * @throws InterruptedException
     * @throws UnsupportedEncodingException
     */
    public static boolean addConfigFile(String configRootNode,String fileAbsolutePath,String fileName,String fileContent) throws KeeperException, InterruptedException, UnsupportedEncodingException{
    	if(null==client){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return false;
    	}
    	if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName) || StringUtils.isEmpty(fileContent)){
    		logger.error("fileAbsolutePath,fileName,fileContent can't be empty");
    		return false;
    	}
    	return ZkClientNodeUtil.createFileNode(client, configRootNode,  fileAbsolutePath, fileName, fileContent);
    }
    
    /**
     * 刪除文件節點
     * @param configRootNode
     * @param fileAbsolutePath 文件的絕對路徑,不包括文件名
     * @param fileName 文件名
     * @throws InterruptedException
     * @throws KeeperException
     */
    public static boolean deleteConfigFile(String configRootNode,String fileAbsolutePath,String fileName) throws InterruptedException, KeeperException{
    	if(null==client){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return false;
    	}
    	if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName)){
    		logger.error("fileAbsolutePath,fileName can't be empty");
    		return false;
    	}
    	return ZkClientNodeUtil.deleteFileNode(client, configRootNode,  fileAbsolutePath, fileName);
    }
    

    /**
     * 更新配置文件內容
     * @param configRootNode
     * @param fileAbsolutePath
     * @param fileName
     * @param fileContent
     * @throws InterruptedException
     * @throws KeeperException
     * @throws UnsupportedEncodingException
     */
    public static boolean updateConfigFile(String configRootNode,String fileAbsolutePath,String fileName,String fileContent) throws InterruptedException, KeeperException, UnsupportedEncodingException{
    	if(null==client){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return false;
    	}
    	if(StringUtils.isEmpty(fileAbsolutePath)  || StringUtils.isEmpty(fileName)  || StringUtils.isEmpty(fileContent)){
    		logger.error("fileAbsolutePath,fileName,fileContent can't be empty");
    		return false;
    	}
		return ZkClientNodeUtil.updateFileNode(client, configRootNode,  fileAbsolutePath, fileName, fileContent);
    }
    
    
    
}

至此,經過ZkClient重構的統一配置管理框架就完成了。

通過實際測試,zkclient能夠完美解決上一篇博客中未解決的問題,這得益於zkclient大量正確的使用了retryUntilConnected方法。

相關文章
相關標籤/搜索