Unified Configuration Management Based on ZooKeeper (Part 1)

First, let's look at what the configuration management implementation does in practice.

Server startup

Jan 12, 2017 4:18:43 PM org.apache.zookeeper.server.persistence.FileTxnLog append
INFO: Creating new log file: log.400000001
Jan 12, 2017 4:18:43 PM org.apache.zookeeper.server.persistence.FileTxnLog append
INFO: Creating new log file: log.400000001
Jan 12, 2017 4:18:43 PM org.apache.zookeeper.server.ZooKeeperServer finishSessionInit
INFO: Established session 0x25991c1eda50000 with negotiated timeout 5000 for client /10.1.126.104:33941
Jan 12, 2017 4:18:43 PM org.apache.zookeeper.ClientCnxn$SendThread onConnected
INFO: Session establishment complete on server 10.1.126.104/10.1.126.104:2182, sessionid = 0x25991c1eda50000, negotiated timeout = 5000
2017-01-12 16:18:43[INFO] - ServerConfigPublisher Connected to ZooKeeper Cluster:10.1.126.104:2181,10.1.126.104:2182,10.1.126.104:2183
[com.zkconfig.config.server.ServerConfigPublisher.connectZkServer(ServerConfigPublisher.java:75)]
2017-01-12 16:18:43[WARN] - Node:/config alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createNode(ZkUtil.java:69)]
2017-01-12 16:18:43[WARN] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_other alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createDirNode(ZkUtil.java:120)]
2017-01-12 16:18:43[WARN] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_other/Backchina_SearchEngineConfig.xml alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createFileNode(ZkUtil.java:231)]
2017-01-12 16:18:43[WARN] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_baidu alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createDirNode(ZkUtil.java:120)]
2017-01-12 16:18:43[WARN] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_baidu/BaiduTieba_SearchEngineConfig.xml alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createFileNode(ZkUtil.java:231)]
2017-01-12 16:18:43[WARN] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_baidu alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createDirNode(ZkUtil.java:120)]
2017-01-12 16:18:43[WARN] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_baidu/Baidu_SearchEngineConfig.xml alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createFileNode(ZkUtil.java:231)]
2017-01-12 16:18:43[WARN] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_tianya alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createDirNode(ZkUtil.java:120)]
2017-01-12 16:18:43[WARN] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_tianya/Tianya_SearchEngineConfig.xml alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createFileNode(ZkUtil.java:231)]
Jan 12, 2017 4:18:48 PM org.apache.zookeeper.server.ZooKeeperServer expire
INFO: Expiring session 0x359871022840000, timeout of 5000ms exceeded
Jan 12, 2017 4:18:48 PM org.apache.zookeeper.server.ZooKeeperServer expire
INFO: Expiring session 0x359871022840001, timeout of 5000ms exceeded
Jan 12, 2017 4:18:48 PM org.apache.zookeeper.server.PrepRequestProcessor pRequest2Txn
INFO: Processed session termination for sessionid: 0x359871022840000
Jan 12, 2017 4:18:48 PM org.apache.zookeeper.server.PrepRequestProcessor pRequest2Txn
INFO: Processed session termination for sessionid: 0x359871022840001
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.server.NIOServerCnxnFactory run
INFO: Accepted socket connection from /10.1.126.102:38841
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.server.ZooKeeperServer processConnectRequest
INFO: Client attempting to establish new session at /10.1.126.102:38841
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.server.ZooKeeperServer finishSessionInit
INFO: Established session 0x15991c1eda50000 with negotiated timeout 5000 for client /10.1.126.102:38841

Client startup

Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:zookeeper.version=3.4.9-1757313, built on 08/23/2016 07:39 GMT
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:host.name=test-102
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:java.version=1.7.0_06
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:java.vendor=Oracle Corporation
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:java.home=/usr/java/jdk1.7.0_06/jre
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:java.class.path=ZkConfigClientMain-1.0.jar
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:java.library.path=/opt/FUDE-0.4/fude/lib:/opt/FUDE-0.4/utils/lib:/opt/FUDE-0.4/log/lib:/opt/FUDE-0.4/netsnmp/lib:/opt/FUDE-0.4/omni/lib:/opt/FUDE-0.4/python/lib:/opt/FUDE-0.4/boost/lib:/opt/FUDE-0.4/cppunit/lib:/opt/FUDE-0.4/TAO/ACE_wrappers/lib::/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:java.io.tmpdir=/tmp
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:java.compiler=<NA>
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:os.name=Linux
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:os.arch=amd64
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:os.version=2.6.18-92.el5
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:user.name=root
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:user.home=/root
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:user.dir=/home/NetPF/bin/zk
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.ZooKeeper <init>
INFO: Initiating client connection, connectString=10.1.126.104:2181,10.1.126.104:2182,10.1.126.104:2183 sessionTimeout=5000 watcher=com.zkconfig.config.client.ConfigChangeListener$ConnectedWatcher@1786b2ca
2017-01-12 16:19:00[INFO] - ZooKeeper State:CONNECTING
[com.zkconfig.config.client.ConfigChangeListener.waitUntilConnected(ConfigChangeListener.java:58)]
2017-01-12 16:19:00[INFO] - ConnectedLatch.await
[com.zkconfig.config.client.ConfigChangeListener.waitUntilConnected(ConfigChangeListener.java:61)]
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.ClientCnxn$SendThread logStartConnect
INFO: Opening socket connection to server 10.1.126.104/10.1.126.104:2181. Will not attempt to authenticate using SASL (unknown error)
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.ClientCnxn$SendThread primeConnection
INFO: Socket connection established to 10.1.126.104/10.1.126.104:2181, initiating session
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.ClientCnxn$SendThread onConnected
INFO: Session establishment complete on server 10.1.126.104/10.1.126.104:2181, sessionid = 0x15991c1eda50000, negotiated timeout = 5000
2017-01-12 16:19:00[INFO] - ConfigChangeListener Connected to ZooKeeper Cluster:10.1.126.104:2181,10.1.126.104:2182,10.1.126.104:2183
[com.zkconfig.config.client.ConfigChangeListener.start(ConfigChangeListener.java:78)]
2017-01-12 16:19:01[INFO] - Delete config dir:/home/NetPF/bin/zk/zkconfig
[com.zkconfig.config.client.ClientConfigUpdater.clearConfigDir(ClientConfigUpdater.java:45)]
2017-01-12 16:19:01[INFO] - Mkdir:/home/NetPF/bin/zk/zkconfig/other
[com.zkconfig.config.client.ClientConfigUpdater.syncNode(ClientConfigUpdater.java:277)]
2017-01-12 16:19:01[INFO] - Write file:/home/NetPF/bin/zk/zkconfig/other/Backchina_SearchEngineConfig.xml
[com.zkconfig.config.client.ClientConfigUpdater.syncNode(ClientConfigUpdater.java:283)]
2017-01-12 16:19:01[INFO] - Mkdir:/home/NetPF/bin/zk/zkconfig/baidu
[com.zkconfig.config.client.ClientConfigUpdater.syncNode(ClientConfigUpdater.java:277)]
2017-01-12 16:19:01[INFO] - Write file:/home/NetPF/bin/zk/zkconfig/baidu/BaiduTieba_SearchEngineConfig.xml
[com.zkconfig.config.client.ClientConfigUpdater.syncNode(ClientConfigUpdater.java:283)]
2017-01-12 16:19:01[INFO] - Write file:/home/NetPF/bin/zk/zkconfig/baidu/Baidu_SearchEngineConfig.xml
[com.zkconfig.config.client.ClientConfigUpdater.syncNode(ClientConfigUpdater.java:283)]
2017-01-12 16:19:01[INFO] - Mkdir:/home/NetPF/bin/zk/zkconfig/tianya
[com.zkconfig.config.client.ClientConfigUpdater.syncNode(ClientConfigUpdater.java:277)]
2017-01-12 16:19:01[INFO] - Write file:/home/NetPF/bin/zk/zkconfig/tianya/Tianya_SearchEngineConfig.xml
[com.zkconfig.config.client.ClientConfigUpdater.syncNode(ClientConfigUpdater.java:283)]

Updating the configuration files on the server side

一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:zookeeper.version=3.4.9-1757313, built on 08/23/2016 07:39 GMT
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:host.name=hwang-PC
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:java.version=1.7.0_76
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:java.vendor=Oracle Corporation
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:java.home=C:\Program Files (x86)\Java\jdk1.7.0_76\jre
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:java.class.path=E:\workspace\zk\bin;E:\workspace\zk\lib\log4j-1.2.11.jar;E:\workspace\zk\lib\slf4j-api-1.6.1.jar;E:\workspace\zk\lib\slf4j-jdk14-1.6.1.jar;E:\workspace\zk\lib\jline-0.9.94.jar;E:\workspace\zk\lib\netty-3.10.5.Final.jar;E:\workspace\zk\lib\junit-4.8.2.jar;E:\workspace\zk\lib\mockito-all-1.8.4.jar;E:\workspace\zk\lib\commons-lang-2.6.jar;E:\workspace\zk\lib\commons-io-2.1.jar;E:\workspace\zk\lib\commons-logging-1.0.3.jar
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:java.library.path=C:\Program Files (x86)\Java\jdk1.7.0_76\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:/Program Files (x86)/Java/jre7/bin/client;C:/Program Files (x86)/Java/jre7/bin;C:/Program Files (x86)/Java/jre7/lib/i386;C:\Program Files (x86)\Common Files\NetSarang;C:\Program Files (x86)\Intel\iCLS Client\;C:\Program Files\Intel\iCLS Client\;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Program Files\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files\Intel\Intel(R) Management Engine Components\IPT;C:\Program Files (x86)\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files (x86)\Intel\Intel(R) Management Engine Components\IPT;C:\Program Files (x86)\ATI Technologies\ATI.ACE\Core-Static;C:\Program Files\Intel\WiFi\bin\;C:\Program Files\Common Files\Intel\WirelessCommon\;C:\Program Files (x86)\Common Files\Lenovo;C:\ProgramData\Lenovo\ReadyApps;C:\Program Files (x86)\Java\jdk1.7.0_76\bin;E:\eclipse;;.
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:java.io.tmpdir=C:\Users\hwang\AppData\Local\Temp\
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:java.compiler=<NA>
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:os.name=Windows 7
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:os.arch=x86
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:os.version=6.1
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:user.name=hwang
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:user.home=C:\Users\hwang
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:user.dir=E:\workspace\zk
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.ZooKeeper <init>
信息: Initiating client connection, connectString=10.1.126.104:2181,10.1.126.104:2182,10.1.126.104:2183 sessionTimeout=5000 watcher=com.zkconfig.config.changer.ServerConfigChanger$ConnectedWatcher@149e631
2017-01-12 16:21:43[INFO] - ZooKeeper State:CONNECTING
[com.zkconfig.config.changer.ServerConfigChanger.waitUntilConnected(ServerConfigChanger.java:58)]
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.ClientCnxn$SendThread logStartConnect
信息: Opening socket connection to server 10.1.126.104/10.1.126.104:2182. Will not attempt to authenticate using SASL (unknown error)
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.ClientCnxn$SendThread primeConnection
信息: Socket connection established to 10.1.126.104/10.1.126.104:2182, initiating session
2017-01-12 16:21:43[INFO] - ConnectedLatch.await
[com.zkconfig.config.changer.ServerConfigChanger.waitUntilConnected(ServerConfigChanger.java:61)]
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.ClientCnxn$SendThread onConnected
信息: Session establishment complete on server 10.1.126.104/10.1.126.104:2182, sessionid = 0x25991c1eda50001, negotiated timeout = 5000
2017-01-12 16:21:43[INFO] - ServerConfigChanger Connected to ZooKeeper Cluster:10.1.126.104:2181,10.1.126.104:2182,10.1.126.104:2183
[com.zkconfig.config.changer.ServerConfigChanger.connectZkServer(ServerConfigChanger.java:77)]
2017-01-12 16:21:44[INFO] - Create dirNode:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test,dirPath:/home/NetPF/bin/zk/zkconfig/test
[com.zkconfig.util.ZkUtil.createDirNode(ZkUtil.java:118)]
2017-01-12 16:21:46[INFO] - Create fileNode:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test/Test2_SearchEngineConfig.xml,filePath:/home/NetPF/bin/zk/zkconfig/test,fileName:Test2_SearchEngineConfig.xml
[com.zkconfig.util.ZkUtil.createFileNode(ZkUtil.java:229)]
2017-01-12 16:21:46[INFO] - Update fileNode:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_tianya/Tianya_SearchEngineConfig.xml,filePath:/home/NetPF/bin/zk/zkconfig/tianya,fileName:Tianya_SearchEngineConfig.xml
[com.zkconfig.util.ZkUtil.updateFileNode(ZkUtil.java:299)]

The client updates its configuration automatically

2017-01-12 16:21:47[INFO] - EventType:NodeChildrenChanged,Value:4,path:/config
[com.zkconfig.config.client.ClientConfigUpdater.updateConfigFiles(ClientConfigUpdater.java:60)]
2017-01-12 16:21:47[INFO] - Catch [NodeChildrenChanged] event
[com.zkconfig.config.client.ClientConfigUpdater.updateConfigFiles(ClientConfigUpdater.java:71)]
2017-01-12 16:21:47[INFO] - Node:/config
oriChildren:_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_other,_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_baidu,_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_tianya,
curChildren:_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_other,_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_baidu,_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test,_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_tianya,
[com.zkconfig.config.client.ChildrenChangedWrapper.diff(ChildrenChangedWrapper.java:171)]
2017-01-12 16:21:47[INFO] - Add children node,path:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test
[com.zkconfig.config.client.ClientConfigUpdater.updateConfigFiles(ClientConfigUpdater.java:84)]
2017-01-12 16:21:47[INFO] - Relistening node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test
[com.zkconfig.config.client.ClientConfigUpdater.nodeHandle(ClientConfigUpdater.java:122)]
2017-01-12 16:21:47[INFO] - nodeType:1,nodeOperateType:1
[com.zkconfig.config.client.ClientConfigUpdater.nodeHandle(ClientConfigUpdater.java:130)]
2017-01-12 16:21:49[INFO] - EventType:NodeChildrenChanged,Value:4,path:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test
[com.zkconfig.config.client.ClientConfigUpdater.updateConfigFiles(ClientConfigUpdater.java:60)]
2017-01-12 16:21:49[INFO] - Catch [NodeChildrenChanged] event
[com.zkconfig.config.client.ClientConfigUpdater.updateConfigFiles(ClientConfigUpdater.java:71)]
2017-01-12 16:21:49[INFO] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test has no children.All currentChildrenPath are new.
[com.zkconfig.config.client.ChildrenChangedWrapper.diff(ChildrenChangedWrapper.java:159)]
2017-01-12 16:21:49[INFO] - Add children node,path:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test/Test2_SearchEngineConfig.xml
[com.zkconfig.config.client.ClientConfigUpdater.updateConfigFiles(ClientConfigUpdater.java:84)]
2017-01-12 16:21:49[INFO] - Relistening node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test/Test2_SearchEngineConfig.xml
[com.zkconfig.config.client.ClientConfigUpdater.nodeHandle(ClientConfigUpdater.java:122)]
2017-01-12 16:21:49[INFO] - nodeType:2,nodeOperateType:1
[com.zkconfig.config.client.ClientConfigUpdater.nodeHandle(ClientConfigUpdater.java:130)]
2017-01-12 16:21:49[INFO] - EventType:NodeDataChanged,Value:3,path:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_tianya/Tianya_SearchEngineConfig.xml
[com.zkconfig.config.client.ClientConfigUpdater.updateConfigFiles(ClientConfigUpdater.java:60)]
2017-01-12 16:21:49[INFO] - Relistening node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_tianya/Tianya_SearchEngineConfig.xml
[com.zkconfig.config.client.ClientConfigUpdater.nodeHandle(ClientConfigUpdater.java:122)]
2017-01-12 16:21:49[INFO] - nodeType:2,nodeOperateType:2
[com.zkconfig.config.client.ClientConfigUpdater.nodeHandle(ClientConfigUpdater.java:130)]

 

System structure

The system consists of four main components:

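  • ZooKeeperServer

         A clustered or standalone ZooKeeper server, used mainly to store the configuration file information published by ServerConfigPublisher

  • ServerConfigPublisher

         Publishes the configuration files that need unified management to ZooKeeperServer

  • ConfigChangeListener

        Registers watches with ZooKeeperServer and updates the client's local configuration files in real time when the configuration changes

  • ServerConfigChanger

        Updates the configuration information held on ZooKeeperServer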

 

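The components above are packaged into two jars: ZkConfigServerMain-1.0.jar and ZkConfigClientMain-1.0.jar.

The entry point of ZkConfigServerMain-1.0.jar is the ConfigServerMain class.

The entry point of ZkConfigClientMain-1.0.jar is the ConfigClientMain class.

Now let's walk through the code of each component in detail.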

 

Starting the ZooKeeper server and publishing the configuration files

Run the main method of the ConfigServerMain class. It executes as follows:

public static void main(String[] args) throws Exception {
    // Start the ZooKeeper server cluster
    ConfigServerMain.startZkCluster();

    // Start the configuration file publisher
    ServerConfigPublisher publisher = new ServerConfigPublisher(Constant.CONF_DIR);
    publisher.connectZkServer();
    publisher.publish();
}

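1. Start the ZooKeeper server cluster.

2. Create a configuration file publisher and connect it to ZooKeeperServer.

3. Scan the server-side configuration files and publish them to ZooKeeperServer.

Note that ZooKeeper can be started in cluster mode or in standalone mode. How the ZooKeeper server starts up internally is not the focus of this article; interested readers can explore it on their own.

The code that starts the cluster is as follows: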

/**
 * Start the zk server cluster
 * @throws IOException
 * @throws InterruptedException
 * @throws ConfigException
 */
public static void startZkCluster() throws IOException, InterruptedException, ConfigException{
    String zkConfigPath1 = Constant.CLUSTER_PATH +"/1/zk1.cfg";
    // Start ZooKeeper server 1
    ZkServer server1 = new ZkServer();
    server1.startCluster(zkConfigPath1);

    String zkConfigPath2 = Constant.CLUSTER_PATH +"/2/zk2.cfg";
    // Start ZooKeeper server 2
    ZkServer server2 = new ZkServer();
    server2.startCluster(zkConfigPath2);

    String zkConfigPath3 = Constant.CLUSTER_PATH +"/3/zk3.cfg";
    // Start ZooKeeper server 3
    ZkServer server3 = new ZkServer();
    server3.startCluster(zkConfigPath3);
}

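Three ZkServer objects are created, and startCluster is called on each to start a ZooKeeperServer. The method takes the path of the configuration file that the ZooKeeperServer needs at startup.

Let's look at the startCluster method of ZkServer: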

/**
	 * Start in cluster mode
	 * @param zkConfigPath
	 * @throws IOException
	 * @throws InterruptedException
	 * @throws ConfigException
	 */
	public void startCluster(String zkConfigPath) throws IOException, InterruptedException, ConfigException {  
        QuorumPeerConfig config = new QuorumPeerConfig();
        // Read the settings from the configuration file
        config.parse(zkConfigPath);
        
        ServerCnxnFactory cnxnFactory = new NIOServerCnxnFactory();  
        cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());  
        
        QuorumPeer quorumPeer = new QuorumPeer();
        quorumPeer.setClientPortAddress(config.getClientPortAddress());
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                    new File(config.getDataLogDir()),
                    new File(config.getDataDir())));
        quorumPeer.setQuorumPeers(config.getServers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        
        quorumPeer.start();
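        // Left commented out: with the whole cluster started from one process, join() would block here and the remaining servers would never start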
//        quorumPeer.join();
        logger.info("ZkServerCluster Started! ClientPortAddress:"+config.getClientPortAddress());
    }

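Note the commented-out line at the end of this method. This framework runs ZooKeeper as a pseudo-cluster: all three ZooKeeperServer instances are deployed on the same machine and, as startZkCluster shows, are started one after another from the same thread. quorumPeer.join() blocks the calling thread until that peer exits, so leaving it in would keep server2 and server3 from ever starting. It is therefore commented out.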
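The effect of dropping join() can be reproduced without ZooKeeper. The following is a minimal sketch using only the JDK; FakePeer and PseudoClusterDemo are hypothetical names standing in for a long-running server thread such as QuorumPeer, and are not part of the project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a long-running server thread (not the real QuorumPeer).
class FakePeer extends Thread {
    private final CountDownLatch stop;

    FakePeer(String name, CountDownLatch stop) {
        super(name);
        this.stop = stop;
        setDaemon(true); // do not keep the JVM alive in this demo
    }

    @Override
    public void run() {
        try {
            stop.await(); // "serve" until asked to stop
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class PseudoClusterDemo {
    // Starts every peer without joining, so the loop reaches peers 2 and 3.
    static List<FakePeer> startAll(int count, CountDownLatch stop) {
        List<FakePeer> peers = new ArrayList<FakePeer>();
        for (int i = 1; i <= count; i++) {
            FakePeer peer = new FakePeer("peer-" + i, stop);
            peer.start();   // returns immediately
            // peer.join(); // would block here until peer 1 exits
            peers.add(peer);
        }
        return peers;
    }

    // Releases the peers and waits for them; joining is safe once all are running.
    static boolean stopAll(List<FakePeer> peers, CountDownLatch stop) {
        stop.countDown();
        try {
            for (FakePeer p : peers) {
                p.join(1000);
                if (p.isAlive()) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

If the peers were started from separate processes, or each from its own thread, the original start-then-join sequence would work unchanged; the restriction comes from sharing one starting thread.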

 

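Once the ZooKeeperServer cluster is up, the configuration files can be published. A ServerConfigPublisher object is created; it first connects to ZooKeeper and then publishes the configuration files to it. The code is as follows:

  • First create the publisher object and connect to ZooKeeper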
private ZooKeeper zk;
	
	// Path of the configuration files
	private String configPath;
    
	public ServerConfigPublisher(String configPath){
		this.configPath = configPath;
	}
	
	private CountDownLatch connectedLatch = new CountDownLatch(1);
	
	/**
	 * Default watcher
	 *
	 */
	class ConnectedWatcher implements Watcher {  
        private CountDownLatch connectedLatch;  
        ConnectedWatcher(CountDownLatch connectedLatch) {  
            this.connectedLatch = connectedLatch;  
        }  
        @Override  
        public void process(WatchedEvent event) {  
           if (event.getState() == KeeperState.SyncConnected) {  
               connectedLatch.countDown();  
           }  
           logger.debug("ServerConfigPublisher get ["+event.getType()+"] notice by ConnectedWatcher");
        }  
    }  
	
	private ConnectedWatcher watcher = new ConnectedWatcher(connectedLatch);
	
	protected void waitUntilConnected(ZooKeeper zooKeeper, CountDownLatch connectedLatch) {  
		logger.info("ZooKeeper State:"+zooKeeper.getState());
        if (States.CONNECTING == zooKeeper.getState()) {  
            try {  
            	logger.info("ConnectedLatch.await...");
                connectedLatch.await();  
            } catch (InterruptedException e) {  
                throw new IllegalStateException(e);  
            }  
        }  
    }  
	
    /** 
     * Connect to ZooKeeper 
     */  
    public void connectZkServer() throws Exception {  
    	zk = new ZooKeeper(Constant.ZK_CLUSTER_HOSTS, 5000, watcher);  
    	// Wait until the connection is established
    	waitUntilConnected(zk, connectedLatch);
    	logger.info("ServerConfigPublisher Connected to ZooKeeper Cluster:"+Constant.ZK_CLUSTER_HOSTS);
    }
  • Publish the configuration files to ZooKeeperServer
/** 
     * Read the configuration files, parse them, and add them to znodes 
     * @throws KeeperException 
     * @throws IOException 
     */  
    public void publish() throws InterruptedException, KeeperException, IOException {  
        // Path of the configuration root node
    	String rootNode = "/" + Constant.CONF_ROOT_NODE;
    	// Create the root node
    	ZkUtil.createNode(zk, rootNode, Constant.CONF_DIR);
    	// Scan all configuration files
    	scanConfigFiles(configPath,Constant.ACCEPT_SUFFIX);
    }  
    
    /**
     * Scan all configuration files under the given directory and write their contents to ZooKeeper nodes
     * @param path	directory to scan
     * @param acceptSuffix accepted file suffix; null accepts every file
     * @throws KeeperException
     * @throws InterruptedException
     * @throws IOException 
     */
    protected void scanConfigFiles(String path,String acceptSuffix) throws KeeperException, InterruptedException, IOException{
    	File dir = new File(path);
    	File[] subFiles = dir.listFiles();
    	// listFiles() returns null when path does not exist or is not a directory
    	if(null==subFiles){
    		return;
    	}
    	for(File file : subFiles){
    		String fileName = file.getName();
    		if(file.isDirectory()){
    			// Recurse into subdirectories
    			scanConfigFiles(file.getAbsolutePath(),acceptSuffix);
    		}else if(null!=acceptSuffix && !fileName.endsWith(acceptSuffix)){
    			// Skip files whose suffix is not accepted
    			continue;
    		}else{
    			String parentDir = file.getParentFile().getAbsolutePath();
    			// Read the file content
    			String fileContent = FileUtils.readFileToString(file,Constant.CONF_CHAR_SET);
    			// Create the directory node
    			ZkUtil.createDirNode(zk, parentDir);
    			// Create the file node under that directory
    			ZkUtil.createFileNode(zk, parentDir, fileName, fileContent);
    		}
    	}
    }

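As you can see, the publisher scans every directory and file under the given configuration directory. It also takes an accepted file suffix, and only configuration files with that suffix are published.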
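The scan-and-filter step can be tried on its own, without a ZooKeeper connection. Below is a minimal sketch using only the JDK; ConfigScanSketch and its sampleTree helper are hypothetical and exist only for illustration. Instead of publishing, it simply collects the files that would be published.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;

class ConfigScanSketch {
    // Collects files under dir whose names end with acceptSuffix (null accepts all).
    static List<File> scan(File dir, String acceptSuffix) {
        List<File> result = new ArrayList<File>();
        File[] entries = dir.listFiles(); // null if dir is not a readable directory
        if (entries == null) {
            return result;
        }
        for (File entry : entries) {
            if (entry.isDirectory()) {
                result.addAll(scan(entry, acceptSuffix)); // descend into subdirectory
            } else if (acceptSuffix == null || entry.getName().endsWith(acceptSuffix)) {
                result.add(entry); // a file the publisher would push
            }
        }
        return result;
    }

    // Builds a small throwaway tree for the demo: two .xml files and one .txt file.
    static File sampleTree() {
        try {
            File root = Files.createTempDirectory("zkconfig-sketch").toFile();
            File baidu = new File(root, "baidu");
            File tianya = new File(root, "tianya");
            baidu.mkdirs();
            tianya.mkdirs();
            new File(baidu, "Baidu_SearchEngineConfig.xml").createNewFile();
            new File(tianya, "Tianya_SearchEngineConfig.xml").createNewFile();
            new File(root, "notes.txt").createNewFile();
            return root;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Calling ConfigScanSketch.scan(ConfigScanSketch.sampleTree(), ".xml") returns the two XML files and leaves notes.txt out.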

Published nodes come in two kinds, directory nodes and file nodes. The node storage rules are as follows:

Node storage rules
_#_ : separator    d : directory    f : file

Node path      Node type    Node value
/root          root(0)      (0_#_root).getBytes()
/root/d1       dir(1)       (1_#_str2Hex(d1.getAbsolutePath())).getBytes()
/root/d1/f1    file(2)      (2_#_str2Hex(f1.getAbsolutePath())_#_str2Hex(f1.getContent())).getBytes()
Node types are: root node (0), directory node (1), and file node (2).
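To make the rules concrete, here is a minimal sketch of the two encodings involved. It rests on several assumptions: the `_%_` replacement for `/` in directory node names is inferred from the log output above, the hex helpers are hypothetical because the project's str2Hex and hex2Str are not shown, and the value layout follows the table only (the syncNode code shown later reads an additional operation-type field, which this sketch leaves out). Hex-encoding the path and content presumably keeps the `_#_` separator from ever appearing inside a field.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

class NodeCodecSketch {
    static final String PATH_SEP = "_%_";  // replaces '/' in directory node names (seen in the logs)
    static final String VALUE_SEP = "_#_"; // separates fields inside a node value

    // "/home/NetPF/bin" -> "_%_home_%_NetPF_%_bin"
    static String dirToNodeName(String absDir) {
        return absDir.replace("/", PATH_SEP);
    }

    static String nodeNameToDir(String nodeName) {
        return nodeName.replace(PATH_SEP, "/");
    }

    // Hypothetical hex helper; the article's str2Hex is not shown.
    static String toHex(String s) {
        StringBuilder sb = new StringBuilder();
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    // Hypothetical inverse of toHex.
    static String fromHex(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return new String(out, StandardCharsets.UTF_8);
    }

    // File node value per the table: 2_#_hex(path)_#_hex(content)
    static String encodeFileValue(String path, String content) {
        return "2" + VALUE_SEP + toHex(path) + VALUE_SEP + toHex(content);
    }

    // Returns {type, path, content}; limit -1 keeps an empty trailing content field.
    static String[] decodeFileValue(String value) {
        String[] parts = value.split(Pattern.quote(VALUE_SEP), -1);
        return new String[] { parts[0], fromHex(parts[1]), fromHex(parts[2]) };
    }
}
```

Because both fields are hex-encoded before they are joined, a configuration file whose text happens to contain `_#_` still decodes correctly.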

 

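ConfigChangeListener: the configuration change listener

With the ZooKeeperServer cluster running and the configuration files published to ZooKeeper, the client can now be started to synchronize and watch the configuration files. This work is done mainly by ConfigChangeListener and ClientConfigUpdater.

First, ConfigChangeListener. Its job is fairly light: it registers a watcher with ZooKeeper and watches every configuration node for changes, including nodes that are added, modified, or deleted. Nodes come in two kinds: configuration directories and individual configuration files.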
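One detail explains the "Relistening node" lines in the client log above: a standard ZooKeeper watch is a one-time trigger, so a handler that wants further notifications has to register the watch again. The sketch below imitates that behaviour with a small in-memory registry. OneShotWatchSketch is a hypothetical class written for this illustration and is not the ZooKeeper client API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory imitation of one-time watch semantics (not the real ZooKeeper client).
class OneShotWatchSketch {
    interface Listener {
        void onChange(String path);
    }

    private final Map<String, List<Listener>> watches = new HashMap<String, List<Listener>>();

    // Registers a watch that will fire at most once.
    void watch(String path, Listener listener) {
        List<Listener> list = watches.get(path);
        if (list == null) {
            list = new ArrayList<Listener>();
            watches.put(path, list);
        }
        list.add(listener);
    }

    // Fires and discards the watches on path, as a one-time trigger does.
    void change(String path) {
        List<Listener> fired = watches.remove(path);
        if (fired != null) {
            for (Listener listener : fired) {
                listener.onChange(path);
            }
        }
    }

    // Counts notifications received for two changes, with or without re-registering.
    static int countNotifications(final boolean rewatch) {
        final OneShotWatchSketch registry = new OneShotWatchSketch();
        final int[] seen = {0};
        Listener listener = new Listener() {
            public void onChange(String path) {
                seen[0]++;
                if (rewatch) {
                    registry.watch(path, this); // "relisten" so the next change is seen
                }
            }
        };
        registry.watch("/config", listener);
        registry.change("/config");
        registry.change("/config");
        return seen[0];
    }
}
```

Without re-registration the second change goes unnoticed, which is why the client re-registers its watcher every time it handles an event.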

Here is what the configuration change listener actually does:

/** 
     * Connect to ZooKeeper 
     */  
    public void start() {  
    	try{
			zk = new ZooKeeper(Constant.ZK_CLUSTER_HOSTS, 5000, watcher); 
			// Wait until the connection is established
			waitUntilConnected(zk, connectedLatch);
			logger.info("ConfigChangeListener Connected to ZooKeeper Cluster:"+Constant.ZK_CLUSTER_HOSTS);
			// Clear the local configuration directory first
			ClientConfigUpdater.clearConfigDir(zk);
			// Watch the root node and all descendants, and do the initial sync to the local machine
			ClientConfigUpdater.listenNode(true,zk,watcher);
			// Wait for configuration changes
			Thread.currentThread().join();
			
		}catch(Exception e){
			logger.error("ConfigChangeListener occurred error:",e);
		}
    }

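1. The listener connects to the ZooKeeper server.

2. It clears the local configuration directory.

3. It watches the root node and all descendants, and performs the initial sync of the ZooKeeper server's data to the local machine.

4. It waits for configuration changes.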
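Step 1 relies on the same latch-based wait used by the publisher: the watcher counts the latch down when the session is connected, and the starting thread blocks until then. The original code waits indefinitely. As a sketch of one possible variation, the JDK-only example below waits with a timeout instead, so that an unreachable cluster does not hang the client forever. ConnectWaitSketch and simulateConnect are hypothetical names; the background thread merely plays the role of the watcher callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class ConnectWaitSketch {
    // Blocks until the latch opens or the timeout passes; true means "connected".
    static boolean awaitConnected(CountDownLatch connected, long timeoutMs) {
        try {
            return connected.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    // Simulates the watcher callback firing on another thread after delayMs.
    static CountDownLatch simulateConnect(final long delayMs) {
        final CountDownLatch latch = new CountDownLatch(1);
        Thread callback = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException e) {
                    return;
                }
                latch.countDown(); // the role ConnectedWatcher plays on SyncConnected
            }
        });
        callback.setDaemon(true);
        callback.start();
        return latch;
    }
}
```

A caller can then decide what to do when awaitConnected returns false, for example log the failure and exit rather than block.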

Next, look at the listenNode method of ClientConfigUpdater:

/**
     * Watch the root node and, recursively, all of its descendants
     * @param syncData whether to sync the server-side data to the local machine
     * @param zk
     * @param watcher
     * @throws KeeperException
     * @throws InterruptedException
     * @throws IOException 
     */
    public static void listenNode(boolean syncData,ZooKeeper zk,Watcher watcher) throws KeeperException, InterruptedException, IOException{
    	// Watch the root node
    	String rootNode = "/" + Constant.CONF_ROOT_NODE;
        if(null!=zk.exists(rootNode, watcher)){
        	logger.debug("Listening node:"+rootNode);
        	List<String> subList = zk.getChildren(rootNode,watcher);
        	if(null!=subList && subList.size()>0){
        		// Save all children of this node
        		ChildrenChangedWrapper.addChildren(rootNode, subList);
        	}
        	for (String subNode : subList) { 
        		ClientConfigUpdater.listenSubNode(syncData,zk,rootNode,subNode,watcher);
        	}
        }else{
        	logger.warn("rootNode:"+rootNode+" does not exists!");
        }
    }
    
    /**
     * Watch a child node
     * @param syncData whether to sync the server-side data to the local machine
     * @param zk
     * @param currentNode
     * @param subNode
     * @param watcher
     * @throws KeeperException
     * @throws InterruptedException
     * @throws IOException 
     */
    public static void listenSubNode(boolean syncData,ZooKeeper zk,String currentNode,String subNode,Watcher watcher) throws KeeperException, InterruptedException, IOException{
    	String nodePath = currentNode+"/"+subNode;
    	if(nodePath.startsWith("/")){
    		// Watch the child node
    		if(null!=zk.exists(nodePath,watcher)){
    			// If data should be synced to the local machine
            	if(syncData){
            		ClientConfigUpdater.syncNode(zk,nodePath);
            	}
    			logger.debug("Listening node:"+nodePath);
            	List<String> subList = zk.getChildren(nodePath,watcher); 
            	if(null!=subList && subList.size()>0){
            		// Save all children of this node
            		ChildrenChangedWrapper.addChildren(nodePath, subList);
            	}
            	for (String _subNode : subList) {
            		ClientConfigUpdater.listenSubNode(syncData,zk,nodePath,_subNode,watcher);
            	}
            }else{
            	logger.warn("subNode:"+nodePath+" does not exists!");
            }
    	}
    }
    
    /**
     * Sync configuration from the ZooKeeper server to the local machine
     * @param zk
     * @param path
     * @throws IOException
     * @throws InterruptedException 
     * @throws KeeperException 
     */
    public static void syncNode(ZooKeeper zk,String path) throws IOException, KeeperException, InterruptedException{
    	if(null==zk.exists(path, null)){
    		logger.error("syncNode error,node ["+path+"] does not exists");
    		return;
    	}
    	byte[] data = zk.getData(path, true, null);
        String content = new String(data,Constant.CONF_CHAR_SET);
        // Parse the content
        String[] strs = content.split(Constant.NODE_CONTENT_SEPRATOR);
        if(strs.length<2){
        	logger.error("syncNode error,strs length should not be less than 2");
        	return;
        }
        // Node type
        String nodeType = strs[0];
        // Node operation type
        String nodeOperateType = strs[1];
        logger.debug("nodeType:"+nodeType+",nodeOperateType:"+nodeOperateType);
        String dirPath = "";
        String filePath = "";
        String fileContent = "";
        if(strs.length==3){
        	dirPath = HexUtil.hex2Str(strs[2]);
        }else if(strs.length==4){
        	filePath = HexUtil.hex2Str(strs[2]);
        	fileContent = HexUtil.hex2Str(strs[3]);
        }
    	// If this is a directory node, create the directory
    	if(Constant.NODE_TYPE_DIRECTORY.equals(nodeType)){
    		if(StringUtils.isNotEmpty(dirPath)){
    			FileUtils.forceMkdir(new File(dirPath));
    			logger.info("Mkdir:"+dirPath);
    		}
		// If this is a file node, write the file
    	}else if(Constant.NODE_TYPE_FILE.equals(nodeType)){
    		if(StringUtils.isNotEmpty(filePath) && StringUtils.isNotEmpty(fileContent)){
    			// Write with the same charset that was used to decode the node data
    			FileUtils.writeStringToFile(new File(filePath), fileContent, Constant.CONF_CHAR_SET);
    			logger.info("Write file:"+filePath);
    		}
    	}
    }

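One line in the code deserves attention: ChildrenChangedWrapper.addChildren(rootNode, subList)

ChildrenChangedWrapper stores, for each node, the list of children as of the most recent update. A NodeChildrenChanged event carries only the path of the parent node, not which child changed. So when the event fires, the node's current children are compared with the stored list, which tells us whether a child was created or deleted.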
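The comparison itself is a plain set difference. Here is a minimal sketch using only the JDK; ChildrenDiffSketch is a hypothetical class for illustration and is much simpler than the wrapper used in the project.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class ChildrenDiffSketch {
    // Children present now but absent from the previous snapshot.
    static List<String> added(Collection<String> before, Collection<String> after) {
        Set<String> result = new LinkedHashSet<String>(after);
        result.removeAll(before);
        return new ArrayList<String>(result);
    }

    // Children present in the previous snapshot but gone now.
    static List<String> removed(Collection<String> before, Collection<String> after) {
        return added(after, before);
    }
}
```

With the shortened child names other, baidu, tianya before the change and other, baidu, test, tianya after it, added returns the single entry test and removed returns an empty list, which matches the "Add children node" line in the client log.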

The ChildrenChangedWrapper implementation is as follows:

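/**
 * Wrapper for child-node changes.
 * Stores the child paths of every node,
 * and is used to determine which child paths changed and how: added, deleted, or updated
 * @author hwang
 *
 */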
public class ChildrenChangedWrapper{
    	
	private static final Log logger = LogFactory.getLog(ChildrenChangedWrapper.class);
	
	/**
	 * Type of child-node change
	 * @author hwang
	 *
	 */
	public static enum ChildrenChangeType{
		add,
		delete,
		update
	}
	
	/**
	 * Result of a child-node change
	 * @author hwang
	 *
	 */
	public static class ChildrenChangeResult{
		private ChildrenChangeType changeType;
		
		private List<String> changePath;

		public ChildrenChangeType getChangeType() {
			return changeType;
		}
		public void setChangeType(ChildrenChangeType changeType) {
			this.changeType = changeType;
		}
		public List<String> getChangePath() {
			return changePath;
		}
		public void setChangePath(List<String> changePath) {
			this.changePath = changePath;
		}
	}
	
	
	/**
	 * Records all children of each node in its current state
	 */
	private static Map<String,Set<String>> nodeChildrenMap = new ConcurrentHashMap<String,Set<String>>();
	
	/**
	 * Add a child node
	 * @param nodePath
	 * @param childrenPath
	 */
	public static void addChildren(String nodePath,String childrenPath){
		synchronized (nodeChildrenMap) {
			Set<String> childrenSet = nodeChildrenMap.get(nodePath);
			if(null==childrenSet){
				childrenSet = new CopyOnWriteArraySet<String>();
			}
			childrenSet.add(childrenPath);
			nodeChildrenMap.put(nodePath, childrenSet);
		}
	}
	
	/**
	 * Add several child nodes
	 * @param nodePath
	 * @param childrenPath
	 */
	public static void addChildren(String nodePath,Collection<String> childrenPath){
		synchronized (nodeChildrenMap) {
			Set<String> childrenSet = nodeChildrenMap.get(nodePath);
			if(null==childrenSet){
				childrenSet = new CopyOnWriteArraySet<String>();
			}
			childrenSet.addAll(childrenPath);
			nodeChildrenMap.put(nodePath, childrenSet);
		}
	}
	
	/**
	 * Remove a child node
	 * @param nodePath
	 * @param childrenPath
	 */
	public static void removeChildren(String nodePath,String childrenPath){
		synchronized (nodeChildrenMap) {
			Set<String> childrenSet = nodeChildrenMap.get(nodePath);
			if(null==childrenSet){
				logger.warn("Node:"+nodePath+" has no children.");
				return;
			}
			childrenSet.remove(childrenPath);
			nodeChildrenMap.put(nodePath, childrenSet);
		}
	}
	
	/**
	 * Remove several child nodes
	 * @param nodePath
	 * @param childrenPath
	 */
	public static void removeChildren(String nodePath,Collection<String> childrenPath){
		synchronized (nodeChildrenMap) {
			Set<String> childrenSet = nodeChildrenMap.get(nodePath);
			if(null==childrenSet){
				logger.warn("Node:"+nodePath+" has no children.");
				return;
			}
			childrenSet.removeAll(childrenPath);
			nodeChildrenMap.put(nodePath, childrenSet);
		}
	}
	
	protected static void showNodeChildren(String nodePath){
		Set<String> childrenSet = nodeChildrenMap.get(nodePath);
		if(null==childrenSet){
			logger.warn("Node:"+nodePath+" has no children.No need to show.");
			return;
		}
		StringBuilder sb = new StringBuilder();
		for(String child : childrenSet){
			sb.append(child).append(",");
		}
		logger.debug("Node:"+nodePath+",children:"+sb.toString());
	}
	
	/**
	 * Compute how the children of a node have changed
	 * @param nodePath
	 * @param currentChildrenPath
	 * @return
	 */
	public static ChildrenChangeResult diff(String nodePath,List<String> currentChildrenPath){
		synchronized (nodeChildrenMap) {
			ChildrenChangeResult changeResult = new ChildrenChangeResult();
			ChildrenChangeType changeType = ChildrenChangeType.add;
			List<String> changePath = new ArrayList<String>();
			// Children recorded before this event
			Set<String> originalChildrenPath = nodeChildrenMap.get(nodePath);
			if(null==originalChildrenPath){
				logger.info("Node:"+nodePath+" has no children.All currentChildrenPath are new.");
				changeType = ChildrenChangeType.add;
				// Copy so that the result does not alias the caller's list
				changePath.addAll(currentChildrenPath);
			}else{
				int currentSize = currentChildrenPath.size();
				int originalSize = originalChildrenPath.size();
				if(currentSize>originalSize){
					changeType = ChildrenChangeType.add;
					for(String curPath : currentChildrenPath){
						// if originalChildrenPath does not contain curPath
						// then curPath is newly added
						if(!originalChildrenPath.contains(curPath)){
							changePath.add(curPath);
						}
					}
				}else if(currentSize<originalSize){
					changeType = ChildrenChangeType.delete;
					for(String oriPath : originalChildrenPath){
						// if currentChildrenPath does not contain oriPath
						// then oriPath is removed
						if(!currentChildrenPath.contains(oriPath)){
							changePath.add(oriPath);
						}
					}
				}else{
					// Same number of children: report update with an empty path list
					changeType = ChildrenChangeType.update;
				}
			}
			// Record the current children as the baseline for the next event;
			// addChildren alone never drops children that were deleted
			nodeChildrenMap.put(nodePath, new CopyOnWriteArraySet<String>(currentChildrenPath));
			changeResult.setChangeType(changeType);
			changeResult.setChangePath(changePath);
			return changeResult;
		}
	}
	
}
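
The size comparison in diff reports a single change type per event, so it cannot describe the case where some children were added and others removed between two notifications (the sizes may even be equal). A minimal sketch of an alternative, assuming two set differences are computed instead of comparing sizes. ChildrenDiffSketch, added, and removed are hypothetical names and are not part of the original project:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper, not part of the original project.
public class ChildrenDiffSketch {

    // Children present now but absent from the previous snapshot
    public static Set<String> added(Collection<String> previous, Collection<String> current) {
        Set<String> result = new LinkedHashSet<String>(current);
        result.removeAll(previous);
        return result;
    }

    // Children present in the previous snapshot but absent now
    public static Set<String> removed(Collection<String> previous, Collection<String> current) {
        Set<String> result = new LinkedHashSet<String>(previous);
        result.removeAll(current);
        return result;
    }

    public static void main(String[] args) {
        // One child removed and one added: both snapshots have two entries
        Set<String> previous = new LinkedHashSet<String>(Arrays.asList("a.xml", "b.xml"));
        Set<String> current = new LinkedHashSet<String>(Arrays.asList("b.xml", "c.xml"));
        System.out.println("added=" + added(previous, current));
        System.out.println("removed=" + removed(previous, current));
    }
}
```

With this shape the caller would handle both sets after each NodeChildrenChanged event, rather than switching on one change type.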

When a node change event occurs, ClientConfigUpdater is responsible for updating the local configuration files. The implementation is as follows:

/**
	 * Update the local configuration files, then watch every node again
	 * @param zk
	 * @param event
	 * @param watcher
	 * @throws Exception
	 */
    public static void updateConfigFiles(ZooKeeper zk,WatchedEvent event,Watcher watcher) throws Exception {  
        String path = event.getPath();  
        EventType type = event.getType();
        logger.info("EventType:"+type+",Value:"+type.getIntValue()+",path:"+path);
        switch(type){
	        case NodeDeleted:{
	        	String filePath = path.substring(("/"+Constant.CONF_ROOT_NODE).length()+1).replaceAll(Constant.SEPRATOR, "/");
	        	FileUtils.deleteQuietly(new File(filePath));
	        }break;
	        case NodeCreated:
	        case NodeDataChanged:{
	        	nodeHandle(zk,path);
	        }break;
	        case NodeChildrenChanged:{
	        	logger.info("Catch [NodeChildrenChanged] event");
	        	// Fetch the node's current children (after checking that the node still exists)
	        	if(null==zk.exists(path, false)){
	        		logger.info("Node:"+path+" does not exists,will do nothing.");
	        		break;
	        	}
	        	List<String> currentChildrenPath = zk.getChildren(path, false);
	        	ChildrenChangeResult changeResult = ChildrenChangedWrapper.diff(path, currentChildrenPath);
	        	ChildrenChangeType changeType = changeResult.getChangeType();
	        	List<String> changePath = changeResult.getChangePath();
	        	switch(changeType){
		        	case add:{
		        		for(String subPath : changePath){
		        			logger.info("Add children node,path:"+path+"/"+subPath);
		        			nodeHandle(zk,path+"/"+subPath);
		        		}
		        	}break;
		        	case delete:{
		        		for(String subPath : changePath){
		        			String filePath = subPath.replaceAll(Constant.SEPRATOR, "/");
		    	        	FileUtils.deleteQuietly(new File(filePath));
		    	        	logger.info("Delete children node,file:"+filePath);
		        		}
		        	}break;
		        	case update:{
		        		logger.info("Update children node,will do nothing");
		        	}break;
		        	default:{
		        		logger.info("Default children node operate,will do nothing");
		        	}break;
	        	}
	        }break;
	        default:{
	        	logger.info("Catch [default] event,will do nothing");
	        }break;
        }
        // Register the watches on all nodes again
        ClientConfigUpdater.listenNode(false,zk,watcher);
    }  

    /**
     * Handle a node
     * @param zk
     * @param path
     * @throws KeeperException
     * @throws InterruptedException
     * @throws IOException
     */
    public static void nodeHandle(ZooKeeper zk,String path) throws KeeperException, InterruptedException, IOException{
    	// Watch the node right away, in case the newly created node immediately gets children.
    	// Note: getData registers a data watch only; child watches are registered through getChildren.
        byte[] data = zk.getData(path, true, null);
        logger.info("Re-listening node:"+path);
        if(null==data){
        	logger.warn("Node:"+path+" has no data,will do nothing.");
        	return;
        }
        String content = new String(data,Constant.CONF_CHAR_SET);
        // Parse the node content
        String[] strs = content.split(Constant.NODE_CONTENT_SEPRATOR);
        if(strs.length<2){
        	logger.warn("The str array length should be at least 2,while actually is:"+strs.length);
        	return;
        }
        // Node type
        String nodeType = strs[0];
        // Operation type
        String nodeOperateType = strs[1];
        logger.info("nodeType:"+nodeType+",nodeOperateType:"+nodeOperateType);
        // Directory node
    	if(Constant.NODE_TYPE_DIRECTORY.equals(nodeType)){
    		// Add a directory
			if(Constant.NODE_OPERATE_TYPE_ADD.equals(nodeOperateType)){
				if(strs.length==3){
					String dirPath = HexUtil.hex2Str(strs[2]);
					FileUtils.forceMkdir(new File(dirPath));
				}else{
					logger.warn("The str array length should be 3,while actually is:"+strs.length);
				}
			// Update (rename or move) a directory
			}else if(Constant.NODE_OPERATE_TYPE_UPDATE.equals(nodeOperateType)){
				if(strs.length==4){
					File oldDir = new File(HexUtil.hex2Str(strs[2]));
					File newDir = new File(HexUtil.hex2Str(strs[3]));
					if(oldDir.exists()){
						FileUtils.moveDirectory(oldDir, newDir);
					}else{
						logger.error("Dir["+oldDir+"] does not exist");
					}
				}else{
					logger.warn("The str array length should be 4,while actually is:"+strs.length);
				}
			// Delete a directory (only logged here)
			}else{
				logger.info("Delete dir:"+path.replaceAll(Constant.SEPRATOR, "/"));
			}
    	// File node
    	}else if(Constant.NODE_TYPE_FILE.equals(nodeType)){
    		// Add or update a file
			if(Constant.NODE_OPERATE_TYPE_ADD.equals(nodeOperateType) || Constant.NODE_OPERATE_TYPE_UPDATE.equals(nodeOperateType)){
				if(strs.length==4){
					String filePath = HexUtil.hex2Str(strs[2]);
		        	String fileContent = HexUtil.hex2Str(strs[3]);
		        	FileUtils.writeStringToFile(new File(filePath), fileContent);
				}else{
					logger.warn("The str array length should be 4,while actually is:"+strs.length);
				}
			// Delete a file (only logged here)
			}else{
				logger.info("Delete file:"+path.replaceAll(Constant.SEPRATOR, "/"));
			}
    	}
    }
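
nodeHandle expects the node data to be a separator-delimited record: node type, operation type, then hex-encoded fields (path, and content for files). The sketch below illustrates that layout so the parsing steps are easier to follow. It is only an illustration under assumptions: the separator "#", the literal "file" and "add" values, and the NodePayloadSketch class with its str2Hex and hex2Str methods are hypothetical stand-ins, since the real Constant and HexUtil classes are not shown here.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins; the real Constant and HexUtil classes are not shown in this post.
public class NodePayloadSketch {

    // Assumed separator; the actual value of Constant.NODE_CONTENT_SEPRATOR is unknown
    public static final String SEPARATOR = "#";

    // Encode a string as lowercase hex of its UTF-8 bytes
    public static String str2Hex(String s) {
        StringBuilder sb = new StringBuilder();
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    // Decode a hex string back into a UTF-8 string
    public static String hex2Str(String hex) {
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Build the payload of a file node: type, operation, hex path, hex content
    public static String encodeFileNode(String operation, String filePath, String fileContent) {
        return "file" + SEPARATOR + operation + SEPARATOR + str2Hex(filePath) + SEPARATOR + str2Hex(fileContent);
    }

    public static void main(String[] args) {
        String payload = encodeFileNode("add", "/tmp/demo/app.xml", "<config/>");
        // Same steps as nodeHandle: split, then decode the hex fields
        String[] parts = payload.split(SEPARATOR);
        System.out.println("type=" + parts[0] + ", operation=" + parts[1]);
        System.out.println("path=" + hex2Str(parts[2]));
        System.out.println("content=" + hex2Str(parts[3]));
    }
}
```

Hex-encoding the path and content keeps the separator from ever appearing inside a field, whatever the file contains.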

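After each update of the local configuration, the watches on all nodes have to be registered again, because watches registered through exists, getData, and getChildren are one-time triggers.

ServerConfigChanger: the configuration file modifier

Once connected to ZooKeeper, the configuration file modifier changes (or adds, or deletes) the data of the corresponding nodes. The code is as follows: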

/**
 * Server-side configuration file changer
 * @author hwang
 *
 */
public class ServerConfigChanger {

	private static final Log logger = LogFactory.getLog(ServerConfigChanger.class);
	
	private static ZooKeeper zk;  
    
    private static CountDownLatch connectedLatch = new CountDownLatch(1);
	
	static class ConnectedWatcher implements Watcher {  
        private CountDownLatch connectedLatch;  
        ConnectedWatcher(CountDownLatch connectedLatch) {  
            this.connectedLatch = connectedLatch;  
        }  
        @Override  
        public void process(WatchedEvent event) {  
           if (event.getState() == KeeperState.SyncConnected) {  
               connectedLatch.countDown();  
           }  
           logger.debug("ServerConfigChanger get ["+event.getType()+"] notice by ConnectedWatcher");
        }  
    }  
	
	static{
		if(null==zk){
    		try {
				connectZkServer();
			} catch (Exception e) {
				logger.error("Failed to connect to ZooKeeper Cluster:"+Constant.ZK_CLUSTER_HOSTS, e);
			}
    	}
	}
	
	private static void waitUntilConnected(ZooKeeper zooKeeper, CountDownLatch connectedLatch) {  
		logger.info("ZooKeeper State:"+zooKeeper.getState());
        if (States.CONNECTING == zooKeeper.getState()) {  
            try {  
            	logger.info("ConnectedLatch.await");
                connectedLatch.await();  
            } catch (InterruptedException e) {  
                throw new IllegalStateException(e);  
            }  
        }  
    }
	
	
    /**
     * Connect to ZooKeeper
     */
    private static void connectZkServer() throws Exception {
        zk = new ZooKeeper(Constant.ZK_CLUSTER_HOSTS, 5000, new ConnectedWatcher(connectedLatch));
        // Wait until the connection is established
    	waitUntilConnected(zk, connectedLatch);
    	logger.info("ServerConfigChanger Connected to ZooKeeper Cluster:"+Constant.ZK_CLUSTER_HOSTS);
    }  
    
    /**
     * Add a directory node
     * @param dirAbsolutePath absolute path of the directory; it must start with /config/
     * @throws KeeperException
     * @throws InterruptedException
     * @throws UnsupportedEncodingException
     */
    public static void addConfigDir(String dirAbsolutePath) throws KeeperException, InterruptedException, UnsupportedEncodingException{
    	if(null==zk){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return;
    	}
    	if(StringUtils.isEmpty(dirAbsolutePath)){
    		logger.error("dirAbsolutePath can't be empty");
    		return;
    	}
    	ZkUtil.createDirNode(zk, dirAbsolutePath);
    }
  
    /**
     * Delete a directory node
     * @param dirAbsolutePath
     * @throws InterruptedException
     * @throws KeeperException
     */
    public static void deleteConfigDir(String dirAbsolutePath) throws InterruptedException, KeeperException{
    	if(null==zk){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return;
    	}
    	ZkUtil.deleteDirNode(zk, dirAbsolutePath);
    }
    

    public static void updateConfigDir(String oldDirAbsolutePath,String newDirAbsolutePath) throws InterruptedException, KeeperException, UnsupportedEncodingException{
    	if(null==zk){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return;
    	}
    	if(StringUtils.isEmpty(oldDirAbsolutePath)  || StringUtils.isEmpty(newDirAbsolutePath)){
    		logger.error("oldDirAbsolutePath,newDirAbsolutePath can't be empty");
    		return;
    	}
		ZkUtil.updateDirNode(zk, oldDirAbsolutePath, newDirAbsolutePath);
    }
    
    
    /**
     * Add a file node
     * @param fileAbsolutePath absolute path of the file's directory, excluding the file name
     * @param fileName file name
     * @param fileContent file content
     * @throws KeeperException
     * @throws InterruptedException
     * @throws UnsupportedEncodingException
     */
    public static void addConfigFile(String fileAbsolutePath,String fileName,String fileContent) throws KeeperException, InterruptedException, UnsupportedEncodingException{
    	if(null==zk){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return;
    	}
    	if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName) || StringUtils.isEmpty(fileContent)){
    		logger.error("fileAbsolutePath,fileName,fileContent can't be empty");
    		return;
    	}
    	ZkUtil.createFileNode(zk, fileAbsolutePath, fileName, fileContent);
    }
    
    /**
     * Delete a file node
     * @param fileAbsolutePath absolute path of the file's directory, excluding the file name
     * @param fileName file name
     * @throws InterruptedException
     * @throws KeeperException
     */
    public static void deleteConfigFile(String fileAbsolutePath,String fileName) throws InterruptedException, KeeperException{
    	if(null==zk){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return;
    	}
    	if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName)){
    		logger.error("fileAbsolutePath,fileName can't be empty");
    		return;
    	}
    	ZkUtil.deleteFileNode(zk, fileAbsolutePath, fileName);
    }
    

    public static void updateConfigFile(String fileAbsolutePath,String fileName,String fileContent) throws InterruptedException, KeeperException, UnsupportedEncodingException{
    	if(null==zk){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return;
    	}
    	if(StringUtils.isEmpty(fileAbsolutePath)  || StringUtils.isEmpty(fileName)  || StringUtils.isEmpty(fileContent)){
    		logger.error("fileAbsolutePath,fileName,fileContent can't be empty");
    		return;
    	}
		ZkUtil.updateFileNode(zk, fileAbsolutePath, fileName, fileContent);
    }
    
    public static void testAdd() throws InterruptedException, KeeperException, UnsupportedEncodingException{
    	String dirAbsolutePath = "/home/NetPF/bin/zk/zkconfig/test";
    	ServerConfigChanger.addConfigDir(dirAbsolutePath);
		
		String fileAbsolutePath = "/home/NetPF/bin/zk/zkconfig/test";
		String fileName = "Test2_SearchEngineConfig.xml";
		String fileContent = "This is a test content added by zookeeper";
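		// Pause so that clients can register a watch on the new directory node
		// before the file node is created (see the unresolved issue at the end of this post)
		Thread.sleep(2000);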
		ServerConfigChanger.addConfigFile(fileAbsolutePath,fileName,fileContent);
		
    }
    
    public static void testDelete() throws InterruptedException, KeeperException{
    	String dirAbsolutePath = "/home/NetPF/bin/zk/zkconfig/test";
    	
    	String fileAbsolutePath = "/home/NetPF/bin/zk/zkconfig/test";
    	String fileName = "Test2_SearchEngineConfig.xml";
    	ServerConfigChanger.deleteConfigFile(fileAbsolutePath,fileName);
    	
    	ServerConfigChanger.deleteConfigDir(dirAbsolutePath);
    }
    
    
    public static void testUpdate() throws InterruptedException, KeeperException, UnsupportedEncodingException{
    	
    	String fileAbsolutePath = "/home/NetPF/bin/zk/zkconfig/tianya";
    	String fileName = "Tianya_SearchEngineConfig.xml";
    	String fileContent = "Tianya_SearchEngineConfig.xml哈哈";
    	ServerConfigChanger.updateConfigFile(fileAbsolutePath,fileName,fileContent);
    	
    }
    
    /**
     * Get the root node and all of its descendants
     * @throws KeeperException
     * @throws InterruptedException
     */
    public static void getNode() throws KeeperException, InterruptedException{
    	// Read the root node (no watch is registered here)
    	String rootNode = "/" + Constant.CONF_ROOT_NODE;
        if(null!=zk.exists(rootNode, false)){
        	logger.info("get node:"+rootNode);
        	List<String> subList = zk.getChildren(rootNode,false);  
        	for (String subNode : subList) { 
        		getSubNode(rootNode,subNode);
        	}
        }else{
        	logger.warn("rootNode:"+rootNode+" does not exists!");
        }
    }
    
    /**
     * Get a child node and its descendants
     * @param currentNode
     * @param subNode
     * @throws KeeperException
     * @throws InterruptedException
     */
    public static void getSubNode(String currentNode,String subNode) throws KeeperException, InterruptedException{
    	String nodePath = currentNode+"/"+subNode;
    	if(nodePath.startsWith("/")){
    		// Read the child node (no watch is registered here)
    		if(null!=zk.exists(nodePath,false)){
    			logger.info("get node:"+nodePath);
            	List<String> subList = zk.getChildren(nodePath,false);  
            	for (String _subNode : subList) {
            		getSubNode(nodePath,_subNode);
            	}
            }else{
            	logger.warn("subNode:"+nodePath+" does not exists!");
            }
    	}
    }
    
    public static void main(String[] args) throws  KeeperException, InterruptedException, UnsupportedEncodingException {
    	testAdd();
//    	testDelete();
    	testUpdate();
    	getNode();
	}
   
}
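
waitUntilConnected calls connectedLatch.await() without a time limit, so the caller keeps waiting for as long as the cluster cannot be reached. A minimal sketch of a bounded wait, assuming a timeout is acceptable to the caller. BoundedWaitSketch and awaitConnected are hypothetical names, and a plain thread stands in for the ZooKeeper event thread that would call countDown on SyncConnected:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not part of the original project.
public class BoundedWaitSketch {

    // Returns true if the latch was released within the timeout, false otherwise
    public static boolean awaitConnected(CountDownLatch latch, long timeoutMillis) throws InterruptedException {
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch connected = new CountDownLatch(1);
        // Stand-in for the watcher callback that fires on SyncConnected
        new Thread(new Runnable() {
            public void run() {
                connected.countDown();
            }
        }).start();
        System.out.println("connected=" + awaitConnected(connected, 5000));

        // Nothing releases this latch, so the wait gives up after 100 ms
        System.out.println("connected=" + awaitConnected(new CountDownLatch(1), 100));
    }
}
```

A caller could log an error or retry when the method returns false instead of proceeding with an unconnected client.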

At this point, a simple distributed configuration file management system is complete.

Unresolved problems

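Because the program uses the ZooKeeper API directly, some problems are not yet fully solved. For example, when ServerConfigChanger adds nodes, it creates a directory node (dirNode1) under the root node /config and a file node (fileNode1) under dirNode1, with no delay between the two steps. In that case ConfigChangeListener only receives the NodeChildrenChanged event for /config. The client creates the dirNode1 directory correctly, but it has not yet registered a watch on dirNode1, so the creation of fileNode1 under dirNode1 is never reported to the client, and the client cannot create the fileNode1 file.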
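
One possible mitigation, sketched under assumptions rather than taken from the original code: when the client handles a newly created directory node, it can list that node's children right away and process each of them, instead of waiting for an event that may already have been missed. In the sketch an in-memory map replaces the ZooKeeper tree so that the example runs on its own; with the real client the lookup would be zk.getChildren(path, true), which also registers the child watch. CatchUpSketch, collect, and the sample paths are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; the map stands in for the ZooKeeper tree.
public class CatchUpSketch {

    // Visit path and every node below it, depth first, and return the visited paths
    public static List<String> collect(Map<String, List<String>> tree, String path) {
        List<String> visited = new ArrayList<String>();
        visit(tree, path, visited);
        return visited;
    }

    private static void visit(Map<String, List<String>> tree, String path, List<String> visited) {
        // In the real client this is where nodeHandle(zk, path) would run
        visited.add(path);
        // In the real client this would be zk.getChildren(path, true)
        List<String> children = tree.get(path);
        if (children == null) {
            children = Collections.emptyList();
        }
        for (String child : children) {
            visit(tree, path + "/" + child, visited);
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> tree = new HashMap<String, List<String>>();
        // dirNode1 already contains fileNode1 by the time the client sees it
        tree.put("/config/dirNode1", Arrays.asList("fileNode1"));
        System.out.println(collect(tree, "/config/dirNode1"));
    }
}
```

A child may be handled twice if its own event also arrives. For the add operations in nodeHandle that means creating the same directory or rewriting the same file again.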

 

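The next step is to rewrite the system to use ZkClient for interacting with ZooKeeper. ZkClient takes care of reconnecting after a session timeout and of automatically re-registering event listeners.

This is my first blog post and it surely has many shortcomings. Corrections from more experienced readers are welcome, and I will study them carefully.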
