zookeeper - 通過java代碼連接zookeeper(2)

首先建立一個Maven項目

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the ZooKeeper client demo project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>code</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.11</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <!-- No <type>pom</type> here: with type "pom" Maven resolves only the POM
             artifact, so the ZooKeeper jar itself would not be on the classpath. -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.4-beta</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

    </dependencies>

</project>
pom.xml
package com.amber;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;


/**
 * zookeeper
 * 鏈接zookeeper
 * 建立znode
 * 獲取znode值
 * 斷開連接
 */
public class ZookeeperDemo {
    private ZooKeeper zookeeper;

    /**
     * 連接zookeeper
     * @return
     * @throws IOException
     */
    public ZooKeeper zkConnect( ) throws IOException {
        //zookeeper的ip:端口
        String path = "127.0.0.1:2181";
        //第二個參數是超時時間,第三個參數是設置觀察者,如今能夠先無論
        zookeeper = new ZooKeeper(path, 20 * 1000, null);
        return zookeeper;
    }

    /**
     * 建立znode節點
     * @param path znode的路徑
     * @param value znode的值
     * @param watcher
     * @param node //建立node的模式
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void createZnode(String path, byte[] value, Watcher watcher, CreateMode node ) throws KeeperException, InterruptedException {
        zookeeper.create(path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, node);
    }

    /**
     * 經過path得到znode的值
     * @param path
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public String getZnodeValue(String path ) throws KeeperException, InterruptedException {
        //第二個值是表明是否開啓監聽,這裏仍是先無論.第三個參數就是結構體
        byte[] data = zookeeper.getData(path, false, new Stat());
        return new String(data);
    }

    public void close() {
        try {
            if (zookeeper != null) {
                zookeeper.close();
            }
        } catch (InterruptedException e) {
                e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        ZookeeperDemo zookeeperDemo = new ZookeeperDemo();
        //獲取鏈接
        ZooKeeper zooKeeper = zookeeperDemo.zkConnect();
        //建立znode
        zookeeperDemo.createZnode("/amber", "hahaha".getBytes(), null, CreateMode.PERSISTENT);
        //獲取znode的值
        String znodeValue = zookeeperDemo.getZnodeValue("/amber");
        System.out.println(znodeValue);
        
        zookeeperDemo.close();

    }
}
ZookeeperDemo

通過上面的代碼就可以實現用java代碼操控zookeeper。但是你可能會有以下疑惑:

  • CreateMode.PERSISTENT是什麼
  • watcher是什麼

Znode的四種類型

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.zookeeper;

import org.apache.yetus.audience.InterfaceAudience.Public;
import org.apache.zookeeper.KeeperException.BadArgumentsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creation modes of a znode. Each constant carries an integer flag plus two
 * booleans telling whether the mode is ephemeral and/or sequential.
 */
@Public
public enum CreateMode {
    PERSISTENT(0, false, false),
    PERSISTENT_SEQUENTIAL(2, false, true),
    EPHEMERAL(1, true, false),
    EPHEMERAL_SEQUENTIAL(3, true, true);

    private static final Logger LOG = LoggerFactory.getLogger(CreateMode.class);

    /** Integer flag associated with this mode. */
    private int flag;
    /** Whether this mode is ephemeral. */
    private boolean ephemeral;
    /** Whether this mode is sequential. */
    private boolean sequential;

    CreateMode(int flagValue, boolean isEphemeral, boolean isSequential) {
        this.flag = flagValue;
        this.ephemeral = isEphemeral;
        this.sequential = isSequential;
    }

    /** @return {@code true} for the ephemeral modes */
    public boolean isEphemeral() {
        return ephemeral;
    }

    /** @return {@code true} for the sequential modes */
    public boolean isSequential() {
        return sequential;
    }

    /** @return the integer flag of this mode */
    public int toFlag() {
        return flag;
    }

