poublic void process(WatchedEvent event);node
WatchedEvent 數據結構:緩存
KeeperState(會話狀態): Disconnected; SyncConnected; AuthFailed; ConnectedReadOnly; SaslAuthenticated; Expired。 EventType(事件類型): NodeCreated; NodeDeleted; NodeDataChanged; NodeChildrenChanged None。 若是事件類型不是None時,返回一個znode路徑。
設置監視點:安全
exists的異步調用的示例代碼:服務器
zk.exists("/myZnode", myWatcher, existsCallback, null); Watcher myWatcher = new Watcher() { public void process(WatchedEvent e) { // Process the watch event } } StatCallback existsCallback = new StatCallback() { public void processResult(int rc, String path, Object ctx, Stat stat) { // Process the result of the exists call } };
任務列表,一個組件須要等待處理的變化狀況:數據結構
一、管理權變化dom
StringCallback masterCreateCallback = new StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { switch (Code.get(rc)) { case CONNECTIONLOSS: checkMaster(); return; case OK: isLeader = true; break; case NODEEXISTS: masterExists(); break; default: isLeader = false; break; } System.out.println("I'm " + (isLeader ? "" : "not ") + "the leader"); } }; void masterExists() { zk.exists("/master", masterExistsWatcher, masterExistsCallback, null); } Watcher masterExistsWatcher = new Watcher() { @Override public void process(WatchedEvent event) { if(event.getType() == EventType.NodeDeleted) { assert "/master".equals(event.getPath()); try { runForMaster(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }; StatCallback masterExistsCallback = new StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { switch (Code.get(rc)) { case CONNECTIONLOSS: masterExists(); break; case OK: if (stat == null) { //state = MasterStates.RUNNING; try { runForMaster(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } break; default: checkMaster(); break; } } };
圖4-1:主節點競選中可能的交錯操做
異步
二、主節點等待從節點列表的變化
【新的從節點加入進來,或舊的從節點退役】ide
經過在ZooKeeper中的/workers下添加子節點來註冊新的從節點。當一個從節點崩潰或從系統中被移除,如會話過時等狀況,須要自動將對應的znode節點刪除。優雅實現的從節點會顯式地關閉其會話,而不須要ZooKeeper等待會話過時。
獲取列表並監視變化的示例代碼:工具
/** workersChangeWatcher爲從節點列表的監視點對象 */ Watcher workersChangeWatcher = new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == EventType.NodeChildrenChanged) { assert "/workers".equals(event.getPath()); getWorkers(); } } }; private void getWorkers() { zk.getChildren("/workers", workersChangeWatcher, workersGetChildrenCallback, null); } ChildrenCallback workersGetChildrenCallback = new ChildrenCallback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children) { switch (Code.get(rc)) { /** 當CONNECTIONLOSS事件發生時,須要從新獲取子節點並設置監視點的操做 */ case CONNECTIONLOSS: getWokerList(); break; case OK: LOG.info("Successfully got a list of workers :" + children.size() + " workers"); /** 從新分配崩潰從節點的任務,並從新設置新的從節點列表 */ reassignAndSet(children); break; default: LOG.error("getChildren failed", KeeperException.create(Code.get(rc), path)); } } private void getWokerList() { // TODO Auto-generated method stub } }; /** 用於保存上次得到的從節點列表的本地緩存 */ ChildrenCache workersCache; void reassignAndSet(List<String> children) { List<String> toProcess; if (workersCache == null) { /** 若是第一次使用本地緩存這個變量,那麼初始化該變量 */ workersCache = new ChildrenCache(children); /** 第一次得到全部從節點時,不須要作什麼其餘事 */ toProcess = null; } else { LOG.info("Removing and setting"); /** 若是不是第一次,那麼須要檢查是否有從節點已經被移除了 */ toProcess = workersCache.removedAndSet(children); } if (toProcess != null) { for (String worker : toProcess) { /** 若是有從節點被移除了,須要從新分配任務 */ getAbsentWorkerTasks(worker); } } }
三、主節點等待新任務進行分配
assignTasks方法爲任務分配的實現:oop
/** 在任務列表變化時,處理通知的監視點實現 */ Watcher tasksChangeWatcher = new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == EventType.NodeChildrenChanged) { assert "/tasks".equals(event.getPath()); getTasks(); } } }; /** 得到任務列表 */ void getTasks() { zk.getChildren("/tasks", tasksChangeWatcher, tasksGetChildrenCallback, null); } ChildrenCallback tasksGetChildrenCallback = new ChildrenCallback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children) { switch (Code.get(rc)) { case CONNECTIONLOSS: /** 當收到子節點變化的通知後,得到子節點的列表 */ getTasks(); break; case OK: if (children != null) { /** 分配列表中的任務 */ assignTasks(children); break; default: LOG.error("getChildren failed.", KeeperException.create(Code.get(rc), path)); break; } } }; void assignTasks(List<String> tasks) { for (String task : tasks) { getTaskData(task); } } void getTaskData(String task) { /** 得到任務信息 */ zk.getData("/tasks/" + task, false, taskDataCallback, task); } DataCallback taskDataCallback = new DataCallback() { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { switch (Code.get(rc)) { case CONNECTIONLOSS: getTaskData((String) ctx); break; case OK: /* * Choose worker at random */ int worker = rand.nextInt(workerList.size()); String designatedWorker = workerList.get(worker); /* * Assign task to randomly chosen worker. */ String assignmentPath = "/assign/" + designatedWorker + "/" + (String) ctx; /** 隨機選擇一個從節點,分配任務給這個從節 */ createAssignment(assignmentPath, data); break; default: LOG.error("Error when trying to get task data.", KeeperException.create(Code.get(rc), path)); break; } } }; void createAssignment(String path, byte[] data) { /** 建立分配節點,路徑形式爲/assign/worker-id/task-num */ zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, assignTaskCallback, data); } StringCallback assignTaskCallback = new StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { switch (Code.get(rc)) { case CONNECTIONLOSS: createAssignment(path, (byte[]) ctx); break; case OK: LOG.info("Task assigned correctly:" + name); /** 刪除/tasks下對應的任務節點 */ deleteTask(name.substring(name.lastIndexOf("/") + 1)); case NODEEXISTS: LOG.warn("Task already assigned"); break; default: LOG.error("Error when trying to assign task.", KeeperException.create(Code.get(rc), path)); break; } } private void deleteTask(String substring) { // TODO Auto-generated method stub } };
四、從節點等待分配新任務
StringCallback createWorkerCallback = new StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { switch (Code.get(rc)) { /** 重試,注意再次註冊不會有問題,由於若是znode節點已經存在,會收到NODEEXISTS事件 */ case CONNECTIONLOSS: register(); break; case OK: LOG.info("Registered successfully:" + serverId); break; case NODEEXISTS: LOG.warn("Already registered:" + serverId); break; default: LOG.error("Something went wrong:" + KeeperException.create(Code.get(rc), path)); break; } } }; /** 經過建立一個znode節點來註冊從節點 */ void register() { zk.create("/workers/worker-" + serverId, "Idle".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, createWorkerCallback, null); } 一旦有任務列表分配給從節點,從節點就會從/assign/worker-id獲取任務信息並執行任務。從節點從本地列表中獲取每一個任務的信息並驗證任務是否還在待執行的隊列中,從節點保存一個本地待執行任務的列表就是爲了這個目的。 注意,爲了釋放回調方法的線程,咱們在單獨的線程對從節點的已分配任務進行循環,不然,會阻塞其餘的回調方法的執行。
示例中,使用了Java的ThreadPoolExecutor類分配一個線程,該線程進行任務的循環操做:
/** 當收到子節點變化的通知後,得到子節點的列表 */ Watcher newTaskWatcher = new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == EventType.NodeChildrenChanged) { assert new String("/assign/worker-" + serverId).equals(event.getPath()); getTasks(); } } }; void getTasks() { zk.getChildren("/assign/worker-" + serverId, newTaskWatcher, tasksGetChildrenCallback, null); } ChildrenCallback tasksGetChildrenCallback = new ChildrenCallback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children) { switch (Code.get(rc)) { case CONNECTIONLOSS: /** 當收到子節點變化的通知後,得到子節點的列表 */ getTasks(); break; case OK: if (children != null) { /** 分配列表中的任務 */ //assignTasks(children); /** 單獨線程中執行 */ executor.execute(new Runnable() { List<String> children; DataCallback cb; private ArrayList<String> noGoingtasks; public Runnable init(List<String> children, DataCallback cb) { this.children = children; this.cb = cb; return this; } @Override public void run() { LOG.info("Looping into tasks"); synchronized (noGoingtasks) { /** 循環子節點列表 */ for (String task : children) { if(!noGoingtasks.contains(task)) { LOG.trace("New task:{}", task); /** 得到任務信息並執行任務 */ zk.getData("assign/worker-" + serverId, false, cb, task); /** 將正在執行的任務添加到執行中列表,防止屢次執行 */ noGoingtasks.add(task); } } } } }.init(children, taskDataCallback)); } break; default: LOG.error("getChildren failed.", KeeperException.create(Code.get(rc), path)); break; } } };
五、客戶端等待任務的執行結果
void submitTask(String task, TaskObject taskCtx) { taskCtx.setTask(task); /** 與以前的ZooKeeper調用不一樣,傳遞了一個上下文對象,該對象爲實現的Task類的實例 */ zk.create("/tasks/task-", task.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, createTaskCallback, taskCtx); } StringCallback createTaskCallback = new StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { switch (Code.get(rc)) { /** 鏈接丟失時,再次提交任務,注意從新提交任務可能會致使任務重複。 */ case CONNECTIONLOSS: submitTask(((TaskObject) ctx).getTask(), (TaskObject) ctx); break; case OK: LOG.info("My created task name: + name"); ((TaskObject) ctx).setTaskName(name); /** 爲這個任務的znode節點設置一個監視點 */ watchStatus("/status/" + name.replace("/tasks/", ""), ctx); break; default: LOG.error("Something went wrong" + KeeperException.create(Code.get(rc), path)); break; } } };
檢查狀態節點是否已經存在(也許任務很快處理完成),並設置監視點。
提供了一個收到znode節點建立的通知時進行處理的監視點的實現和一個exists方法的回調實現:
ConcurrentHashMap<String, Object> ctxMap = new ConcurrentHashMap<String, Object>(); private void watchStatus(String path, Object ctx) { ctxMap.put(path, ctx); /** 客戶端經過該方法傳遞上下對象,當收到狀態節點的通知時,就能夠修改這個表示任務的對象(TaskObject) */ zk.exists(path, statusWatcher, existsCallback, ctx); } Watcher statusWatcher = new Watcher() { @Override public void process(WatchedEvent event) { if(event.getType() == EventType.NodeCreated) { assert event.getPath().contains("/status/task-"); zk.getData(event.getPath(), false, getDataCallback, ctxMap.get(event.getPath())); } } }; StatCallback existsCallback = new StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { switch (Code.get(rc)) { case CONNECTIONLOSS: watchStatus(path, ctx); break; case OK: /** 狀態節點已經存在,所以客戶端獲取這個節點信息 */ if(stat != null) { zk.getData(path, false, getDataCallback, null); } break; /** 若是狀態節點不存在,這是常見狀況,客戶端不進行任何操做 */ case NONODE: break; default: LOG.error("Something went wrong when " + "checking if the status node exists:" + KeeperException.create(Code.get(rc), path)); break; } } };
Multiop能夠原子性地執行多個ZooKeeper的操做,執行過程爲原子性,即在multiop代碼塊中的全部操做要不所有成功,要不所有失敗。
使用multiop特性:
/** 示例 */ /** ①爲delete方法建立Op對象 */ Op deleteZnode(String z) { /** ②經過對應的Op方法返回對象。 */ return Op.delete(z, -1); } ... /** ③以列表方式傳入每一個delete操做的元素執行multi方法 */ List<OpResult> results = zk.multi(Arrays.asList(deleteZnode("/a/b"), deleteZnode("/a"));
調用multi方法返回一個OpResult對象的列表,每一個對象對應每一個操做。例如,對於delete操做,咱們使用DeleteResult類,該類繼承自OpResult,經過每種操做類型對應的結果對象暴露方法和數據。DeleteResult對象僅提供了equals和hashCode方法,而CreateResult對象暴露出操做的路徑(path)和Stat對象。對於錯誤處理,ZooKeeper返回一個包含錯誤碼的ErrorResult類的實例。
multi方法一樣也有異步版本,如下爲同步方法和異步方法的定義:
public List<OpResult> multi(Iterator<Op> ops) throws InterruptedException, KeeperException; public void multi(Iterator<Op> ops, MultiCallback cb, Object ctx);
【Transaction】封裝了multi方法,提供了簡單的接口。咱們能夠建立Transaction對象的實例,添加操做,提交事務。
使用Transaction重寫上一示例的代碼以下:
Transaction t = new Transaction(); t.delete("/a/b", -1); t.delete("/a", -1); List<OpResult> results = t.commit();
【commit】方法一樣也有一個異步版本的方法,該方法以MultiCallback對象和上下文對象爲輸入:
public void commit(MultiCallback cb, Object ctx);
multiop能夠簡化不止一處的主從模式的實現,當分配一個任務,在以前的例子中,主節點會建立任務分配節點,而後刪除/tasks下對應的任務節點。若是在刪除/tasks下的節點時,主節點崩潰,就會致使一個已分配的任務還在/tasks下。使用multiop,能夠原子化建立任務分配節點和刪除/tasks下對應的任務節點這兩個操做。使用這個方式,能夠保證沒有已分配的任務還在/tasks節點下,若是備份節點接管了主節點角色,就不用再區分/tasks下的任務是否是沒有分配的。multiop提供的另外一個功能是檢查一個znode節點的版本,經過multiop能夠同時讀取的多個節點的ZooKeeper狀態並回寫數據——如回寫某些讀取到的數據信息。當被檢查的znode版本號沒有變化時,就能夠經過multiop調用來檢查沒有被修改的znode節點的版本號,這個功能很是有用,如在檢查一個或多個znode節點的版本號取決於另一個znode節點的版本號時。在咱們的主從模式的示例中,主節點須要讓客戶端在主節點指定的路徑下添加新任務,例如,主節點要求客戶端在/task-mid的子節點中添加新任務節點,其中mid爲主節點的標識符,主節點在/master-path節點中保存這個路徑的數據,客戶端在添加新任務前,須要先讀取/master-path的數據,並經過Stat獲取這個節點的版本號信息,而後,客戶端經過multiop的部分調用方式在/task-mid節點下添加新任務節點,同時會檢查/master-path的版本號是否與以前讀取的相匹配。
check方法的定義與setData方法類似,只是沒有data參數:
public static Op check(String path, int version);
若是輸入的path的znode節點的版本號不匹配,multi調用會失敗。
經過如下簡單的示例代碼,來講明如何實現上面所討論的場景:
/** ①獲取/master節點的數據。 */ byte[] masterData = zk.getData("/master-path", false, stat); /** ②從/master節點得到路徑信息。*/ String parent = new String(masterData); ... zk.multi(Arrays.asList(Op.check("/master-path", stat.getVersion()), /** ③兩個操做的multi調用。 */ Op.create(, modify(z1Data),-1),
從應用的角度來看,客戶端每次都是經過訪問ZooKeeper來獲取給定znode節點的數據、一個znode節點的子節點列表或其餘相關的ZooKeeper狀態,這種方式並不可取。
更高效的方式爲客戶端本地緩存數據,並在須要時使用這些數據,一旦這些數據發生變化,你讓
ZooKeeper通知客戶端,客戶端就能夠更新緩存的數據。
另外一種方式,客戶端透明地緩存客戶端訪問的全部ZooKeeper狀態,並在更新緩存數據時將這些數據置爲無效。實現這種緩存一致性的方案代價很是大。
一、寫操做的順序
ZooKeeper狀態會在全部服務端所組成的所有安裝中進行復制。
服務端對狀態變化的順序達成一致,並使用相同的順序執行狀態的更新。
例如,若是一個ZooKeeper的服務端執行了先創建一個/z節點的狀態變化以後再刪除/z節點的狀態變化這個順序的操做,全部的在集合中的服務端均需以相同的順序執行這些變化。
二、讀操做的順序
ZooKeeper客戶端老是會觀察到相同的更新順序,即便它們鏈接到不一樣的服務端上。可是客戶端多是在不一樣時間觀察到了更新,若是他們還在ZooKeeper之外通訊,這種差別就會更加明顯。
圖4-2:隱藏通道問題的例子
爲了不讀取到過去的數據,建議應用程序使用ZooKeeper進行全部涉及ZooKeeper狀態的通訊。
例如,爲了不剛剛描述的場景,c 2 能夠在/z節點設置監視點來代替從c 1 直接接收消息,經過監視點,c 2就能夠知道/z節點的變化,從而消除隱藏通道的問題。
三、通知的順序
ZooKeeper對通知的排序涉及其餘通知和異步響應,以及對系統狀態更新的順序。如ZooKeeper對兩個狀態更新進行排序,u和u',u'緊隨u以後,若是u和u'分別修改了/a節點和/b節點,其中客戶端c在/a節點設置了監視點,c只能觀察到u'的更新,即接收到u所對應通知後讀取/b節點。這種順序可使應用經過監視點實現安全的參數配置。假設一個znode節點/z被建立或刪除表示在ZooKeeper中保存的一些配置信息變爲無效的。在對這個配置進行任何實際更新以前,將建立或刪除的通知發給客戶端,這一保障很是重要,能夠確保客戶端不會讀取到任何無效配置。
更具體一些,假如咱們有一個znode節點/config,其子節點包含應用配置元數據:/config/m1,/config/m2,,/config/m_n。目的只是爲了說明這個例子,無論這些znode節點的實際內容是什麼。假如主節點應用進程經過setData更新每一個znode節點,且不能讓客戶端只讀取到部分更新,一個解決方案就是在開始更新這些配置前主節點先建立一個/config/invalid節點,其餘須要讀取這一狀態的客戶端會監視/config/invalid節點,若是該節點存在就不會讀取配置狀態,當該節點被刪除,就意味着有一個新的有效的配置節點集合可用,客戶端能夠進行讀取該集合的操做。
對於這個具體的例子,咱們還可使用multiop來對/config/m[1-n]這些節點原子地執行全部setData操做,而不是使用一個znode節點來標識部分修改的狀態。在例子中的原子性問題,咱們可使用multiop代替對額外znode節點或通知的依賴,不過通知機制很是通用,並且並未約束爲原子性的。
由於ZooKeeper根據觸發通知的狀態更新對通知消息進行排序,客戶端就能夠經過這些通知感知到真正的狀態變化的順序。
注意:活性與安全性
在本章中,因活性普遍使用了通知機制。活性(liveness)會確保系統最終取得進展。新任務和新的從節點的通知只是關於活性的事件的例子。若是主節點沒有對新任務進行通知,這個任務就永遠不會被執行,至少從提交任務的客戶端的視角來看,已提交的任務沒有執行會致使活性缺失。原子更新一組配置節點的例子中,狀況不太同樣:這個例子涉及安全性,而不是活性。在更新中讀取znode節點可能會致使客戶端到非一致性配置信息,而invalid節點能夠確保只有當合法配置信息有效時,客戶端纔讀取正確狀態。
在咱們看到的關於活性的例子中,通知的傳送順序並非特別重要,只要最終客戶端最終獲知這些事件就能夠繼續取得進展。不過爲了安全性,不按順序接收通知也許會致使不正確的行爲。
避免在一個特定節點設置大量的監視點,最好是每次在特定的znode節點上,只有少許的客戶端設置監視點,理想狀況下最多隻設置一個。
這樣,每一個節點上設置的監視點只有最多一個客戶端
根據YourKit( http://www.yourkit.com/ )的分析工具所分析,設置一個監視點會使服務端的監視點管理器的內存消耗上增長大約250到300個字節,設置很是多的監視點意味着監視點管理器會消耗大量的服務器內存