導入zookeeper和junitjava
<dependencies> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.6</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies>
before,建立會話用,後面不在貼這部分代碼node
private static ZooKeeper zooKeeper; private static Stat stat = new Stat(); @Before public void before() { zooKeeper = ZookeeperConnect.getZookeeper(); }
CountDownLatch的知識點,參考java併發編程學習之CountDownLatch。ZooKeeperZookeeperConnect代碼以下:apache
public class ZookeeperConnect { private static final String CONNECT_STRING = "172.17.0.2:2181,172.17.0.3:2181,172.17.0.4:2181"; public static CountDownLatch countDownLatch = new CountDownLatch(1); static ZooKeeper zooKeeper = null; public static ZooKeeper getZookeeper() { try { // 第一個參數,是zookeeper集羣各個節點的地址,多個地址用逗號隔開 // 第二個參數,會話超時時間,毫秒爲單位。 // 第三個參數,是Watcher,這邊用CountDownLatch控制鏈接成功後,才返回zooKeeper。 zooKeeper = new ZooKeeper(CONNECT_STRING, 5000, new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("watchedEvent:" + watchedEvent); if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) { countDownLatch.countDown(); } } } }); System.out.println("等待中"); countDownLatch.await(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("獲取到鏈接"); return zooKeeper; } }
測試代碼編程
@Test public void testConnect(){ }
運行結果:segmentfault
包括同步和異步。測試代碼以下:服務器
@Test public void testCreate() throws KeeperException, InterruptedException { // 第一個參數,是節點路徑,一樣不能直接多級設置,會報KeeperErrorCode = NoNode for的錯誤 // 第二個參數,是節點的內容 // 第三個參數,是節點的ACL策略,這邊先用OPEN_ACL_UNSAFE放寬權限操做 // 第四個參數,是節點的類型,好比持久化節點、持久化有序節點、臨時節點、臨時有序節點 String result = zooKeeper.create("/node", "node".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("新增節點成功:" + result); } @Test public void testCreateASync() { // 第五個參數,是用於異步回調 // 第六個參數,是用於回調的時候,傳遞一個上下文信息 zooKeeper.create("/nodeASync", "nodeASync".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new MyStringCallBack(), "testCreateASync"); System.out.println("新增節點成功"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }
MyStringCallBack併發
public class MyStringCallBack implements AsyncCallback.StringCallback { public void processResult(int rc, String path, Object ctx, String name) { System.out.println("rc:" + rc + ",path:" + path + ",ctx:" + ctx + "name," + name); } }
同步運行結果以下:
異步運行結果以下:
在上面的例子中,能夠看到,testCreate()
是有拋異常的,而異步是沒有拋異常的,異常的信息經過Result Code來響應。
回調函數的接口,都是繼承了AsyncCallback
。在AsyncCallback
類中,裏面還有DataCallback
、VoidCallback
、ACLCallback
、StatCallback
等回調接口,根據ZooKeeper類中,不一樣的方法,調用不一樣的接口。後續幾個方法的演示,這個異步的就略過了,大同小異。
接口中的processResult()
方法,參數說明以下:異步
測試代碼函數
@Test public void testDelete() throws KeeperException, InterruptedException { // 第一個參數,是節點路徑 // 第二個參數,是節點的版本,,-1表示不參與原子性操做 // 其餘回調參數、上下文跟上面雷同 zooKeeper.delete("/node", -1); System.out.println("刪除節點成功"); }
運行結果以下:學習
測試代碼
@Test public void testGetChildren() throws KeeperException, InterruptedException { // 第一個參數,是獲取這個節點路徑下面的子節點 // 第二個參數,true表示使用默認的Watcher,false表示不用Watcher來監聽。這個默認的watcher就是建立會話的時候的Watcher // 其餘參數,好比註冊Watcher、回調、上下文、stat信息就略過 List<String> children = zooKeeper.getChildren("/", true); System.out.println(children); }
運行結果以下,在這以前,又運行了node添加,因此就有三個節點:
測試代碼:
@Test public void testGetData() throws KeeperException, InterruptedException { // 第一個參數,是獲取這個路徑的數據 // 第二個參數,true表示使用默認的Watcher,false表示不用Watcher來監聽。這個默認的watcher就是建立會話的時候的Watcher。 // 第三個參數,接收新的stat信息 byte[] data = zooKeeper.getData("/node", true, stat); System.out.println(stat); System.out.println("node數據爲:" + new String(data)); }
運行結果以下:
測試代碼:
@Test public void testSetData() throws KeeperException, InterruptedException { byte[] data = zooKeeper.getData("/node", true, stat); System.out.println("node數據爲:" + new String(data)); // 第一個參數,是設置節點的路徑 // 第二個參數,是設置節點的數據 // 第三個參數,是設置節點的版本,-1表示不參與原子性操做 zooKeeper.setData("/node", "newData".getBytes(), -1); data = zooKeeper.getData("/node", true, stat); System.out.println("node數據爲:" + new String(data)); }
運行結果以下:
測試代碼:
@Test public void testExists() throws KeeperException, InterruptedException { // 第一個參數,須要獲取信息的節點的路徑 // 第二個參數,true表示使用默認的Watcher,false表示不用Watcher來監聽。這個默認的watcher就是建立會話的時候的Watcher System.out.println("node節點信息:" + zooKeeper.exists("/node", true)); }
運行結果以下:
當客戶端獲取到節點的信息或者節點的子列表信息時,能夠經過註冊Watcher來監聽節點或子列表的變化信息。指的一提的是,Watcher的通知,是一次性的,一旦觸發後,這個Watcher就失效了,因此想要一直監聽,就要一直反覆的註冊Watcher。
WatchedEvent有如下幾種:
ZookeeperConnect修改爲:
public class ZookeeperConnect { private static final String CONNECT_STRING = "172.17.0.2:2181,172.17.0.3:2181,172.17.0.4:2181"; public static CountDownLatch countDownLatch = new CountDownLatch(1); static ZooKeeper zooKeeper = null; public static ZooKeeper getZookeeper() { try { zooKeeper = new ZooKeeper(CONNECT_STRING, 5000, new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("watchedEvent:" + watchedEvent); if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) { countDownLatch.countDown(); } else { switch (watchedEvent.getType()) { case NodeChildrenChanged: System.out.println("NodeChildrenChanged"); try { zooKeeper.getChildren(watchedEvent.getPath(), true); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } break; case NodeDataChanged: try { if (watchedEvent.getPath().equals("/node/node_1")) { zooKeeper.getData(watchedEvent.getPath(), true, new Stat()); } else { zooKeeper.exists(watchedEvent.getPath(), true); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("NodeDataChanged"); break; case NodeCreated: try { zooKeeper.exists(watchedEvent.getPath(), true); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("NodeCreated"); break; case NodeDeleted: System.out.println("NodeDeleted"); break; default: break; } } } } }); System.out.println("等待中"); countDownLatch.await(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("獲取到鏈接"); return zooKeeper; } }
測試代碼:
@Test public void testGetChildren4Watcher() throws KeeperException, InterruptedException { zooKeeper.getChildren("/node", true); System.out.println("create prepare"); zooKeeper.create("/node/node_1", "node_1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("setData prepare"); zooKeeper.setData("/node/node_1", "node_1_new".getBytes(), -1); System.out.println("delete prepare"); zooKeeper.delete("/node/node_1", -1); }
運行結果以下:
監聽了/node的子節點,能夠看出,子節點的建立、刪除,都獲得了監聽。
測試代碼:
@Test public void testGetData4Watcher() throws KeeperException, InterruptedException { zooKeeper.create("/node/node_1", "node_1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zooKeeper.getData("/node/node_1", true, stat); System.out.println("setData prepare"); zooKeeper.setData("/node/node_1", "node_1_new".getBytes(), -1); TimeUnit.MILLISECONDS.sleep(200); System.out.println(); System.out.println("delete prepare"); zooKeeper.delete("/node/node_1", -1); }
運行結果以下:
監聽了/node/node_1節點,能夠看出,/node/node_1節點的修改、刪除,都獲得了監聽。
測試代碼:
@Test public void testExists4Watcher() throws KeeperException, InterruptedException { if(null==zooKeeper.exists("/node/node_2", true)){ System.out.println("create prepare"); zooKeeper.create("/node/node_2", "node_2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zooKeeper.getData("/node/node_2", true, stat); TimeUnit.MILLISECONDS.sleep(200); System.out.println(); System.out.println("setData prepare"); zooKeeper.setData("/node/node_2", "node_2_new".getBytes(), -1); TimeUnit.MILLISECONDS.sleep(200); System.out.println("delete prepare"); System.out.println(); zooKeeper.delete("/node/node_2", -1); } }
運行結果以下:
監聽了/node/node_2,能夠看出,/node/node_2節點的建立、修改、刪除,都獲得了監聽。
zookeeper提供了ACL的權限控制機制,經過服務器數據節點的ACL,來控制客戶端對該節點的控制權限。
ACL的權限控制模式:
測試代碼:
@Test public void testAuth() throws IOException, KeeperException, InterruptedException { ZooKeeper zooKeeper1 = new ZooKeeper(ZookeeperConnect.CONNECT_STRING, 5000, null); zooKeeper1.addAuthInfo("digest", "name:dajun".getBytes()); zooKeeper1.create("/node2", "node2".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL); System.out.println("zooKeeper1:"+new String(zooKeeper1.getData("/node2", false, stat))); ZooKeeper zooKeeper2 = new ZooKeeper(ZookeeperConnect.CONNECT_STRING, 5000, null); System.out.println("zooKeeper2:"+new String(zooKeeper2.getData("/node2", false, stat))); }
運行結果以下:
zooKeeper2獲取節點信息的時候,拋異常了。
下面咱們來看看特殊的delete操做。
測試代碼:
@Test public void testAuth4Delete() throws IOException, KeeperException, InterruptedException { ZooKeeper zooKeeper1 = new ZooKeeper(ZookeeperConnect.CONNECT_STRING, 5000, null); zooKeeper1.addAuthInfo("digest", "name:dajun".getBytes()); zooKeeper1.create("/node2", "node2".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT); zooKeeper1.create("/node2/node2_1", "node2_1".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL); ZooKeeper zooKeeper2 = new ZooKeeper(ZookeeperConnect.CONNECT_STRING, 5000, null); try { zooKeeper2.delete("/node2/node2_1", -1); } catch (Exception e) { System.out.println("delete error:" + e.getMessage()); } ZooKeeper zooKeeper3 = new ZooKeeper(ZookeeperConnect.CONNECT_STRING, 5000, null); zooKeeper3.addAuthInfo("digest", "name:dajun".getBytes()); zooKeeper3.delete("/node2/node2_1", -1); System.out.println("delete /node2/node2_1 sucess"); ZooKeeper zooKeeper4 = new ZooKeeper(ZookeeperConnect.CONNECT_STRING, 5000, null); zooKeeper4.delete("/node2", -1); System.out.println("delete /node2 sucess"); }
運行結果以下:zookeeper1,對/node2和/node2/node2_1設置了權限。zookeeper2,刪除的時候,由於沒權限,刪除失敗。zookeeper3,有權限,刪除成功。zookeeper4,沒有權限,也刪除成功。能夠看出,刪除的時候,權限的做用訪問是其子節點,須要權限才能夠刪除,可是這個對於該節點,仍是能夠任意的刪除。固然,讀取等權限,也是不容許的,只對刪除節點有用。