zookeeper系列(一)zookeeper基礎
zookeeper系列(二)實戰master選舉
zookeeper系列(三)實戰數據發佈訂閱
zookeeper系列(四)實戰負載均衡
zookeeper系列(五)實戰分佈式鎖
zookeeper系列(六)實戰分佈式隊列
zookeeper系列(七)實戰分佈式命名服務
zookeeper系列(八)zookeeper運維
準備3臺服務器:
n1:192.168.1.101
n2:192.168.1.102
n3:192.168.1.103
cd /opt
wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.10.tar.gz
tar -zxvf zookeeper-3.4.10.tar.gz
mv zookeeper-3.4.10 zookeeper
cd zookeeper/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
# 修改zookeeper的數據目錄
dataDir=/var/zookeeper
# 指定集羣節點
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888
建立zookeeper數據目錄和myid文件:
cd /var
mkdir zookeeper
cd zookeeper
vi myid
myid裏鍵入各自在配置文件中的服務器編號即可(例如n1對應server.1,myid的內容就是1)。
cd /opt/zookeeper/bin
./zkServer.sh start
telnet 192.168.1.101 2181
使用stat命令獲取zookeeper狀態:
stat
輸出:This ZooKeeper instance is not currently serving requests
再啓動第二臺,超過1/2的zookeeper集羣節點正常工作後,zookeeper就可以提供服務了。
重新telnet後用stat查看n1:
Latency min/avg/max: 0/0/0 Received: 2 Sent: 1 Connections: 1 Outstanding: 0 Zxid: 0x0 Mode: follower Node count: 4
查看n2:
Latency min/avg/max: 0/0/0 Received: 2 Sent: 1 Connections: 1 Outstanding: 0 Zxid: 0x100000000 Mode: leader Node count: 4
可以看到n1是follower角色,n2是leader角色。
cd /opt/zookeeper/bin
./zkCli.sh -timeout 5000 -server 192.168.1.101:2181
輸入h顯示客戶端可用命令:
h
ZooKeeper -server host:port cmd args stat path [watch] set path data [version] ls path [watch] delquota [-n|-b] path ls2 path [watch] setAcl path acl setquota -n|-b val path history redo cmdno printwatches on|off delete path [version] sync path listquota path rmr path get path [watch] create [-s] [-e] path data acl addauth scheme auth quit getAcl path close connect host:port
引入maven依賴
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.5</version> </dependency>
/**
 * Demo: constructs a ZkClient for the ZooKeeper server and reports success.
 */
public class CreateSession {

    private static final String SERVERS = "192.168.1.105:2181";
    private static final int SESSION_TIMEOUT = 10000;
    private static final int CONNECTION_TIMEOUT = 10000;

    public static void main(String[] args) {
        // The client is not used afterwards; the message is printed once the
        // constructor has returned.
        new ZkClient(SERVERS, SESSION_TIMEOUT, CONNECTION_TIMEOUT, new SerializableSerializer());
        System.out.println("conneted ok!");
    }
}
/**
 * Demo: creates the persistent node {@code /jike5} holding a {@code User}
 * object and prints the path returned by the client.
 */
public class CreateNode {

    private static final String SERVERS = "192.168.1.105:2181";

    public static void main(String[] args) {
        final ZkClient zkClient = new ZkClient(SERVERS, 10000, 10000, new SerializableSerializer());
        System.out.println("conneted ok!");

        // Payload stored in the node; the client was given a SerializableSerializer.
        final User payload = new User();
        payload.setId(1);
        payload.setName("test");

        final String createdPath = zkClient.create("/jike5", payload, CreateMode.PERSISTENT);
        System.out.println("created path:" + createdPath);
    }
}
/**
 * Demo: reads the {@code User} stored at {@code /jike5} together with the
 * node's {@code Stat} and prints both.
 */
public class GetData {

    private static final String SERVERS = "192.168.1.105:2181";

    public static void main(String[] args) {
        final ZkClient zkClient = new ZkClient(SERVERS, 10000, 10000, new SerializableSerializer());
        System.out.println("conneted ok!");

        // The Stat instance is passed in and printed after the read.
        final Stat nodeStat = new Stat();
        final User stored = zkClient.readData("/jike5", nodeStat);

        final String text = stored.toString();
        System.out.println(text);
        System.out.println(nodeStat);
    }
}
/**
 * Demo: lists the children of {@code /jike5} and prints them.
 */
public class GetChild {

    private static final String SERVERS = "192.168.1.105:2181";

    public static void main(String[] args) {
        final ZkClient zkClient = new ZkClient(SERVERS, 10000, 10000, new SerializableSerializer());
        System.out.println("conneted ok!");

        final List<String> children = zkClient.getChildren("/jike5");
        final String rendered = children.toString();
        System.out.println(rendered);
    }
}
/**
 * Demo: prints whether the node {@code /jike5} exists.
 */
public class NodeExists {

    private static final String SERVERS = "192.168.1.105:2181";

