zookeeper簡單實現註冊與發現以及其餘基本操做

添加依賴

<dependency>
   <groupId>org.apache.zookeeper</groupId>
   <artifactId>zookeeper</artifactId>
   <version>3.4.6</version>
</dependency>

註冊

定義一個參數modeljava

public class ZookeeperRegisterConnectModel {
    //鏈接地址(能夠多個地址,用逗號分割)
    private String connectString;
    private int sessionTimeout;

    public String getConnectString() {
        return connectString;
    }

    public void setConnectString(String connectString) {
        this.connectString = connectString;
    }

    public int getSessionTimeout() {
        return sessionTimeout;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    public ZookeeperRegisterConnectModel(){}
    public ZookeeperRegisterConnectModel(String connectString, int sessionTimeout) {
        this.connectString = connectString;
        this.sessionTimeout = sessionTimeout;
    }
}

服務註冊具體代碼:apache

public class ZookeeperRegisterNodeServiceImpl implements IRegisterNodeService, Closeable {

    private ZooKeeper zk;

    /**
     * 實例鏈接zookeeper
     *
     * @param zookeeperConnectModel
     * @throws IOException
     */
    public ZookeeperRegisterNodeServiceImpl(ZookeeperRegisterConnectModel zookeeperConnectModel) throws IOException {
        //建立一個與ZooKeeper服務器的鏈接
        zk = new ZooKeeper(zookeeperConnectModel.getConnectString(), zookeeperConnectModel.getSessionTimeout(), new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }

    /**
     * 建立節點
     *
     * @param service 服務接口名稱
     * @param address 服務發佈的地址和端口
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public String createNode(String service, String address) throws KeeperException, InterruptedException {
        Stat stat = zk.exists("/" + service, false);
        //不存在就建立根節點
        if (stat == null) {
            zk.create("/" + service, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        //建立一個子節點
        return zk.create("/" + service + "/" + address, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }

    @Override
    public void close() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

發現

服務發現具體代碼:服務器

public class ZookeeperGetChildrenServiceImpl implements IGetChildrenService , Closeable {

    private ZooKeeper zk;

    public ZookeeperGetChildrenServiceImpl(ZookeeperFindConnectModel zookeeperConnectModel) throws IOException {
        //建立一個與ZooKeeper服務器的鏈接
        zk = new ZooKeeper(zookeeperConnectModel.getConnectString(), zookeeperConnectModel.getSessionTimeout(), new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }

    /**
     * 獲取路徑下全部節點,幷隨機返回一個節點的信息
     * @param path
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Override
    public String getChildren(String path) throws KeeperException, InterruptedException {
        List<String> childrens = zk.getChildren("/"+path, false);
        if(null==childrens ||childrens.size()==0){
            return null;
        }
        // shuffle 打亂順序
        Collections.shuffle(childrens);
        return childrens.get(0);
    }

    @Override
    public void close() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

測試

public class RegisterFindTest {
    private String service = "zookeeperTest";
    boolean error=false;

    @Test
    public void findTest() throws InterruptedException, IOException, KeeperException {
        registerTest();

        IGetChildrenService getChildrenService = new ZookeeperGetChildrenServiceImpl(new ZookeeperFindConnectModel("192.168.10.200:2181", 5000));
        List<Thread> threads=new ArrayList<>();
        for(int i=0;i<500;i++){
            Thread thread = new Thread() {
                @Override
                public void run() {
                    try {

                        String organization = getChildrenService.getChildren(service);
                        Assert.assertNotNull(service + "服務地址沒有找到", organization);
                    } catch (InterruptedException | KeeperException   |AssertionError e) {
                        error=true;
                    }
                }
            };
            thread.start();
            threads.add(thread);
        }
        threads.forEach(thread -> {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Assert.assertFalse(error);
    }


    private void registerTest() throws IOException, KeeperException, InterruptedException {
        IRegisterNodeService registerNodeService = new ZookeeperRegisterNodeServiceImpl(new ZookeeperRegisterConnectModel("192.168.10.200:2181", 18000));
        registerNodeService.createNode(service, "192.168.10.11:1111");
        registerNodeService.createNode(service, "192.168.10.22:2222");
        registerNodeService.createNode(service, "192.168.10.33:3333");
    }
}

其餘基本操做

/**
     ** 獲取節點上面的數據
     ** @param path 路徑
     ** @return
     ** @throws KeeperException
     ** @throws InterruptedException
     * 
     */
    public String getData(String path) throws KeeperException, InterruptedException {
        byte[] data = zookeeper.getData(path, false, null);
        if (data == null) {
            return "";
        }
        return new String(data);
    }
    
    /**
     * 設置節點信息
     * @param path 路徑
     * @param data 數據
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public Stat setData(String path,String data) throws KeeperException,InterruptedException{
        Stat stat = zookeeper.setData(path, data.getBytes(), -1);
        return stat;
    }
    
    /**
     * 刪除節點
     * @param path
     * @throws InterruptedException
     * @throws KeeperException
     */
    public void deleteNode(String path) throws InterruptedException, KeeperException{
        zookeeper.delete(path, -1);
    }
    
    /**
     * 獲取建立時間
     * @param path
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public String getCTime(String path) throws KeeperException, InterruptedException{
        Stat stat = zookeeper.exists(path, false);
        return String.valueOf(stat.getCtime());
    }
    
    /**
     * 獲取某個路徑下孩子的數量
     * @param path
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public Integer getChildrenNum(String path) throws KeeperException, InterruptedException{
        int childenNum = zookeeper.getChildren(path, false).size();
        return childenNum;
    }
    
    /**
     * 關閉鏈接
     * @throws InterruptedException
     */
    public void closeConnection() throws InterruptedException{
        if (zookeeper != null) {
            zookeeper.close();
        }
    }
相關文章
相關標籤/搜索