<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency>
定義一個參數modeljava
public class ZookeeperRegisterConnectModel { //鏈接地址(能夠多個地址,用逗號分割) private String connectString; private int sessionTimeout; public String getConnectString() { return connectString; } public void setConnectString(String connectString) { this.connectString = connectString; } public int getSessionTimeout() { return sessionTimeout; } public void setSessionTimeout(int sessionTimeout) { this.sessionTimeout = sessionTimeout; } public ZookeeperRegisterConnectModel(){} public ZookeeperRegisterConnectModel(String connectString, int sessionTimeout) { this.connectString = connectString; this.sessionTimeout = sessionTimeout; } }
服務註冊具體代碼:apache
public class ZookeeperRegisterNodeServiceImpl implements IRegisterNodeService, Closeable { private ZooKeeper zk; /** * 實例鏈接zookeeper * * @param zookeeperConnectModel * @throws IOException */ public ZookeeperRegisterNodeServiceImpl(ZookeeperRegisterConnectModel zookeeperConnectModel) throws IOException { //建立一個與ZooKeeper服務器的鏈接 zk = new ZooKeeper(zookeeperConnectModel.getConnectString(), zookeeperConnectModel.getSessionTimeout(), new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); } /** * 建立節點 * * @param service 服務接口名稱 * @param address 服務發佈的地址和端口 * @return * @throws KeeperException * @throws InterruptedException */ public String createNode(String service, String address) throws KeeperException, InterruptedException { Stat stat = zk.exists("/" + service, false); //不存在就建立根節點 if (stat == null) { zk.create("/" + service, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } //建立一個子節點 return zk.create("/" + service + "/" + address, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } @Override public void close() { try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } }
服務發現具體代碼:服務器
public class ZookeeperGetChildrenServiceImpl implements IGetChildrenService , Closeable { private ZooKeeper zk; public ZookeeperGetChildrenServiceImpl(ZookeeperFindConnectModel zookeeperConnectModel) throws IOException { //建立一個與ZooKeeper服務器的鏈接 zk = new ZooKeeper(zookeeperConnectModel.getConnectString(), zookeeperConnectModel.getSessionTimeout(), new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); } /** * 獲取路徑下全部節點,幷隨機返回一個節點的信息 * @param path * @return * @throws KeeperException * @throws InterruptedException */ @Override public String getChildren(String path) throws KeeperException, InterruptedException { List<String> childrens = zk.getChildren("/"+path, false); if(null==childrens ||childrens.size()==0){ return null; } // shuffle 打亂順序 Collections.shuffle(childrens); return childrens.get(0); } @Override public void close() { try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } }
public class RegisterFindTest { private String service = "zookeeperTest"; boolean error=false; @Test public void findTest() throws InterruptedException, IOException, KeeperException { registerTest(); IGetChildrenService getChildrenService = new ZookeeperGetChildrenServiceImpl(new ZookeeperFindConnectModel("192.168.10.200:2181", 5000)); List<Thread> threads=new ArrayList<>(); for(int i=0;i<500;i++){ Thread thread = new Thread() { @Override public void run() { try { String organization = getChildrenService.getChildren(service); Assert.assertNotNull(service + "服務地址沒有找到", organization); } catch (InterruptedException | KeeperException |AssertionError e) { error=true; } } }; thread.start(); threads.add(thread); } threads.forEach(thread -> { try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); Assert.assertFalse(error); } private void registerTest() throws IOException, KeeperException, InterruptedException { IRegisterNodeService registerNodeService = new ZookeeperRegisterNodeServiceImpl(new ZookeeperRegisterConnectModel("192.168.10.200:2181", 18000)); registerNodeService.createNode(service, "192.168.10.11:1111"); registerNodeService.createNode(service, "192.168.10.22:2222"); registerNodeService.createNode(service, "192.168.10.33:3333"); } }
/** ** 獲取節點上面的數據 ** @param path 路徑 ** @return ** @throws KeeperException ** @throws InterruptedException * */ public String getData(String path) throws KeeperException, InterruptedException { byte[] data = zookeeper.getData(path, false, null); if (data == null) { return ""; } return new String(data); } /** * 設置節點信息 * @param path 路徑 * @param data 數據 * @return * @throws KeeperException * @throws InterruptedException */ public Stat setData(String path,String data) throws KeeperException,InterruptedException{ Stat stat = zookeeper.setData(path, data.getBytes(), -1); return stat; } /** * 刪除節點 * @param path * @throws InterruptedException * @throws KeeperException */ public void deleteNode(String path) throws InterruptedException, KeeperException{ zookeeper.delete(path, -1); } /** * 獲取建立時間 * @param path * @return * @throws KeeperException * @throws InterruptedException */ public String getCTime(String path) throws KeeperException, InterruptedException{ Stat stat = zookeeper.exists(path, false); return String.valueOf(stat.getCtime()); } /** * 獲取某個路徑下孩子的數量 * @param path * @return * @throws KeeperException * @throws InterruptedException */ public Integer getChildrenNum(String path) throws KeeperException, InterruptedException{ int childenNum = zookeeper.getChildren(path, false).size(); return childenNum; } /** * 關閉鏈接 * @throws InterruptedException */ public void closeConnection() throws InterruptedException{ if (zookeeper != null) { zookeeper.close(); } }