因爲本身最近在學習zookeeper分佈式相關的知識,發如今其代碼實現上存在較多難以想清楚的點,尤爲是響應式編程的操做,爲此在這裏記錄完整的代碼書寫流程(每一步的思想),這裏是第一篇zookeeper分佈式註冊配置中心的實現代碼過程,後面還會有第二篇關於zookeeper分佈式鎖的簡單實現過程。
第二篇zookeeper分佈式鎖實現:
https://segmentfault.com/a/11...
zookeeper因爲擁有watcher機制,使得其擁有發佈訂閱的功能,而發佈與訂閱模型,即所謂的配置中心,顧名思義就是發佈者將數據發佈到 ZK節點上,供訂閱者動態獲取數據,實現配置信息的集中式管理和動態更新。 應用在啓動的時候會主動來獲取一次配置,同時,在節點上註冊一個 Watcher,這樣一來,之後每次配置有更新的時候,都會實時通知到訂閱的客戶端,歷來達到獲取最新配置信息的目的。java
首選交代實驗環境,本身的zookeeper的版本是3.5.8的版本,代碼工具爲IDEA,建立了一個MAVEN項目,僅僅添加了以下依賴。apache
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.8</version> </dependency>
因爲客戶端須要與zookeeper創建鏈接,獲取數據,添加監控等等一系列的事情,因此這裏封裝一個Utils工具類供咱們使用。編程
而後對於zookeeper鏈接客戶端的地址的後面能夠緊跟一個path,做爲在根目錄下的工做目錄。該目錄就是做爲全部操做的根目錄,這裏使用/test、segmentfault
同時因爲zookeeper基於watch機制實現發佈訂閱,我們全部的watcher都採用自定義的方式實現,首先是對鏈接成功的時候的DefaultWatcher。服務器
package org.qzx.config; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:05 下午 * @Description: org.qzx.config * @version: 1.0 */ public class DefaultWatcher implements Watcher { @Override public void process(WatchedEvent event) { System.out.println(event.toString()); } }
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:02 下午 * @Description: org.qzx.config * @version: 1.0 */ public class Utils { // zookeeper對象 private static ZooKeeper zooKeeper; // 鏈接地址 private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test"; private static DefaultWatcher defaultWatcher = new DefaultWatcher(); public static ZooKeeper getZooKeeper() throws Exception{ zooKeeper = new ZooKeeper(address,3000,defaultWatcher); return zooKeeper; } }
因爲zookeeper採用的是異步調用,因此這裏須要使用一把鎖鎖住主線程,在鏈接成功後自動解鎖,主線程再往下進行。這裏使用CountDownLatch實現鎖,在主線程建立,傳遞到DafaultWatcher的回掉函數中。框架
package org.qzx.config; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:05 下午 * @Description: org.qzx.config * @version: 1.0 */ public class DefaultWatcher implements Watcher { private CountDownLatch latch; public void setLatch(CountDownLatch latch) { this.latch = latch; } @Override public void process(WatchedEvent event) { switch (event.getState()) { case Unknown: break; case Disconnected: break; case NoSyncConnected: break; case SyncConnected: latch.countDown(); break; case AuthFailed: break; case ConnectedReadOnly: break; case SaslAuthenticated: break; case Expired: break; case Closed: break; } System.out.println(event.toString()); } }
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:02 下午 * @Description: org.qzx.config * @version: 1.0 */ public class Utils { // zookeeper對象 private static ZooKeeper zooKeeper; // 鏈接地址 private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test"; private static DefaultWatcher defaultWatcher = new DefaultWatcher(); // 鎖 private static CountDownLatch latch = new CountDownLatch(1); public static ZooKeeper getZooKeeper() throws Exception{ zooKeeper = new ZooKeeper(address,3000,defaultWatcher); defaultWatcher.setLatch(latch); latch.await(); return zooKeeper; } }
接下來就是編寫配置類TestConfig,首先是在操做以前進行鏈接,操做後得關閉,分別對應conn和close方法,而後就是配置方法getConfig,因爲並不清楚zookeeper客戶端是否必定含有自定義的工做目錄,因此通常傾向於使用exists方法來進行測試。又因爲exists方法中有1個watcher和一個回調函數,在回調函數中返回存在的話又得調用getData方法獲取數據,在getData方法中又存在一個watcher和回調函數,這樣會形成代碼深度太大不易閱讀,因此這裏也自定義一個工具類,封裝好全部的watcher和回調函數。該類的名稱就叫MyWatcherAndCallBack.異步
package org.qzx.config; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:40 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { } // Watcher @Override public void process(WatchedEvent event) { } // DataCallback @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { } }
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.zip.ZipOutputStream; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:29 下午 * @Description: org.qzx.config * @version: 1.0 */ public class TestConfig { private ZooKeeper zooKeeper; private MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack(); @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void getConf(){ // 這裏的/AppConf在zookeeper中其實是/test/AppConf, zooKeeper.exists("/AppConf",watcherAndCallBack,watcherAndCallBack,"123"); } }
這個時候就得考慮在成功判斷在工做目錄下存在AppConf的時候須要作的事情,其實也很簡單,就是獲取當前節點的數據就好了。分佈式
package org.qzx.config; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:40 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { private ZooKeeper zooKeeper; public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if(stat!=null){ // 節點存在獲取數據 zooKeeper.getData("/AppConf",this,this,"aaa"); } } // Watcher @Override public void process(WatchedEvent event) { } // DataCallback @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { } }
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.zip.ZipOutputStream; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:29 下午 * @Description: org.qzx.config * @version: 1.0 */ public class TestConfig { private ZooKeeper zooKeeper; private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack(); @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void getConf(){ watcherAndCallBack.setZooKeeper(zooKeeper); // 這裏的/AppConf在zookeeper中其實是/test/AppConf, zooKeeper.exists("/AppConf",watcherAndCallBack,watcherAndCallBack,"123"); } }
如今,咱們再來考慮另一個問題,當咱們取數據的時候,zookeeper其實是使用的異步調用模型,這裏不會等待數據取回而是直接繼續執行主線程的任務,那麼在數據取回的時候要如何讓主線程知道呢?因此在這裏我們得準備一個接受數據的對象,該類叫MyConf,對應的代碼以下:ide
package org.qzx.config; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 2:00 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyConf { private String confData; public String getConfData() { return confData; } public void setConfData(String confData) { this.confData = confData; } }
因爲須要讓主線程接受數據,得在TestConfig類中聚合該對象,而且在getData的回調函數中須要爲MyConf設置數據,因此在MyWatcherAndCallBack中也得聚合該對象。函數
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.zip.ZipOutputStream; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:29 下午 * @Description: org.qzx.config * @version: 1.0 */ public class TestConfig { private ZooKeeper zooKeeper; private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack(); // 接受數據 MyConf myConf = new MyConf(); @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void getConf(){ watcherAndCallBack.setZooKeeper(zooKeeper); watcherAndCallBack.setMyConf(myConf); // 這裏的/AppConf在zookeeper中其實是/test/AppConf, zooKeeper.exists("/AppConf",watcherAndCallBack,watcherAndCallBack,"123"); } }
package org.qzx.config; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:40 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { private ZooKeeper zooKeeper; private MyConf myConf; public void setMyConf(MyConf myConf) { this.myConf = myConf; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if(stat!=null){ // 節點存在獲取數據 zooKeeper.getData("/AppConf",this,this,"aaa"); } } // Watcher @Override public void process(WatchedEvent event) { } // DataCallback @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if(stat!=null){ myConf.setConfData(new String(data)); } } }
這樣在數據取回來後,在TestConfig中就能夠看見該數據了。這裏存在着一個問題,在exists執行的時候不會等待數據的獲取而會一直執行下去,可是對於判斷時候,若是有該節點而且獲取數據應該是一個原子性的操做,在這裏咱們將這兩步封裝成一部操做完成。咱們能夠在MyWatcherAndCallBack類中添加一個方法用來等待該操做的執行,從而獲取數據結果,該方法就叫aWait().咱們這裏將exists方法移動到aWait方法中,同時使用CountDownLatch阻塞該操做,直到獲取數據成功爲止解鎖。這裏使用一個CountDownLatch完成了對於判斷節點存在和獲取數據的封裝,若是在TestConfig中對exists方法進行加鎖,那就還得將這把鎖傳遞到MyWatcherAndCallBack中在getData回調結束才能解鎖,這種實現方式顯然在語義上沒有將其移動到aWait方法中的更好。
package org.qzx.config; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:40 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { private ZooKeeper zooKeeper; private MyConf myConf; private CountDownLatch latch = new CountDownLatch(1); public void setMyConf(MyConf myConf) { this.myConf = myConf; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if(stat!=null){ // 節點存在獲取數據 zooKeeper.getData("/AppConf",this,this,"aaa"); } } // Watcher @Override public void process(WatchedEvent event) { } // DataCallback @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if(stat!=null){ myConf.setConfData(new String(data)); latch.countDown(); } } public void aWait() throws InterruptedException { // 這裏的/AppConf在zookeeper中其實是/test/AppConf, zooKeeper.exists("/AppConf",this,this,"123"); latch.await(); } }
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.zip.ZipOutputStream; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:29 下午 * @Description: org.qzx.config * @version: 1.0 */ public class TestConfig { private ZooKeeper zooKeeper; private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack(); // 接受數據 MyConf myConf = new MyConf(); @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void getConf() throws InterruptedException { watcherAndCallBack.setZooKeeper(zooKeeper); watcherAndCallBack.setMyConf(myConf); watcherAndCallBack.aWait(); } }
如今咱們對於判斷節點存在和成功獲取節點數據的這條路徑就編寫完畢了,接下來考慮節點被修改的狀況,首先是當節點不存在的時候,exists的回調不會執行,在節點被建立的時候,註冊在exists的watcher會被執行,那麼咱們只須要調用數據便可,其次是節點中的數據被修改,咱們須要從新得到新的節點數據而且設置到confData中,再就是節點被刪除,咱們須要將confData的數據置爲空。爲了觀察數據的變化,這裏在TestConfig中循環打印設置的數據。
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.concurrent.TimeUnit; import java.util.zip.ZipOutputStream; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:29 下午 * @Description: org.qzx.config * @version: 1.0 */ public class TestConfig { private ZooKeeper zooKeeper; private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack(); // 接受數據 MyConf myConf = new MyConf(); @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void getConf() throws InterruptedException { watcherAndCallBack.setZooKeeper(zooKeeper); watcherAndCallBack.setMyConf(myConf); watcherAndCallBack.aWait(); while (true){ System.out.println(myConf.getConfData()); TimeUnit.SECONDS.sleep(2); } } }
package org.qzx.config; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:40 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { private ZooKeeper zooKeeper; private MyConf myConf; private final CountDownLatch latch = new CountDownLatch(1); public void setMyConf(MyConf myConf) { this.myConf = myConf; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if(stat!=null){ // 節點存在獲取數據 zooKeeper.getData("/AppConf",this,this,"aaa"); } } // Watcher @Override public void process(WatchedEvent event) { switch (event.getType()) { case None: break; case NodeCreated: // 節點建立須要獲取數據 zooKeeper.getData("/AppConf",this,this,"bbb"); break; case NodeDeleted: // 節點刪除須要清空數據 myConf.setConfData(""); break; case NodeDataChanged: // 數據修改 zooKeeper.getData("/AppConf",this,this,"bbb"); break; case NodeChildrenChanged: break; case DataWatchRemoved: break; case ChildWatchRemoved: break; } } // DataCallback @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if(stat!=null){ myConf.setConfData(new String(data)); latch.countDown(); } } public void aWait() throws InterruptedException { // 這裏的/AppConf在zookeeper中其實是/test/AppConf, zooKeeper.exists("/AppConf",this,this,"123"); latch.await(); } }
接下來就是測試程序是否正確了,首先啓動4臺zookeeper,而後在根目錄下建立test工做目錄.
而後開始啓動程序,而後在zookeeper客戶端手動建立AppConf節點,而且設置數據olddata。
能夠看到程序輸出olddata.
如今再修改該節點數據爲newdata.
而後能夠看到程序輸出newdata.
在測試的時候發現若是刪除了節點會不斷的輸出空字符串,這個比較佔用IO和資源,修改成阻塞等待數據不空。同時在輸出的時候若是數據爲空打印一句數據爲空的提示,這裏對於MyWatcherAndCallBack中節點刪除的代碼須要注意的是,咱們是經過調用aWait方法來實現的阻塞,由於這樣會在節點數據存在時候自動解鎖,進而輸出節點數據,可是因爲CountDownLatch已經被減過了,因此這裏須要將latch從新賦值。
package org.qzx.config; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:40 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { private ZooKeeper zooKeeper; private MyConf myConf; private CountDownLatch latch = new CountDownLatch(1); public void setMyConf(MyConf myConf) { this.myConf = myConf; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if(stat!=null){ // 節點存在獲取數據 zooKeeper.getData("/AppConf",this,this,"aaa"); } } // Watcher @Override public void process(WatchedEvent event) { switch (event.getType()) { case None: break; case NodeCreated: // 節點建立須要獲取數據 zooKeeper.getData("/AppConf",this,this,"bbb"); break; case NodeDeleted: // 節點刪除須要清空數據而且等待數據到達 myConf.setConfData(""); latch = new CountDownLatch(1); break; case NodeDataChanged: // 數據修改 zooKeeper.getData("/AppConf",this,this,"bbb"); break; case NodeChildrenChanged: break; case DataWatchRemoved: break; case ChildWatchRemoved: break; } } // DataCallback @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if(stat!=null){ myConf.setConfData(new String(data)); latch.countDown(); } } public void aWait() throws InterruptedException { // 這裏的/AppConf在zookeeper中其實是/test/AppConf, zooKeeper.exists("/AppConf",this,this,"123"); latch.await(); } }
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.concurrent.TimeUnit; import java.util.zip.ZipOutputStream; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:29 下午 * @Description: org.qzx.config * @version: 1.0 */ public class TestConfig { private ZooKeeper zooKeeper; private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack(); // 接受數據 MyConf myConf = new MyConf(); @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void getConf() throws InterruptedException { watcherAndCallBack.setZooKeeper(zooKeeper); watcherAndCallBack.setMyConf(myConf); watcherAndCallBack.aWait(); while (true){ if(myConf.getConfData().equals("")){ System.out.println("數據爲空"); watcherAndCallBack.aWait();// 等待數據到達 } System.out.println(myConf.getConfData()); TimeUnit.SECONDS.sleep(2); } } }
接下來從新測試節點被刪除的狀況.
刪除節點後會發發現程序輸出數據爲空的提示後就阻塞住了。
如今從新建立節點:
會發現又從新得到了節點的數據。
到此對於zookeeper的配置註冊的代碼就編寫完畢。
這裏對於zookeeper的配置註冊作一個小小的總結,配置註冊本質上是在統一管理服務器共享的節點,其配置信息所有寫在了那1M的數據中,在一個服務器修改了該節點後,其餘的服務器會經過zookeeper的watcher機制接受到該消息,也就成功看到節點的實時變化完成更新配置的操做,這樣就完成了分佈式服務的協調功能。
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:02 下午 * @Description: org.qzx.config * @version: 1.0 */ public class Utils { // zookeeper對象 private static ZooKeeper zooKeeper; // 鏈接地址 private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test"; private static DefaultWatcher defaultWatcher = new DefaultWatcher(); // 鎖 private static CountDownLatch latch = new CountDownLatch(1); public static ZooKeeper getZooKeeper() throws Exception{ zooKeeper = new ZooKeeper(address,3000,defaultWatcher); defaultWatcher.setLatch(latch); latch.await(); return zooKeeper; } }
package org.qzx.config; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:05 下午 * @Description: org.qzx.config * @version: 1.0 */ public class DefaultWatcher implements Watcher { private CountDownLatch latch; public void setLatch(CountDownLatch latch) { this.latch = latch; } @Override public void process(WatchedEvent event) { switch (event.getState()) { case Unknown: break; case Disconnected: break; case NoSyncConnected: break; case SyncConnected: latch.countDown(); break; case AuthFailed: break; case ConnectedReadOnly: break; case SaslAuthenticated: break; case Expired: break; case Closed: break; } System.out.println(event.toString()); } }
package org.qzx.config; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:40 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { private ZooKeeper zooKeeper; private MyConf myConf; private CountDownLatch latch = new CountDownLatch(1); public void setMyConf(MyConf myConf) { this.myConf = myConf; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if(stat!=null){ // 節點存在獲取數據 zooKeeper.getData("/AppConf",this,this,"aaa"); } } // Watcher @Override public void process(WatchedEvent event) { switch (event.getType()) { case None: break; case NodeCreated: // 節點建立須要獲取數據 zooKeeper.getData("/AppConf",this,this,"bbb"); break; case NodeDeleted: // 節點刪除須要清空數據而且等待數據到達 myConf.setConfData(""); latch = new CountDownLatch(1); break; case NodeDataChanged: // 數據修改 zooKeeper.getData("/AppConf",this,this,"bbb"); break; case NodeChildrenChanged: break; case DataWatchRemoved: break; case ChildWatchRemoved: break; } } // DataCallback @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if(stat!=null){ myConf.setConfData(new String(data)); latch.countDown(); } } public void aWait() throws InterruptedException { // 這裏的/AppConf在zookeeper中其實是/test/AppConf, zooKeeper.exists("/AppConf",this,this,"123"); latch.await(); } }
package org.qzx.config; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 2:00 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyConf { private String confData; public String getConfData() { return confData; } public void setConfData(String confData) { this.confData = confData; } }
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.concurrent.TimeUnit; import java.util.zip.ZipOutputStream; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:29 下午 * @Description: org.qzx.config * @version: 1.0 */ public class TestConfig { private ZooKeeper zooKeeper; private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack(); // 接受數據 MyConf myConf = new MyConf(); @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void getConf() throws InterruptedException { watcherAndCallBack.setZooKeeper(zooKeeper); watcherAndCallBack.setMyConf(myConf); watcherAndCallBack.aWait(); while (true){ if(myConf.getConfData().equals("")){ System.out.println("數據爲空"); watcherAndCallBack.aWait();// 等待數據到達 } System.out.println(myConf.getConfData()); TimeUnit.SECONDS.sleep(2); } } }
可能有人本文篇幅較冗餘(尤爲是代碼部分)並且過於簡單,可是本人只是想完整的記錄響應式編程的思考過程和完整的代碼書寫流程,能夠供本身複習和爲小白提供一個入門zookeeper響應式編程的小demo。