    /**
     * Maps an integer flag back to its creation mode.
     *
     * @param flag integer flag, one of 0, 1, 2 or 3
     * @return the mode whose flag equals {@code flag}
     * @throws KeeperException a {@code BadArgumentsException} when no mode uses {@code flag}
     */
    public static CreateMode fromFlag(int flag) throws KeeperException {
        for (CreateMode candidate : values()) {
            if (candidate.flag == flag) {
                return candidate;
            }
        }
        // No constant matched: log and reject, exactly like the default branch of a switch.
        String errMsg = "Received an invalid flag value: " + flag + " to convert to a CreateMode";
        LOG.error(errMsg);
        throw new BadArgumentsException(errMsg);
    }
}
CreateMode
 
znode分四種類型
PERSISTENT                 持久節點  對應命令 create path value
 
PERSISTENT_SEQUENTIAL     順序自動編號持久化節點,這種節點會根據當前已存在的節點數自動加  1 (有序持久節點)create -s path value
 
EPHEMERAL                 臨時節點, 客戶端session超時這類節點就會被自動刪除 create -e path value
 
EPHEMERAL_SEQUENTIAL      臨時自動編號節點 (臨時有序節點) create -s -e path value
CreateMode是一個枚舉類型,裏面有四個枚舉值,分別表示Znode的四種類型:
 PERSISTENT(0, false, false), //持久節點
    PERSISTENT_SEQUENTIAL(2, false, true), //有序節點
    EPHEMERAL(1, true, false), //臨時節點 只存在於建立它的客戶端session中,當session結束(關閉或超時)後就會被自動刪除
    EPHEMERAL_SEQUENTIAL(3, true, true); //有序臨時節點 session結束後同樣會被自動刪除

在建立持久節點(PERSISTENT)的時候,應該注意:由於znode的path是不允許重複的,所以在建立持久節點之前,應先判斷節點是否存在。而持久有序節點(PERSISTENT_SEQUENTIAL)會自動在path後面附加一個單調遞增的序號(10位數字,由父節點維護),因此不會出現重名。

if (zooKeeper.exists(path, false) == null) {
            zookeeperWatchDemo.createZnode(path, value.getBytes(), null, CreateMode.PERSISTENT);
        }

watcher

Zookeeper支持發佈訂閱功能,引入了watcher機制進行監聽。當數據發生變更之後,能夠及時通知客戶端,並且把相應的事件通知給設置了Watcher的Client。

watcher的特性:

  • Watcher是一次性觸發器:只能監聽一次,是一個一次性的動作;如果需要監聽多次,那麼應該在每次觸發後重新註冊Watcher
  • 可使用系統默認的watcher,也能夠自定義Watcher.自定義Watcher必須實現
    org.apache.zookeeper.Watcher接口
  • Zookeeper的getData(),getChildren(),exists()都可以設置Watch選項。當watch爲false的時候,或者watcher爲null的時候代表不開啓watch
new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    triggerWatch(path);
                }
package com.amber;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;

public class ZookeeperWatchDemo {
    private ZooKeeper zookeeper;
    private String oldValue = "";
    private String newValue = "";
    public ZooKeeper zkConnect( ) throws IOException {
        String path = "127.0.0.1:2181";
        zookeeper = new ZooKeeper(path, 20 * 1000, null);
        return zookeeper;
    }

    public void createZnode(String path, byte[] value, Watcher watcher, CreateMode node ) throws KeeperException, InterruptedException {
        zookeeper.create(path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, node);
    }

    public String getZnodeValue(final String path ) throws KeeperException, InterruptedException {
        byte[] data = zookeeper.getData(path, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                triggerWatch(path);
            }
        }, new Stat());
        oldValue = new String(data);
        return new String(data);
    }

    public boolean triggerWatch (String path) {
        byte[] data = new byte[0];
        try {
            data = zookeeper.getData(path, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    triggerWatch(path);
                }
            }, new Stat());
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        newValue = new String(data);
        if (oldValue.equals(newValue)) {
            System.out.println("on change");
            return false;
        } else {
            System.out.println("oldvalue: " + oldValue + "new value: "  + newValue);
            oldValue = newValue;
            return true;
        }
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //建立
        ZookeeperWatchDemo zookeeperWatchDemo = new ZookeeperWatchDemo();
        ZooKeeper zooKeeper = zookeeperWatchDemo.zkConnect();
        String path = "/amberas";
        String value = "hahahahaha";
        if (zooKeeper.exists(path, false) == null) {
            zookeeperWatchDemo.createZnode(path, value.getBytes(), null, CreateMode.PERSISTENT);
        }

        String znodeValue = zookeeperWatchDemo.getZnodeValue(path);
        System.out.println(znodeValue);

        Thread.sleep(1000 * 60 * 50);
    }
}
ZookeeperWatchDemo
相關文章
相關標籤/搜索