    public static void main(String[] args) {
        final ZkClient zkClient = new ZkClient(SERVERS, 10000, 10000, new SerializableSerializer());
        System.out.println("conneted ok!");

        final boolean present = zkClient.exists("/jike5");
        System.out.println(present);
    }
}
/**
 * Demo: shows the two deletion calls of ZkClient on {@code /jike5}:
 * a plain delete followed by a recursive delete.
 */
public class NodeDel {

    private static final String SERVERS = "192.168.1.105:2181";
    private static final String TARGET = "/jike5";

    public static void main(String[] args) {
        final ZkClient zkClient = new ZkClient(SERVERS, 10000, 10000, new SerializableSerializer());
        System.out.println("conneted ok!");

        // Both calls are issued in sequence; their boolean results are not used.
        zkClient.delete(TARGET);
        zkClient.deleteRecursive(TARGET);
    }
}
/**
 * Demo: overwrites the data of {@code /jike5} with a new {@code User} object.
 */
public class WriteData {

    /**
     * Expected-version argument for the write. ZooKeeper treats -1 as
     * "match any version", so the write succeeds regardless of how many times
     * the node has been modified. A fixed value such as 1 only succeeds when
     * the node's data version is exactly that number (a freshly created node
     * is at version 0).
     */
    private static final int ANY_VERSION = -1;

    public static void main(String[] args) {
        ZkClient zc = new ZkClient("192.168.1.105:2181", 10000, 10000, new SerializableSerializer());
        System.out.println("conneted ok!");

        User u = new User();
        u.setId(2);
        u.setName("test2");

        zc.writeData("/jike5", u, ANY_VERSION);
    }
}
public class SubscribeChildChanges { private static class ZkChildListener implements IZkChildListener{ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { System.out.println(parentPath); System.out.println(currentChilds.toString()); } } public static void main(String[] args) throws InterruptedException { ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer()); System.out.println("conneted ok!"); // 除子節點變化外,節點自己建立和刪除也會收到通知 zc.subscribeChildChanges("/jike20", new ZkChildListener()); Thread.sleep(Integer.MAX_VALUE); } }
/**
 * Demo: subscribes to data changes of {@code /jike20} and prints every
 * notification until the process is killed.
 */
public class SubscribeDataChanges {

    /** Prints data-change and data-deleted notifications. */
    private static class ZkDataListener implements IZkDataListener {

        public void handleDataChange(String dataPath, Object data) throws Exception {
            System.out.println(dataPath + ":" + render(data));
        }

        public void handleDataDeleted(String dataPath) throws Exception {
            System.out.println(dataPath);
        }

        /**
         * Turns a notification payload into printable text.
         *
         * The client below is built with BytesPushThroughSerializer, so the
         * payload is presumably the raw byte[] of the node; calling toString()
         * on an array only prints its identity, not its content.
         * NOTE(review): decoding assumes the node holds UTF-8 text - confirm.
         */
        private static String render(Object data) {
            if (data instanceof byte[]) {
                return new String((byte[]) data, java.nio.charset.Charset.forName("UTF-8"));
            }
            // Covers null as well as any non-array payload.
            return String.valueOf(data);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ZkClient zc = new ZkClient("192.168.1.105:2181", 10000, 10000, new BytesPushThroughSerializer());
        System.out.println("conneted ok!");

        zc.subscribeDataChanges("/jike20", new ZkDataListener());

        // Keep the JVM alive so notifications can be received.
        Thread.sleep(Integer.MAX_VALUE);
    }
}
引入maven依賴
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.8.0</version> </dependency>
public class CreateSession { public static void main(String[] args) throws InterruptedException { //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //RetryPolicy retryPolicy = new RetryNTimes(5, 1000); RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000); // CuratorFramework client = CuratorFrameworkFactory // .newClient("192.168.1.105:2181",5000,5000, retryPolicy); CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); Thread.sleep(Integer.MAX_VALUE); } }
public class CreateNode { public static void main(String[] args) throws Exception { //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //RetryPolicy retryPolicy = new RetryNTimes(5, 1000); RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000); // CuratorFramework client = CuratorFrameworkFactory // .newClient("192.168.1.105:2181",5000,5000, retryPolicy); CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); String path = client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .forPath("/jike/1","123".getBytes()); System.out.println(path); Thread.sleep(Integer.MAX_VALUE); } }
public class DelNode { public static void main(String[] args) throws Exception { //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //RetryPolicy retryPolicy = new RetryNTimes(5, 1000); RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000); // CuratorFramework client = CuratorFrameworkFactory // .newClient("192.168.1.105:2181",5000,5000, retryPolicy); CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath("/jike20"); Thread.sleep(Integer.MAX_VALUE); } }
public class GetChildren { public static void main(String[] args) throws Exception { //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //RetryPolicy retryPolicy = new RetryNTimes(5, 1000); RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000); // CuratorFramework client = CuratorFrameworkFactory // .newClient("192.168.1.105:2181",5000,5000, retryPolicy); CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); List<String> cList = client.getChildren().forPath("/jike20"); System.out.println(cList.toString()); } }
public class GetData { public static void main(String[] args) throws Exception { //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //RetryPolicy retryPolicy = new RetryNTimes(5, 1000); RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000); // CuratorFramework client = CuratorFrameworkFactory // .newClient("192.168.1.105:2181",5000,5000, retryPolicy); CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); Stat stat = new Stat(); byte[] ret = client.getData().storingStatIn(stat).forPath("/jike"); System.out.println(new String(ret)); System.out.println(stat); } }
public class UpdateData { public static void main(String[] args) throws Exception { //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //RetryPolicy retryPolicy = new RetryNTimes(5, 1000); RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000); // CuratorFramework client = CuratorFrameworkFactory // .newClient("192.168.1.105:2181",5000,5000, retryPolicy); CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath("/jike"); client.setData().withVersion(stat.getVersion()).forPath("/jike", "123".getBytes()); } }
public class checkexists { public static void main(String[] args) throws Exception { ExecutorService es = Executors.newFixedThreadPool(5); //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //RetryPolicy retryPolicy = new RetryNTimes(5, 1000); RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000); // CuratorFramework client = CuratorFrameworkFactory // .newClient("192.168.1.105:2181",5000,5000, retryPolicy); CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); // Stat s = client.checkExists().forPath("/jike"); client.checkExists().inBackground(new BackgroundCallback() { public void processResult(CuratorFramework arg0, CuratorEvent arg1) throws Exception { Stat stat = arg1.getStat(); System.out.println(stat); System.out.println(arg1.getContext()); } },"123",es).forPath("/jike"); Thread.sleep(Integer.MAX_VALUE); } }
/**
 * Demo: watches the node {@code /jike} through a Curator NodeCache and prints
 * its data whenever the cache reports a change.
 */
public class NodeListener {

    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        client.start();

