ZooKeeper Series (1): ZooKeeper Basics

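ZooKeeper Series (1): ZooKeeper Basics
ZooKeeper Series (2): Master Election in Practice
ZooKeeper Series (3): Data Publish/Subscribe in Practice
ZooKeeper Series (4): Load Balancing in Practice
ZooKeeper Series (5): Distributed Locks in Practice
ZooKeeper Series (6): Distributed Queues in Practice
ZooKeeper Series (7): Distributed Naming Service in Practice
ZooKeeper Series (8): ZooKeeper Operations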

ZooKeeper cluster configuration

Prepare three servers:
n1:192.168.1.101
n2:192.168.1.102
n3:192.168.1.103

Installing ZooKeeper

cd /opt
wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.10.tar.gz
tar -zxvf zookeeper-3.4.10.tar.gz
mv zookeeper-3.4.10 zookeeper

Configuring ZooKeeper

cd zookeeper/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
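# ZooKeeper data directory. Keep comments on their own lines: zoo.cfg is read as a
# Java properties file, so text after a value becomes part of that value.
dataDir=/var/zookeeper
# Cluster members, in the form server.<myid>=<host>:<peer port>:<leader election port>
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888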
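
ZooKeeper reads zoo.cfg as a Java properties file: each server.N line maps a server number to host:peerPort:electionPort, and a trailing # comment on the same line is not stripped from the value. The sketch below illustrates both points using only java.util.Properties. The class ZooCfgSketch and its helper methods are hypothetical names for illustration, not part of ZooKeeper.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of ZooKeeper.
final class ZooCfgSketch {

    // Loads properties-style text the same way a .properties file is loaded.
    static Properties load(String cfgText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns server id -> "host:peerPort:electionPort", sorted by id.
    static Map<Integer, String> servers(String cfgText) {
        Properties props = load(cfgText);
        Map<Integer, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith("server.")) {
                int id = Integer.parseInt(key.substring("server.".length()));
                result.put(id, props.getProperty(key).trim());
            }
        }
        return result;
    }

    // Returns the raw value of one key, exactly as the properties parser sees it.
    static String value(String cfgText, String key) {
        return load(cfgText).getProperty(key);
    }

    public static void main(String[] args) {
        String cfg = String.join("\n",
                "# comments belong on their own line",
                "dataDir=/var/zookeeper",
                "server.1=192.168.1.101:2888:3888",
                "server.2=192.168.1.102:2888:3888",
                "server.3=192.168.1.103:2888:3888");
        System.out.println(servers(cfg));
        // A trailing comment ends up inside the value instead of being ignored.
        System.out.println(value("dataDir=/var/zookeeper # data dir", "dataDir"));
    }
}
```

In practice this means a comment placed after dataDir on the same line would change the directory path that the server tries to use.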

Create the ZooKeeper data directory and the myid file:

cd /var
mkdir zookeeper
cd zookeeper
vi myid

In myid, enter the number that the server has in the configuration file (1 on n1, 2 on n2, 3 on n3, matching the server.N entries).
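
The same step can be scripted instead of done by hand in vi. The following is a minimal sketch that writes and reads back a myid file. The class MyIdSketch and the demo directory name are assumptions for illustration; on a real server the directory would be the dataDir from zoo.cfg.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper for illustration; not part of ZooKeeper.
final class MyIdSketch {

    // Creates dataDir if needed and writes the server number into dataDir/myid.
    static Path writeMyId(Path dataDir, int serverId) {
        try {
            Files.createDirectories(dataDir);
            Path myid = dataDir.resolve("myid");
            Files.write(myid, (serverId + "\n").getBytes(StandardCharsets.US_ASCII));
            return myid;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the server number back from dataDir/myid.
    static int readMyId(Path dataDir) {
        try {
            byte[] raw = Files.readAllBytes(dataDir.resolve("myid"));
            return Integer.parseInt(new String(raw, StandardCharsets.US_ASCII).trim());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Demo directory (assumption); pass the real dataDir and id as arguments.
        Path dataDir = Paths.get(args.length > 0 ? args[0] : "zk-data-demo");
        int serverId = args.length > 1 ? Integer.parseInt(args[1]) : 1;
        System.out.println("wrote " + writeMyId(dataDir, serverId)
                + " with id " + readMyId(dataDir));
    }
}
```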

Starting ZooKeeper

cd /opt/zookeeper/bin
./zkServer.sh start

Verifying status

telnet 192.168.1.101 2181

Use the stat command to get the ZooKeeper status:

stat

Output: This ZooKeeper instance is not currently serving requests

Now start the second server. Once more than half of the nodes in the ZooKeeper cluster are running normally, ZooKeeper can serve requests.
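
The "more than half" rule can be written out as simple arithmetic. The sketch below uses a hypothetical class, QuorumSketch, to show why one running server out of three is not enough while two are.

```java
// Hypothetical helper for illustration; not part of ZooKeeper.
final class QuorumSketch {

    // Smallest number of servers that is strictly more than half of the cluster.
    static int majority(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    // True when enough servers are running for the cluster to serve requests.
    static boolean canServe(int clusterSize, int running) {
        return running >= majority(clusterSize);
    }

    // Number of servers that may fail while a majority still remains.
    static int tolerableFailures(int clusterSize) {
        return clusterSize - majority(clusterSize);
    }

    public static void main(String[] args) {
        int size = 3;
        System.out.println("majority of " + size + " = " + majority(size));
        System.out.println("1 running: " + canServe(size, 1));
        System.out.println("2 running: " + canServe(size, 2));
        System.out.println("tolerable failures: " + tolerableFailures(size));
    }
}
```

With three servers the majority is two, so the cluster starts serving once the second server is up and keeps serving if any single server fails.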

Reconnect with telnet and run stat to check n1:

Latency min/avg/max: 0/0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x0
Mode: follower
Node count: 4

Check n2:

Latency min/avg/max: 0/0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x100000000
Mode: leader
Node count: 4

As shown, n1 has the follower role and n2 has the leader role.
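
When checking several servers it can help to pull the role out of the stat output programmatically. This is a minimal sketch that extracts the Mode line from text shaped like the output above. The class StatOutputSketch is a hypothetical name, and the sketch only parses text; it does not connect to a server.

```java
import java.util.Optional;

// Hypothetical helper for illustration; it parses text only.
final class StatOutputSketch {

    // Returns the value of the "Mode:" line, or empty if the output has none
    // (for example when the server is not yet serving requests).
    static Optional<String> mode(String statOutput) {
        for (String line : statOutput.split("\\R")) {
            String trimmed = line.trim();
            if (trimmed.startsWith("Mode:")) {
                return Optional.of(trimmed.substring("Mode:".length()).trim());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String n1 = String.join("\n",
                "Latency min/avg/max: 0/0/0",
                "Zxid: 0x0",
                "Mode: follower",
                "Node count: 4");
        System.out.println("n1 role: " + mode(n1).orElse("unknown"));
        String notServing = "This ZooKeeper instance is not currently serving requests";
        System.out.println("not serving: " + mode(notServing).orElse("unknown"));
    }
}
```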

Using zkCli.sh

cd /opt/zookeeper/bin
./zkCli.sh -timeout 5000 -server 192.168.1.101:2181

Type h to list the commands available in the client:

h
ZooKeeper -server host:port cmd args
        stat path [watch]
        set path data [version]
        ls path [watch]
        delquota [-n|-b] path
        ls2 path [watch]
        setAcl path acl
        setquota -n|-b val path
        history 
        redo cmdno
        printwatches on|off
        delete path [version]
        sync path
        listquota path
        rmr path
        get path [watch]
        create [-s] [-e] path data acl
        addauth scheme auth
        quit 
        getAcl path
        close 
        connect host:port

Open-source client: ZkClient

Add the Maven dependencies

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.5</version>
</dependency>

Creating a session

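import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

public class CreateSession {

    public static void main(String[] args) {
        // Arguments: server address, session timeout (ms), connection timeout (ms), serializer
        ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());
        System.out.println("connected ok!");
    }
    
}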

Creating a node

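import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;

public class CreateNode {

    public static void main(String[] args) {
        ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());
        System.out.println("connected ok!");
        
        // User is an application class that the article does not show;
        // it must implement java.io.Serializable to work with SerializableSerializer.
        User u = new User();
        u.setId(1);
        u.setName("test");
        // Creates a persistent node and returns its path
        String path = zc.create("/jike5", u, CreateMode.PERSISTENT);
        System.out.println("created path:"+path);
    }
    
}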
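
The examples use a User class that the article does not show. Because SerializableSerializer relies on standard Java serialization, the class has to implement java.io.Serializable. The sketch below is one possible shape for it, with the two fields inferred from the setId and setName calls; the toString format and the UserRoundTrip helper are assumptions. The round trip uses plain Java object streams to show what gets stored as the node's bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical demo of Java serialization; it does not talk to ZooKeeper.
final class UserRoundTrip {

    // Serializes an object into the byte array that would become node data.
    static byte[] toBytes(Serializable value) {
        try (ByteArrayOutputStream buf = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(buf)) {
            out.writeObject(value);
            out.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Restores an object from bytes produced by toBytes.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        User u = new User();
        u.setId(1);
        u.setName("test");
        User copy = (User) fromBytes(toBytes(u));
        System.out.println(copy);
    }
}

// Assumed shape of the article's User class (fields inferred from its setters).
class User implements Serializable {
    private static final long serialVersionUID = 1L;
    private int id;
    private String name;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    @Override
    public String toString() { return "User{id=" + id + ", name=" + name + "}"; }
}
```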

Reading a node

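import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.data.Stat;

public class GetData {

    public static void main(String[] args) {
        ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());
        System.out.println("connected ok!");
        
        // readData deserializes the node content and fills stat with the node metadata
        Stat stat = new Stat();
        User u = zc.readData("/jike5",stat);
        System.out.println(u.toString());
        System.out.println(stat);
        
    }
    
}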

Getting child nodes

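import java.util.List;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

public class GetChild {

    public static void main(String[] args) {
        ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());
        System.out.println("connected ok!");
        
        // Returns the names of the direct children (relative names, not full paths)
        List<String> cList = zc.getChildren("/jike5");
        
        System.out.println(cList.toString());
        
    }
    
}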

Checking whether a node exists

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

public class NodeExists {

    public static void main(String[] args) {
        ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());
        System.out.println("connected ok!");
        
        boolean e = zc.exists("/jike5");
        
        System.out.println(e);
        
    }
    
}

Deleting a node

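import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

public class NodeDel {

    public static void main(String[] args) {
        ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());
        System.out.println("connected ok!");
        
        // delete() removes a single node that has no children
        boolean e1 = zc.delete("/jike5");
        // deleteRecursive() removes the node together with all of its descendants
        boolean e2 = zc.deleteRecursive("/jike5");
        System.out.println(e1 + " " + e2);
        
    }
    
}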

Updating data

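import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

public class WriteData {

    public static void main(String[] args) {
        ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());
        System.out.println("connected ok!");
        
        
        User u = new User();
        u.setId(2);
        u.setName("test2");
        // The third argument is the expected data version: the write succeeds only
        // if the node's current version is 1 (a newly created node starts at 0).
        zc.writeData("/jike5", u, 1);
        
    }
    
}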
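
The version argument of writeData acts as an optimistic lock: ZooKeeper applies the write only if the expected version equals the node's current data version, and then increments that version. The sketch below models this rule in memory. VersionedNodeSketch is a hypothetical class, not a ZooKeeper or ZkClient API, and it returns false where a real server would report a bad-version error.

```java
// In-memory model of ZooKeeper's versioned writes; hypothetical, for illustration only.
final class VersionedNodeSketch {

    private byte[] data;
    private int version = 0; // a newly created node starts at data version 0

    VersionedNodeSketch(byte[] initialData) {
        this.data = initialData.clone();
    }

    // Applies the write only when expectedVersion matches; -1 matches any version.
    synchronized boolean setData(byte[] newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            return false; // a real server would reject this write as a version conflict
        }
        data = newData.clone();
        version++;
        return true;
    }

    synchronized int version() {
        return version;
    }

    synchronized byte[] data() {
        return data.clone();
    }

    public static void main(String[] args) {
        VersionedNodeSketch node = new VersionedNodeSketch("v0".getBytes());
        System.out.println("write expecting 1 on a fresh node: " + node.setData("a".getBytes(), 1));
        System.out.println("write expecting 0: " + node.setData("b".getBytes(), 0));
        System.out.println("version now: " + node.version());
        System.out.println("write expecting 1: " + node.setData("c".getBytes(), 1));
    }
}
```

Read against the example above, this suggests that writeData("/jike5", u, 1) is rejected on a node that has never been updated and succeeds only after exactly one earlier write.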

Subscribing to child list changes

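import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

public class SubscribeChildChanges {
    
    private static class ZkChildListener implements IZkChildListener{

        public void handleChildChange(String parentPath,
                List<String> currentChilds) throws Exception {

            // currentChilds is the full child list after the change, not a diff
            System.out.println(parentPath);
            System.out.println(String.valueOf(currentChilds));
            
        }
        
    }

    public static void main(String[] args) throws InterruptedException {
        ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());
        System.out.println("connected ok!");

        // Besides child changes, creation and deletion of the node itself also trigger a notification
        zc.subscribeChildChanges("/jike20", new ZkChildListener());
        // Keep the process alive so that notifications can arrive
        Thread.sleep(Integer.MAX_VALUE);

    }
    
}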
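
The listener receives the complete child list after each change rather than the individual additions and removals. If the application needs to know which children appeared or disappeared, it can keep the previous list and compare. A minimal sketch of that comparison follows; ChildDiffSketch is a hypothetical helper, and treating a null list as empty is an assumption about how a missing parent node is reported.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not part of ZkClient.
final class ChildDiffSketch {

    // Treats null (assumed to mean "parent node missing") as an empty list.
    static List<String> orEmpty(List<String> children) {
        return children == null ? Collections.<String>emptyList() : children;
    }

    // Children present now that were not present before.
    static List<String> added(List<String> previous, List<String> current) {
        List<String> result = new ArrayList<>(orEmpty(current));
        result.removeAll(orEmpty(previous));
        return result;
    }

    // Children present before that are gone now.
    static List<String> removed(List<String> previous, List<String> current) {
        List<String> result = new ArrayList<>(orEmpty(previous));
        result.removeAll(orEmpty(current));
        return result;
    }

    public static void main(String[] args) {
        List<String> before = Arrays.asList("a", "b");
        List<String> after = Arrays.asList("b", "c");
        System.out.println("added: " + added(before, after));
        System.out.println("removed: " + removed(before, after));
    }
}
```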

Subscribing to data changes

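import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;

public class SubscribeDataChanges {
    
    private static class ZkDataListener implements IZkDataListener{

        public void handleDataChange(String dataPath, Object data)
                throws Exception {
            // With BytesPushThroughSerializer the data arrives as a raw byte[],
            // so decode it instead of printing the array reference
            String text = (data instanceof byte[]) ? new String((byte[]) data) : String.valueOf(data);
            System.out.println(dataPath+":"+text);
        }

        public void handleDataDeleted(String dataPath) throws Exception {
            System.out.println(dataPath);
        }

    }

    public static void main(String[] args) throws InterruptedException {
        ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new BytesPushThroughSerializer());
        System.out.println("connected ok!");
        
        zc.subscribeDataChanges("/jike20", new ZkDataListener());
        // Keep the process alive so that notifications can arrive
        Thread.sleep(Integer.MAX_VALUE);

    }
    
}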

Open-source client: Curator

Add the Maven dependencies

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.8.0</version>
</dependency>

Creating a session

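import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;

public class CreateSession {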

    public static void main(String[] args) throws InterruptedException {
        
        //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        //RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
//        CuratorFramework client = CuratorFrameworkFactory
//                .newClient("192.168.1.105:2181",5000,5000, retryPolicy);
        
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        
        client.start();
        
        Thread.sleep(Integer.MAX_VALUE);
        
        
    }
    
}
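
The retry policies named in the example differ in when they give up: RetryNTimes(n, sleepMs) stops after a fixed number of retries, while RetryUntilElapsed(maxElapsedMs, sleepMs) stops once a time budget is used up. The sketch below is a simplified model of these two rules only. The Policy interface and RetrySketch class are hypothetical and are not Curator's RetryPolicy API, and the simulation assumes every attempt fails instantly so that only the sleep time counts.

```java
// Simplified model of two retry rules; hypothetical, not Curator's API.
final class RetrySketch {

    // Decides whether another retry is allowed.
    interface Policy {
        boolean allowRetry(int retryCount, long elapsedMs);
    }

    // Allows retries while fewer than n have been made.
    static Policy retryNTimes(int n) {
        return (retryCount, elapsedMs) -> retryCount < n;
    }

    // Allows retries while the elapsed time is below the budget.
    static Policy retryUntilElapsed(long maxElapsedMs) {
        return (retryCount, elapsedMs) -> elapsedMs < maxElapsedMs;
    }

    // Counts the retries made before the policy gives up, assuming each attempt
    // fails immediately and each retry is preceded by a sleep of sleepMs.
    static int retriesBeforeGivingUp(Policy policy, long sleepMs) {
        if (sleepMs <= 0) {
            throw new IllegalArgumentException("sleepMs must be positive");
        }
        int retries = 0;
        long elapsed = 0;
        while (policy.allowRetry(retries, elapsed)) {
            retries++;
            elapsed += sleepMs;
        }
        return retries;
    }

    public static void main(String[] args) {
        System.out.println("RetryNTimes(5, 1000): "
                + retriesBeforeGivingUp(retryNTimes(5), 1000));
        System.out.println("RetryUntilElapsed(5000, 1000): "
                + retriesBeforeGivingUp(retryUntilElapsed(5000), 1000));
    }
}
```

Under these assumptions both configurations from the example allow the same number of retries; they diverge once individual attempts take noticeable time, because only the elapsed-time rule accounts for it.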

Creating a node

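import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;

public class CreateNode {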

    public static void main(String[] args) throws Exception {
        
        //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        //RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
//        CuratorFramework client = CuratorFrameworkFactory
//                .newClient("192.168.1.105:2181",5000,5000, retryPolicy);
        
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        
        client.start();
        
        
        String path = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/jike/1","123".getBytes());
        
        System.out.println(path);
        
        Thread.sleep(Integer.MAX_VALUE);
        
        
    }
    
}
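
creatingParentsIfNeeded() creates any missing ancestors of the target path before creating the node itself. Since an ephemeral node cannot have children, those ancestors have to be non-ephemeral even when the leaf is created with CreateMode.EPHEMERAL. The sketch below only shows which ancestor paths are involved for a given path; ParentPathsSketch is a hypothetical helper, not a Curator class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Curator.
final class ParentPathsSketch {

    // Returns the ancestors of an absolute path, from the top level downwards.
    static List<String> parents(String path) {
        if (!path.startsWith("/")) {
            throw new IllegalArgumentException("path must be absolute: " + path);
        }
        List<String> result = new ArrayList<>();
        int idx = path.indexOf('/', 1);
        while (idx != -1) {
            result.add(path.substring(0, idx));
            idx = path.indexOf('/', idx + 1);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parents("/jike/1"));
        System.out.println(parents("/a/b/c"));
    }
}
```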

Deleting a node

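import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;

public class DelNode {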

    public static void main(String[] args) throws Exception {
        
        //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        //RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
//        CuratorFramework client = CuratorFrameworkFactory
//                .newClient("192.168.1.105:2181",5000,5000, retryPolicy);
        
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        
        client.start();
        
        client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath("/jike20");

        
        Thread.sleep(Integer.MAX_VALUE);
        
        
    }
    
}

Getting child nodes

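import java.util.List;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;

public class GetChildren {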

    public static void main(String[] args) throws Exception {
        
        //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        //RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
//        CuratorFramework client = CuratorFrameworkFactory
//                .newClient("192.168.1.105:2181",5000,5000, retryPolicy);
        
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        
        client.start();
        
        List<String> cList = client.getChildren().forPath("/jike20");
        
        System.out.println(cList.toString());
        
    }
    
}

Reading node data

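import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.data.Stat;

public class GetData {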

    public static void main(String[] args) throws Exception {
        
        //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        //RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
//        CuratorFramework client = CuratorFrameworkFactory
//                .newClient("192.168.1.105:2181",5000,5000, retryPolicy);

        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        
        client.start();
        
        Stat stat = new Stat();
        
        byte[] ret = client.getData().storingStatIn(stat).forPath("/jike");
        
        System.out.println(new String(ret));
        
        System.out.println(stat);
        
        
    }
    
}

Updating a node

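import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.data.Stat;

public class UpdateData {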

    public static void main(String[] args) throws Exception {
        
        //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        //RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
//        CuratorFramework client = CuratorFrameworkFactory
//                .newClient("192.168.1.105:2181",5000,5000, retryPolicy);
        
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        
        client.start();
        
        Stat stat = new Stat();        
        client.getData().storingStatIn(stat).forPath("/jike");
        
        client.setData().withVersion(stat.getVersion()).forPath("/jike", "123".getBytes());
        
        
    }
    
}

Checking whether a node exists

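import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.data.Stat;

public class CheckExists {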

    public static void main(String[] args) throws Exception {
        
        ExecutorService es = Executors.newFixedThreadPool(5);
        
        //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        //RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
//        CuratorFramework client = CuratorFrameworkFactory
//                .newClient("192.168.1.105:2181",5000,5000, retryPolicy);
        
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        
        client.start();
        
        // Stat s = client.checkExists().forPath("/jike");
        
        client.checkExists().inBackground(new BackgroundCallback() {
            
            public void processResult(CuratorFramework arg0, CuratorEvent arg1)
                    throws Exception {
                
                Stat stat = arg1.getStat();
                System.out.println(stat);
                System.out.println(arg1.getContext());
                
            }
        },"123",es).forPath("/jike");
        
        Thread.sleep(Integer.MAX_VALUE);
        
    }
    
}

Watching a node

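import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.RetryUntilElapsed;

public class NodeListener {

    public static void main(String[] args) throws Exception {
        
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
        
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        
        client.start();
        
        final NodeCache cache = new NodeCache(client,"/jike");
        cache.start();
        cache.getListenable().addListener(new NodeCacheListener() {
            
            public void nodeChanged() throws Exception {
                // getCurrentData() returns null when the node does not exist (for example after deletion)
                ChildData current = cache.getCurrentData();
                if (current == null) {
                    System.out.println("node does not exist");
                    return;
                }
                System.out.println("new data:"+new String(current.getData()));
            }
        });
        
        // Keep the process alive so that notifications can arrive
        Thread.sleep(Integer.MAX_VALUE);
        
    }
    
}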

Watching child nodes

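import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryUntilElapsed;

public class NodeChildrenListener {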

    public static void main(String[] args) throws Exception {
        
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
        
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        
        client.start();
        
        final PathChildrenCache cache = new PathChildrenCache(client,"/jike",true);
        cache.start();
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
                    throws Exception {
                switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED:"+event.getData());
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED:"+event.getData());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED:"+event.getData());
                    break;
                default:
                    break;
                }
            }
        });
        
        Thread.sleep(Integer.MAX_VALUE);
        
    }
    
}

Setting permissions (ACLs)

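import java.util.ArrayList;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

public class CreateNodeAuth {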

    public static void main(String[] args) throws Exception {
        
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
        
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        
        client.start();
        
        ACL aclIp = new ACL(Perms.READ,new Id("ip","192.168.1.105"));
        ACL aclDigest = new ACL(Perms.READ|Perms.WRITE,new Id("digest",DigestAuthenticationProvider.generateDigest("jike:123456")));
        ArrayList<ACL> acls = new ArrayList<ACL>();
        acls.add(aclDigest);
        acls.add(aclIp);
        
        String path = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(acls)
                .forPath("/jike/3","123".getBytes());
        
        System.out.println(path);
        
        Thread.sleep(Integer.MAX_VALUE);
        
        
    }
    
}
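
For the digest scheme, the ACL does not store the plain password. To my understanding, generateDigest turns "user:password" into "user:" followed by the Base64-encoded SHA-1 hash of the whole "user:password" string, and permissions are combined as bit flags. The sketch below reproduces that idea with the JDK only. DigestSketch is a hypothetical class, the permission constants mirror what I believe ZooDefs.Perms defines, and the output should be treated as an approximation rather than a drop-in replacement for DigestAuthenticationProvider.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// JDK-only approximation of the digest ACL id; hypothetical, for illustration.
final class DigestSketch {

    // Assumed to mirror ZooDefs.Perms: each permission is one bit.
    static final int READ = 1 << 0;
    static final int WRITE = 1 << 1;

    // Builds "user:base64(sha1(user:password))" from "user:password".
    static String digestId(String userColonPassword) {
        int colon = userColonPassword.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("expected user:password");
        }
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] hash = sha1.digest(userColonPassword.getBytes(StandardCharsets.UTF_8));
            return userColonPassword.substring(0, colon) + ":"
                    + Base64.getEncoder().encodeToString(hash);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(digestId("jike:123456"));
        System.out.println("READ|WRITE = " + (READ | WRITE));
    }
}
```

A client that wants to use the digest ACL still authenticates with the plain "user:password" pair; only the stored ACL id carries the hash.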
