爲了向你介紹ZooKeeper Java API,咱們開發了一個很是簡單的監視器客戶端。ZooKeeper客戶端監視一個ZooKeeper節點的改變而且經過開始和中止一個程序來做出響應。html
客戶端有四個必備條件:java
按照慣例,ZooKeeper應用被分爲兩部分,一部分維護鏈接,另外一部分監視數據。在這個應用中,Executor的類維護ZooKeeper鏈接,DataMonitor的類監視ZooKeeper樹中的數據。同時,Executor包含主線程和執行邏輯,它負責不多的用戶交互,和你做爲參數傳進去的可執行程序的交互,還有那一個例子關閉和重啓,根據znode的狀態。node
Executor對象是這個例子程序的主要容器。它包含ZooKeeper對象,DataMonitor,就像上面程序設計描述的那樣。apache
按 Ctrl+C 複製代碼session
Executor的回調工做是開始和中止可執行文件,這個可執行文件的名字是你從命令行的傳過來的。它作這個是爲了響應被ZooKeeper對象觸發的事件。就像你在上面的代碼看到的那樣,Executor傳入一個引入給他本身做爲ZooKeeper構造函數的Watcher 參數。異步
它也傳入一個引用給它本身做爲DataMonitor的構造器的DataMonitorListener參數。每個的Executor的定義,它實現了這些接口:async
public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener { ...
Watcher接口被ZooKeeper Java API定義。ZooKeeper使用它和它的容器通訊。它只支持一個方法,process(),而且ZooKeeper使用它和主線程感興趣的事件通訊,例如ZooKeeper通訊或者ZooKeeper會話的狀態。這個例子中的Executor只是簡單的向下推送這些事件 給DataMonitor來決定怎麼處理它。它這樣作只是爲了說明這一點,依照慣例,Executor或像Executor的對象"擁有"ZooKeeper鏈接,可是它能夠自由地把事件委託給其它對象。它也使用這個做爲監聽事件觸發的通道。函數
public void process(WatchedEvent event) { dm.process(event); }
另外一方面DataMonitorListener接口,不是ZooKeeper API的一部分,只是爲了這個應用例子而設計的。DataMonitor對象使用它和它的容器通訊,也就是Executor對象。DataMonitorListener接口就像這樣:this
public interface DataMonitorListener { /** * The existence status of the node has changed. */ void exists(byte data[]); /** * The ZooKeeper session is no longer valid. * * @param rc * the ZooKeeper reason code */ void closing(int rc); }
這個接口被定義在DataMonitor類而且在Executor類中實現。當Executor.exists()被調用,Executor決定是否啓動或關閉每個請求。當znode不存在的時候再次調用須要殺掉可執行程序。spa
當Executor.closing()被調用,Executor決定是否關掉它本身來響應ZooKeeper鏈接永久消失。
正如你可能已經猜到的。DataMonitor是調用這些方法的對象,來響應ZooKeeper的狀態改變。
下面是Executor的DataMonitorListener.exists()和DataMonitorListener.closing的實現:
public void exists( byte[] data ) { if (data == null) { if (child != null) { System.out.println("Killing process"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { } } child = null; } else { if (child != null) { System.out.println("Stopping child"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { e.printStackTrace(); } } try { FileOutputStream fos = new FileOutputStream(filename); fos.write(data); fos.close(); } catch (IOException e) { e.printStackTrace(); } try { System.out.println("Starting child"); child = Runtime.getRuntime().exec(exec); new StreamWriter(child.getInputStream(), System.out); new StreamWriter(child.getErrorStream(), System.err); } catch (IOException e) { e.printStackTrace(); } } } public void closing(int rc) { synchronized (this) { notifyAll(); } }
DataMonitor 類
The DataMonitor class has the meat of the ZooKeeper logic(這句咋翻譯)。它幾乎上是異步和事件驅動。 DataMonitor kicks things off in the constructor with:
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) { this.zk = zk; this.znode = znode; this.chainedWatcher = chainedWatcher; this.listener = listener; // Get things started by checking if the node exists. We are going // to be completely event driven zk.exists(znode, true, this, null); }
調用ZooKeeper.exists()來檢查znode是否存在,設置一個監視器,而且傳遞它本身的引用給它本身做爲完成回調的對象,在這個意義上,it kicks things off,由於真實的處理在監視器被觸發的時候發生。
注意
不要把完成回調和監視回調弄混淆了。ZooKeeper.exists()完成回調,它發生在DataMonitor對象的StatCallback.processResult()方法實現中,當異步的監視器set操做(被ZooKeeper.exists())在服務端完成的時候被調用。
另外一方面,監視器的觸發,發送一個事件給Executor對象,由於Executor做爲ZooKeeper對象的監視器被註冊。
此外,你可能注意到DataMonitor也能句註冊它自已做爲這個特定監視器事件的監聽者。這是ZooKeeper3.0.0的新特性(支持多個監聽者)。然而在這例子中,DataMonitor沒有把它本身註冊爲監視器。
在ZooKeeper.exists()操做在服務端完成,ZooKeeper API在客戶端調用這個完成回調函數:
public void processResult(int rc, String path, Object ctx, Stat stat) { boolean exists; switch (rc) { case Code.Ok: exists = true; break; case Code.NoNode: exists = false; break; case Code.SessionExpired: case Code.NoAuth: dead = true; listener.closing(rc); return; default: // Retry errors zk.exists(znode, true, this, null); return; } byte b[] = null; if (exists) { try { b = zk.getData(znode, false, null); } catch (KeeperException e) { // We don't need to worry about recovering now. The watch // callbacks will kick off any exception handling e.printStackTrace(); } catch (InterruptedException e) { return; } } if ((b == null && b != prevData) || (b != null && !Arrays.equals(prevData, b))) { listener.exists(b); prevData = b; } }
這個代碼首先檢查Znode 是否存在,致命錯誤,和可恢復的錯誤。若是文件(或znode)存在,它從znode獲取數據,而且若是狀態已經改變它調用Executor的exists()回調函數。注意,它不須要爲getData調用作任何Exception處理由於它有任何可能致使錯誤的監視器:若是在ZooKeeper.getData()方法以前這個節點被刪除,經過ZooKeeper.exists()設置的監聽事件會觸發一個回調;若是有通訊錯誤,當鏈接回來的時候會觸發一個鏈接監聽事件。
最後,注意DataMonitor是怎麼處理監聽事件的:
public void process(WatchedEvent event) { String path = event.getPath(); if (event.getType() == Event.EventType.None) { // We are are being told that the state of the // connection has changed switch (event.getState()) { case SyncConnected: // In this particular example we don't need to do anything // here - watches are automatically re-registered with // server and any watches triggered while the client was // disconnected will be delivered (in order of course) break; case Expired: // It's all over dead = true; listener.closing(KeeperException.Code.SessionExpired); break; } } else { if (path != null && path.equals(znode)) { // Something has changed on the node, let's find out zk.exists(znode, true, this, null); } } if (chainedWatcher != null) { chainedWatcher.process(event); } }
若是客戶端ZooKeeper庫在會話到期(到期事件)以前能夠和ZooKeeper從新創建通訊通道(同步鏈接事件)全部會話的監視器將自動地在服務端被創建(自動重置監視器是ZooKeeper 3.0.0的新特性)。關於這點的更多信息能夠參考ZooKeeper Watches。
再深刻這個方法一些,當DataMonitor等到一個znode的事件,它調用ZooKeeper.exists()來查找什麼被改變了。
Executor.java
/** * A simple example program to use DataMonitor to start and * stop executables based on a znode. The program watches the * specified znode and saves the data that corresponds to the * znode in the filesystem. It also starts the specified program * with the specified arguments when the znode exists and kills * the program if the znode goes away. */ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener { String znode; DataMonitor dm; ZooKeeper zk; String filename; String exec[]; Process child; public Executor(String hostPort, String znode, String filename, String exec[]) throws KeeperException, IOException { this.filename = filename; this.exec = exec; zk = new ZooKeeper(hostPort, 3000, this); dm = new DataMonitor(zk, znode, null, this); } /** * @param args */ public static void main(String[] args) { if (args.length < 4) { System.err .println("USAGE: Executor hostPort znode filename program [args ...]"); System.exit(2); } String hostPort = args[0]; String znode = args[1]; String filename = args[2]; String exec[] = new String[args.length - 3]; System.arraycopy(args, 3, exec, 0, exec.length); try { new Executor(hostPort, znode, filename, exec).run(); } catch (Exception e) { e.printStackTrace(); } } /*************************************************************************** * We do process any events ourselves, we just need to forward them on. * * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent) */ public void process(WatchedEvent event) { dm.process(event); } public void run() { try { synchronized (this) { while (!dm.dead) { wait(); } } } catch (InterruptedException e) { } } public void closing(int rc) { synchronized (this) { notifyAll(); } } static class StreamWriter extends Thread { OutputStream os; InputStream is; StreamWriter(InputStream is, OutputStream os) { this.is = is; this.os = os; start(); } public void run() { byte b[] = new byte[80]; int rc; try { while ((rc = is.read(b)) > 0) { os.write(b, 0, rc); } } catch (IOException e) { } } } public void exists(byte[] data) { if (data == null) { if (child != null) { System.out.println("Killing process"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { } } child = null; } else { if (child != null) { System.out.println("Stopping child"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { e.printStackTrace(); } } try { FileOutputStream fos = new FileOutputStream(filename); fos.write(data); fos.close(); } catch (IOException e) { e.printStackTrace(); } try { System.out.println("Starting child"); child = Runtime.getRuntime().exec(exec); new StreamWriter(child.getInputStream(), System.out); new StreamWriter(child.getErrorStream(), System.err); } catch (IOException e) { e.printStackTrace(); } } } }
DataMonitor.java
/** * A simple class that monitors the data and existence of a ZooKeeper * node. It uses asynchronous ZooKeeper APIs. */ import java.util.Arrays; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.Stat; public class DataMonitor implements Watcher, StatCallback { ZooKeeper zk; String znode; Watcher chainedWatcher; boolean dead; DataMonitorListener listener; byte prevData[]; public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) { this.zk = zk; this.znode = znode; this.chainedWatcher = chainedWatcher; this.listener = listener; // Get things started by checking if the node exists. We are going // to be completely event driven zk.exists(znode, true, this, null); } /** * Other classes use the DataMonitor by implementing this method */ public interface DataMonitorListener { /** * The existence status of the node has changed. */ void exists(byte data[]); /** * The ZooKeeper session is no longer valid. * * @param rc * the ZooKeeper reason code */ void closing(int rc); } public void process(WatchedEvent event) { String path = event.getPath(); if (event.getType() == Event.EventType.None) { // We are are being told that the state of the // connection has changed switch (event.getState()) { case SyncConnected: // In this particular example we don't need to do anything // here - watches are automatically re-registered with // server and any watches triggered while the client was // disconnected will be delivered (in order of course) break; case Expired: // It's all over dead = true; listener.closing(KeeperException.Code.SessionExpired); break; } } else { if (path != null && path.equals(znode)) { // Something has changed on the node, let's find out zk.exists(znode, true, this, null); } } if (chainedWatcher != null) { chainedWatcher.process(event); } } public void processResult(int rc, String path, Object ctx, Stat stat) { boolean exists; switch (rc) { case Code.Ok: exists = true; break; case Code.NoNode: exists = false; break; case Code.SessionExpired: case Code.NoAuth: dead = true; listener.closing(rc); return; default: // Retry errors zk.exists(znode, true, this, null); return; } byte b[] = null; if (exists) { try { b = zk.getData(znode, false, null); } catch (KeeperException e) { // We don't need to worry about recovering now. The watch // callbacks will kick off any exception handling e.printStackTrace(); } catch (InterruptedException e) { return; } } if ((b == null && b != prevData) || (b != null && !Arrays.equals(prevData, b))) { listener.exists(b); prevData = b; } } }