        final NodeCache cache = new NodeCache(client, "/jike");
        cache.start();
        cache.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                // The cache holds no data once the watched node has been
                // deleted; dereferencing it unconditionally would throw a
                // NullPointerException inside the listener.
                // NOTE(review): getCurrentData() is read twice because the
                // holder type is not referenced in this snippet; keep it in a
                // single local once that type is available.
                if (cache.getCurrentData() == null) {
                    System.out.println("node deleted");
                    return;
                }
                byte[] ret = cache.getCurrentData().getData();
                System.out.println("new data:" + new String(ret));
            }
        });

        // Keep the JVM alive so change notifications can be received.
        Thread.sleep(Integer.MAX_VALUE);
    }
}
/**
 * Demo: watches the children of {@code /jike} through a Curator
 * PathChildrenCache and prints added / updated / removed events.
 */
public class NodeChildrenListener {

    /** Prints a labelled line for the three child event types; ignores the rest. */
    private static class PrintingListener implements PathChildrenCacheListener {
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            final String label;
            switch (event.getType()) {
                case CHILD_ADDED:
                    label = "CHILD_ADDED:";
                    break;
                case CHILD_UPDATED:
                    label = "CHILD_UPDATED:";
                    break;
                case CHILD_REMOVED:
                    label = "CHILD_REMOVED:";
                    break;
                default:
                    // Any other event type produces no output.
                    return;
            }
            System.out.println(label + event.getData());
        }
    }

    public static void main(String[] args) throws Exception {
        RetryPolicy policy = new RetryUntilElapsed(5000, 1000);
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(policy)
                .build();
        curator.start();

        final PathChildrenCache childrenCache = new PathChildrenCache(curator, "/jike", true);
        childrenCache.start();
        childrenCache.getListenable().addListener(new PrintingListener());

        Thread.sleep(Integer.MAX_VALUE);
    }
}
/**
 * Demo: creates the persistent node {@code /jike/3} with an ACL list made of
 * a digest entry (read + write) and an IP entry (read only), prints the
 * resulting path and keeps the process alive.
 */
public class CreateNodeAuth {

    public static void main(String[] args) throws Exception {
        RetryPolicy policy = new RetryUntilElapsed(5000, 1000);
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(policy)
                .build();
        curator.start();

        // Read-only access identified by IP address.
        Id ipIdentity = new Id("ip", "192.168.1.105");
        ACL ipAcl = new ACL(Perms.READ, ipIdentity);

        // Read and write access identified by a digest of "user:password".
        String digest = DigestAuthenticationProvider.generateDigest("jike:123456");
        Id digestIdentity = new Id("digest", digest);
        ACL digestAcl = new ACL(Perms.READ | Perms.WRITE, digestIdentity);

        // Same list order as before: digest entry first, then the IP entry.
        ArrayList<ACL> aclList = new ArrayList<ACL>();
        aclList.add(digestAcl);
        aclList.add(ipAcl);

        String createdPath = curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(aclList)
                .forPath("/jike/3", "123".getBytes());
        System.out.println(createdPath);

        Thread.sleep(Integer.MAX_VALUE);
    }
}
zookeeper系列(一)zookeeper基礎
zookeeper系列(二)實戰master選舉
zookeeper系列(三)實戰數據發佈訂閱
zookeeper系列(四)實戰負載均衡
zookeeper系列(五)實戰分佈式鎖
zookeeper系列(六)實戰分佈式隊列
zookeeper系列(七)實戰分佈式命名服務
zookeeper系列(八)zookeeper運維