zkCli 工具適用於調試,不推薦使用 zkCli 工具來搭建系統。java
實際開發時通常也不直接使用 ZooKeeper 的 Java API,而是使用更高層次的封裝庫 Curator,不過對 Java API 的學習仍然有不少益處。算法
本篇文章介紹經過 ZooKeeper 的 Java API 來實現建立會話、實現監視點等功能,演示主從模式。apache
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>
複製代碼
啓動 ZooKeeper 服務端,經過 Java API 創建會話。編程
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
複製代碼
其中 connectString 包含主機名及端口號,sessionTimeout 爲會話超時時間,watcher 對象用於接收會話事件。bootstrap
Watcher 爲一個接口,實現 Watcher 須要重寫 void process(WatchedEvent event)
方法。網絡
當遇到網絡故障時,若是鏈接斷開,ZooKeeper 客戶端會自動從新鏈接。session
下面經過 ZooKeeper Java API 來實現簡單的羣首選舉算法,確保同一時間只有一個主節點進程處於活動狀態。dom
package com.ulyssesss.zookeeper;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Random;
public class Master implements Watcher {
private ZooKeeper zk;
private String serviceId = Integer.toString(new Random().nextInt());
private boolean isLeader = false;
private void startZk() throws IOException {
zk = new ZooKeeper("localhost:2181", 5000, this);
}
private void stopZk() throws InterruptedException {
zk.close();
}
public void process(WatchedEvent watchedEvent) {
System.out.println("event: " + watchedEvent);
}
public static void main(String[] args) throws Exception {
Master master = new Master();
master.startZk();
master.runForMaster();
System.out.println("serviceId: " + master.serviceId);
if (master.isLeader) {
System.out.println("master");
Thread.sleep(10000);
} else {
System.out.println("not master");
}
master.stopZk();
}
private boolean checkMaster() throws InterruptedException {
while (true) {
try {
Stat stat = new Stat();
byte data[] = zk.getData("/master", false, stat);
isLeader = new String(data).equals(serviceId);
return true;
} catch (KeeperException.NoNodeException e) {
return false;
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
private void runForMaster() throws InterruptedException {
while (true) {
try {
zk.create("/master", serviceId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
isLeader = true;
break;
} catch (KeeperException.NodeExistsException e) {
isLeader = false;
break;
} catch (KeeperException e) {
e.printStackTrace();
}
if (checkMaster()) {
break;
}
}
}
}
複製代碼
主函數執行建立一個演示實例,實例會分配一個隨機整數做爲 id,創建 ZooKeeper 鏈接後嘗試建立主節點 master。異步
若是 master 主節點建立成功,則該實例爲羣首 leader;若是節點已經存在則其餘實例爲 leader;發生斷開鏈接等異常時,響應信息丟失,沒法肯定當前進程是否爲主節點,須要經過 checkMaster() 方法從新檢查主節點狀態。ide
屢次執行主函數,其中第一次執行時會打印 master,在 master 斷開鏈接前的 10 秒鐘內,再次執行會打印 not master,當第一次執行的 master 斷開鏈接後,再次執行主函數,打印 master。
在 ZooKeeper 中全部的同步操做都有對應的異步操做,異步調用不會阻塞應用程序,還能簡化應用的實現方式。
主從模型的設計須要用到 /tasks、/assign 和 /workers 3 個目錄,能夠經過某些系統配置來建立這些目錄。下面的代碼示例會經過異步的方式建立出所須要的目錄。
private void bootstrap() {
createParent("/workers", new byte[0]);
createParent("/assign", new byte[0]);
createParent("/tasks", new byte[0]);
}
private void createParent(String path, byte[] data) {
zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createParentCallback, data);
}
AsyncCallback.StringCallback createParentCallback = new AsyncCallback.StringCallback() {
public void processResult(int rc, String path, Object ctx, String name) {
switch (KeeperException.Code.get(rc)) {
case OK:
System.out.println("parent " + path + " created");
break;
case NODEEXISTS:
System.out.println("parent " + path + " already registered");
break;
case CONNECTIONLOSS:
createParent(path, (byte[]) ctx);
break;
default:
System.out.println("create " + path + " error");
}
}
};
複製代碼
前面的部分已經有了主節點,爲了使主節點能夠發號施令,如今要配置從節點,在 /workers 下建立臨時節點。
package com.ulyssesss.zookeeper;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.Random;
public class Worker implements Watcher {
private ZooKeeper zk;
private String serviceId = Integer.toString(new Random().nextInt());
private void startZk() throws IOException {
zk = new ZooKeeper("localhost:2181", 5000, this);
}
@Override
public void process(WatchedEvent event) {
System.out.println("event: " + event);
}
private void register() {
zk.create("/workers/worker-" + serviceId, "Idle".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE
, CreateMode.EPHEMERAL, createWorkerCallback, null);
}
private AsyncCallback.StringCallback createWorkerCallback = new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
switch (KeeperException.Code.get(rc)) {
case OK:
System.out.println("registered successfully: " + serviceId);
break;
case NODEEXISTS:
System.out.println("already registered: " + serviceId);
break;
case CONNECTIONLOSS:
register();
break;
default:
System.out.println("error");
}
}
};
private String status;
public void setStatus(String status) {
this.status = status;
updateStatus(status);
}
synchronized private void updateStatus(String status) {
if (status.equals(this.status)) {
zk.setData("/workers/worker-" + serviceId, status.getBytes(), -1, statusUpdateCallback, status);
}
}
AsyncCallback.StatCallback statusUpdateCallback = new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
switch (KeeperException.Code.get(rc)) {
case CONNECTIONLOSS:
updateStatus((String) ctx);
break;
default:
}
}
};
public static void main(String[] args) throws Exception {
Worker worker = new Worker();
worker.startZk();
worker.register();
Thread.sleep(30000);
}
}
複製代碼
主函數建立 worker 實例,開啓會話,執行註冊邏輯,建立節點時如發生鏈接丟失則再次執行註冊邏輯,註冊所建立的節點爲臨時節點。
從節點開始處理某些任務時,須要經過 setStatus() 方法更新節點狀態。
系統中 client 組件用於添加任務,以便從節點執行任務。如下爲 client 代碼:
package com.ulyssesss.zookeeper;
import org.apache.zookeeper.*;
import java.io.IOException;
public class Client implements Watcher {
private ZooKeeper zk;
private void startZk() throws IOException {
zk = new ZooKeeper("localhost:2181", 5000, this);
}
@Override
public void process(WatchedEvent event) {
System.out.println("event: " + event);
}
private String queueCommand(String command) {
while (true) {
try {
String name = zk.create("/tasks/task-", command.getBytes()
, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
return name;
} catch (Exception e) {
System.out.println("error");
}
}
}
public static void main(String[] args) throws Exception {
Client client = new Client();
client.startZk();
String name = client.queueCommand("command-1");
System.out.println("created " + name);
}
}
複製代碼
Client 使用有序節點 task- 標示任務,task- 後面會跟隨一個遞增整數,在執行 create 時如發生鏈接丟失,則重試 create 操做,適用於【至少執行一次】策略的應用。如要採用【至多執行一次】策略,能夠將任務的惟一標識添加到節點名中。
管理客戶端 AdminClient 用於展現系統運行狀態,代碼以下:
package com.ulyssesss.zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Date;
public class AdminClient implements Watcher {
private ZooKeeper zk;
private void startZk() throws IOException {
zk = new ZooKeeper("localhost:2181", 5000, this);
}
@Override
public void process(WatchedEvent event) {
System.out.println("event: " + event);
}
private void listState() throws KeeperException, InterruptedException {
try {
Stat stat = new Stat();
byte[] masterData = zk.getData("/master", false, stat);
Date startDate = new Date(stat.getCtime());
System.out.println("master: " + new String(masterData) + " since " + startDate);
} catch (KeeperException.NoNodeException e) {
System.out.println("no master");
}
System.out.println("workers: \n");
for (String worker : zk.getChildren("/workers", false)) {
byte[] data = zk.getData("/workers/" + worker, false, null);
String state = new String(data);
System.out.println("worker: " + state);
}
// ...
}
public static void main(String[] args) throws Exception {
AdminClient adminClient = new AdminClient();
adminClient.startZk();
adminClient.listState();
}
}
複製代碼
以上代碼會簡單的列出各個節點的信息。
經過 Java API 編程與 zkCli 命令很是接近,不一樣的是 zkCli 經常使用於調試,通常會在一個相對穩定的環境下使用。經過 Java API 編寫的程序,須要考慮到異常狀況,尤爲是 ConnectionLossException 異常,須要檢查狀態併合理恢復。