目錄html
瘋狂創客圈 Java 分佈式聊天室【 億級流量】實戰系列之 -25【 博客園 總入口 】java
你們好,我是做者尼恩。目前和幾個小夥伴一塊兒,組織了一個高併發的實戰社羣【瘋狂創客圈】。正在開始高併發、億級流程的 IM 聊天程序 學習和實戰node
前面,已經完成一個高性能的 Java 聊天程序的四件大事:面試
接下來,須要進入到分佈式開發的環節了。 分佈式的中間件,瘋狂創客圈的小夥伴們,一致的選擇了zookeeper,不只僅是因爲其在大數據領域,太有名了。更重要的是,不少的著名框架,都使用了zk。apache
本篇介紹 ZK Curator 的事件監聽。緩存
Curator 事件有兩種模式,一種是標準的觀察模式,一種是緩存監聽模式。標準的監聽模式是使用Watcher 監聽器。第二種緩存監聽模式引入了一種本地緩存視圖的Cache機制,來實現對Zookeeper服務端事件監聽。服務器
Cache事件監聽能夠理解爲一個本地緩存視圖與遠程Zookeeper視圖的對比過程。Cache提供了反覆註冊的功能。Cache是一種緩存機制,能夠藉助Cache實現監聽。簡單來講,Cache在客戶端緩存了znode的各類狀態,當感知到zk集羣的znode狀態變化,會觸發event事件,註冊的監聽器會處理這些事件。網絡
Watcher 監聽器比較簡單,只有一種。Cache事件監聽的種類有3種Path Cache,Node Cache,Tree Cache。併發
在ZooKeeper中,接口類Watcher用於表示一個標準的事件處理器,其定義了事件通知相關的邏輯,包含KeeperState和EventType兩個枚舉類,分別表明了通知狀態和事件類型。框架
Watcher接口定義了事件的回調方法:process(WatchedEvent event)。定義一個Watcher的實例很簡單,代碼以下:
Watcher w = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { log.info("監聽器watchedEvent:" + watchedEvent); } };
使用Watcher監聽器實例的方式也很簡單,在Curator的調用鏈上,加上usingWatcher方法便可,代碼以下:
byte[] content = client.getData() .usingWatcher(w).forPath(workerPath);
一個Watcher監聽器在向服務端完成註冊後,當服務端的一些事件觸發了這個Watcher,那麼就會向指定客戶端發送一個事件通知,來實現分佈式的通知功能。客戶收到服務器的通知後,Curator 會封裝一個WatchedEvent 事件實例,傳遞給監聽器的回調方法process(WatchedEvent event)。
WatchedEvent包含了三個基本屬性:
(1)通知狀態(keeperState)
(2)事件類型(EventType)
(3)節點路徑(path)
注意,WatchedEvent並非直接從ZooKeeper集羣直接傳遞過來的事件實例,而是Curator 封裝過的事件實例。WatchedEvent類型沒有實現序列化接口java.io.Serializable,所以不能用於網絡傳輸。ZooKeeper集羣直接網絡傳輸傳遞過來的事件實例是啥呢? 是一個WatcherEvent類型的實例,這個傳輸實例和Curator 封裝過的WatchedEvent實例,在名稱上有一個字母之差,並且功能也是同樣的,都表示的是同一個事物,都是對一個服務端事件的封裝。
所以,這裏只講Curator 封裝過的WatchedEvent實例。下邊列舉了ZooKeeper中最多見的幾個通知狀態和事件類型。
KeeperState | EventType | 觸發條件 | 說明 |
---|---|---|---|
None (-1) | 客戶端與服務端成功創建鏈接 | ||
SyncConnected (0) | NodeCreated (1) | Watcher監聽的對應數據節點被建立 | |
NodeDeleted (2) | Watcher監聽的對應數據節點被刪除 | 此時客戶端和服務器處於鏈接狀態 | |
NodeDataChanged (3) | Watcher監聽的對應數據節點的數據內容發生變動 | ||
NodeChildChanged (4) | Wather監聽的對應數據節點的子節點列表發生變動 | ||
Disconnected (0) | None (-1) | 客戶端與ZooKeeper服務器斷開鏈接 | 此時客戶端和服務器處於斷開鏈接狀態 |
Expired (-112) | Node (-1) | 會話超時 | 此時客戶端會話失效,一般同時也會受到SessionExpiredException異常 |
AuthFailed (4) | None (-1) | 一般有兩種狀況,1:使用錯誤的schema進行權限檢查 2:SASL權限檢查失敗 | 一般同時也會收到AuthFailedException異常 |
利用Watcher來對節點進行監聽操做,但此監聽操做只能監聽一次。來看一個簡單的實例程序:
@Slf4j @Data public class ZkWatcherDemo { private String workerPath = "/test/listener/node"; private String subWorkerPath = "/test/listener/node/id-"; @Test public void testWatcher() { CuratorFramework client = ZKclient.instance.getClient(); //檢查節點是否存在,沒有則建立 boolean isExist = ZKclient.instance.isNodeExist(workerPath); if (!isExist) { ZKclient.instance.createNode(workerPath, null); } try { Watcher w = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("監聽到的變化 watchedEvent = " + watchedEvent); } }; byte[] content = client.getData() .usingWatcher(w).forPath(workerPath); log.info("監聽節點內容:" + new String(content)); // 第一次變動節點數據 client.setData().forPath(workerPath, "第1次更改內容".getBytes()); // 第二次變動節點數據 client.setData().forPath(workerPath, "第2次更改內容".getBytes()); Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } //.... }
運行代碼,輸出的結果以下:
監聽到的變化 watchedEvent = WatchedEvent state:SyncConnected type:NodeDataChanged path:/test/listener/node
程序中,對節點路徑 「/test/listener/node」註冊一個Watcher監聽器實例,隨後調用setData方法兩次改變節點內容,可是,監聽器僅僅監聽到了一個事件。也就是說,當第二次改變節點內容時,監聽已經失效,沒法再次得到節點變更事件。
也就是說,Watcher監聽器是一次性的,若是要反覆使用,就須要反覆的使用usingWatcher提早註冊。
因此,Watcher監聽器不能應用於節點的數據變更或者節點變更這樣的通常業務場景。而是適用於一些特殊的,好比會話超時、受權失敗等這樣的特殊場景。
既然Watcher監聽器是一次性的,在開發過程當中須要反覆註冊Watcher,比較繁瑣。Curator引入了Cache來監聽ZooKeeper服務端的事件。Cache對ZooKeeper事件監聽進行了封裝,可以自動處理反覆註冊監聽。
Curator引入的Cache緩存實現,是一個系列,包括了Node Cache 、Path Cache、Tree Cache三組類。其中Node Cache節點緩存能夠用於ZNode節點的監聽,Path Cache子節點緩存用於ZNode的子節點的監聽,而Tree Cache樹緩存是Path Cache的加強,不光能監聽子節點,也能監聽ZNode節點自身。
Node Cache,能夠用於監控本節點的新增,刪除,更新。
Node Cache使用的第一步,就是構造一個NodeCache緩存實例。
有兩個構造方法,具體以下:
NodeCache(CuratorFramework client, String path) NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
第一個參數就是傳入建立的Curator的框架客戶端,第二個參數就是監聽節點的路徑,第三個重載參數dataIsCompressed 表示是否對數據進行壓縮。
NodeCache使用的第二步,就是構造一個NodeCacheListener監聽器實例。該接口的定義以下:
package org.apache.curator.framework.recipes.cache; public interface NodeCacheListener { void nodeChanged() throws Exception; }
NodeCacheListener監聽器接口,只定義了一個簡單的方法 nodeChanged,當節點變化時,這個方法就會被回調到。
在建立完NodeCacheListener的實例以後,須要將這個實例註冊到NodeCache緩存實例,使用緩存實例的addListener方法。 而後使用緩存實例nodeCache的start方法,啓動節點的事件監聽。
nodeCache.getListenable().addListener(l); nodeCache.start();
強調下,須要調用nodeCache的start方法能進行緩存和事件監聽,這個方法有兩個版本:
void start()//Start the cache. void start(boolean buildInitial) //true表明緩存當前節點
惟一的一個參數buildInitial表明着是否將該節點的數據當即進行緩存。若是設置爲true的話,在start啓動時當即調用NodeCache的getCurrentData方法就可以獲得對應節點的信息ChildData類,若是設置爲false的就得不到對應的信息。
使用NodeCache來監聽節點的事件,完整的實例代碼以下:
@Test public void testNodeCache() { //檢查節點是否存在,沒有則建立 boolean isExist = ZKclient.instance.isNodeExist(workerPath); if (!isExist) { ZKclient.instance.createNode(workerPath, null); } CuratorFramework client = ZKclient.instance.getClient(); try { NodeCache nodeCache = new NodeCache(client, workerPath, false); NodeCacheListener l = new NodeCacheListener() { @Override public void nodeChanged() throws Exception { ChildData childData = nodeCache.getCurrentData(); log.info("ZNode節點狀態改變, path={}", childData.getPath()); log.info("ZNode節點狀態改變, data={}", new String(childData.getData(), "Utf-8")); log.info("ZNode節點狀態改變, stat={}", childData.getStat()); } }; nodeCache.getListenable().addListener(l); nodeCache.start(); // 第1次變動節點數據 client.setData().forPath(workerPath, "第1次更改內容".getBytes()); Thread.sleep(1000); // 第2次變動節點數據 client.setData().forPath(workerPath, "第2次更改內容".getBytes()); Thread.sleep(1000); // 第3次變動節點數據 client.setData().forPath(workerPath, "第3次更改內容".getBytes()); Thread.sleep(1000); // 第4次變動節點數據 // client.delete().forPath(workerPath); Thread.sleep(Integer.MAX_VALUE); } catch (Exception e) { log.error("建立NodeCache監聽失敗, path={}", workerPath); } }
運行的結果是,NodeCashe節點緩存可以重複的進行事件節點。代碼中的第三次監聽的輸出節選以下:
\- ZNode節點狀態改變, path=/test/listener/node \- ZNode節點狀態改變, data=第3次更改內容 \- ZNode節點狀態改變, stat=17179869191,...
最後說明一下,若是NodeCache監聽的節點爲空(也就是說傳入的路徑不存在)。那麼若是咱們後面建立了對應的節點,也是會觸發事件從而回調nodeChanged方法。
PathChildrenCache子節點緩存用於子節點的監聽,監控本節點的子節點被建立、更新或者刪除。須要強調兩點:
(1)只能監聽子節點,監聽不到當前節點
(2)不能遞歸監聽,子節點下的子節點不能遞歸監控
PathChildrenCache子節點緩存使用的第一步,就是構造一個緩存實例。
有多個重載版本的構造方法,選擇4個進行說明,具體以下:
public PathChildrenCache(CuratorFramework client, String path,boolean cacheData) public PathChildrenCache(CuratorFramework client, String path,boolean cacheData, boolean dataIsCompressed,final ExecutorService executorService) public PathChildrenCache(CuratorFramework client, String path,boolean cacheData, boolean dataIsCompressed,ThreadFactory threadFactory) public PathChildrenCache(CuratorFramework client, String path,boolean cacheData, ThreadFactory threadFactory)
全部的構造方法,前三個參數,都是同樣的。
第一個參數就是傳入建立的Curator的框架客戶端,第二個參數就是監聽節點的路徑,第三個重載參數cacheData表示是否把節點內容緩存起來。若是cacheData爲true,那麼接收到節點列表變動事件的同時,會將得到節點內容。
dataIsCompressed參數(若是有),表示是否對節點數據進行壓縮。
executorService 和threadFactory參數差很少,表示經過傳入的線程池或者線程工廠,來異步處理監聽事件。
threadFactory參數(若是有)表示線程池工廠,當PathChildrenCache內部須要開啓新的線程執行時,使用該線程池工廠來建立線程。
PathChildrenCache子節點緩存使用的第二步,就是構造一個子節點緩存監聽器PathChildrenCacheListener實例。該接口的定義以下:
package org.apache.curator.framework.recipes.cache; import org.apache.curator.framework.CuratorFramework; public interface PathChildrenCacheListener { void childEvent(CuratorFramework client, PathChildrenCacheEvent e) throws Exception; }
PathChildrenCacheListener監聽器接口中,也只定義了一個簡單的方法 childEvent,當子節點有變化時,這個方法就會被回調到。
在建立完PathChildrenCacheListener的實例以後,須要將這個實例註冊到PathChildrenCache緩存實例,使用緩存實例的addListener方法。 而後使用緩存實例nodeCache的start方法,啓動節點的事件監聽。
這裏的start方法,須要傳入啓動的模式。能夠傳入三種模式,也就是API列表中看到的StartMode,其中定義了下面三種枚舉:
(1)NORMAL——異步初始化cache
(2)BUILD_INITIAL_CACHE——同步初始化cache
(3)POST_INITIALIZED_EVENT——異步初始化cache,並觸發完成事件
對於start模式的三種啓動方式,詳細的說明以下:
BUILD_INITIAL_CACHE:啓動時,同步初始化cache,以及建立cache後,就從服務器拉取對應的數據。
POST_INITIALIZED_EVENT:啓動時,異步初始化cache,初始化完成觸發PathChildrenCacheEvent.Type#INITIALIZED事件,cache中Listener會收到該事件的通知。
最後是第一個枚舉常量,NORMAL:啓動時,異步初始化cache,完成後不會發出通知。
使用PathChildrenCache來監聽節點的事件,完整的實例代碼以下:
@Test public void testPathChildrenCache() { //檢查節點是否存在,沒有則建立 boolean isExist = ZKclient.instance.isNodeExist(workerPath); if (!isExist) { ZKclient.instance.createNode(workerPath, null); } CuratorFramework client = ZKclient.instance.getClient(); try { PathChildrenCache cache = new PathChildrenCache(client, workerPath, true); PathChildrenCacheListener l = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { try { ChildData data = event.getData(); switch (event.getType()) { case CHILD_ADDED: log.info("子節點增長, path={}, data={}", data.getPath(), new String(data.getData(), "UTF-8")); break; case CHILD_UPDATED: log.info("子節點更新, path={}, data={}", data.getPath(), new String(data.getData(), "UTF-8")); break; case CHILD_REMOVED: log.info("子節點刪除, path={}, data={}", data.getPath(), new String(data.getData(), "UTF-8")); break; default: break; } } catch ( UnsupportedEncodingException e) { e.printStackTrace(); } } }; cache.getListenable().addListener(l); cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); Thread.sleep(1000); for (int i = 0; i < 3; i++) { ZKclient.instance.createNode(subWorkerPath + i, null); } Thread.sleep(1000); for (int i = 0; i < 3; i++) { ZKclient.instance.deleteNode(subWorkerPath + i); } } catch (Exception e) { log.error("PathCache監聽失敗, path=", workerPath); } }
運行的結果以下:
\- 子節點增長, path=/test/listener/node/id-0, data=to set content \- 子節點增長, path=/test/listener/node/id-2, data=to set content \- 子節點增長, path=/test/listener/node/id-1, data=to set content ...... \- 子節點刪除, path=/test/listener/node/id-2, data=to set content \- 子節點刪除, path=/test/listener/node/id-0, data=to set content \- 子節點刪除, path=/test/listener/node/id-1, data=to set content
能夠看到,PathChildrenCache 可以反覆的監聽到節點的新增和刪除。
簡單說下Curator的監聽原理,不管是PathChildrenCache,仍是TreeCache,所謂的監聽,都是進行Curator本地緩存視圖和ZooKeeper服務器遠程的數據節點的對比。
在什麼場景下觸發事件呢?
以節點增長事件NODE_ADDED爲例,所在本地緩存視圖開始的時候,本地視圖爲空,在數據同步的時候,本地的監聽器就能監聽到NODE_ADDED事件。這是由於,剛開始本地緩存並無內容,而後本地緩存和服務器緩存進行對比,發現ZooKeeper服務器有節點而本地緩存沒有,這纔將服務器的節點緩存到本地,就會觸發本地緩存的NODE_ADDED事件。
前面已經講完了兩個系列的緩存監聽。簡單回顧一下:
Node Cache用來觀察ZNode自身,若是ZNode節點自己被建立,更新或者刪除,那麼Node Cache會更新緩存,並觸發事件給註冊的監聽器。Node Cache是經過NodeCache類來實現的,監聽器對應的接口爲NodeCacheListener。
Path Cache子節點緩存用來觀察ZNode的子節點、並緩存子節點的狀態,若是ZNode的子節點被建立,更新或者刪除,那麼Path Cache會更新緩存,而且觸發事件給註冊的監聽器。Path Cache是經過PathChildrenCache類來實現的,監聽器註冊是經過PathChildrenCacheListener。
最後的一個系列,是Tree Cache。Tree Cache能夠看作是上兩種的合體,Tree Cache觀察的是當前ZNode節點的全部數據。而TreeCache節點樹緩存是PathChildrenCache的加強,不光能監聽子節點,也能監聽節點自身。
Tree Cache使用的第一步,就是構造一個TreeCache緩存實例。
有兩個構造方法,具體以下:
TreeCache(CuratorFramework client, String path) TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector)
第一個參數就是傳入建立的Curator的框架客戶端,第二個參數就是監聽節點的路徑,第三個重載參數dataIsCompressed 表示是否對數據進行壓縮。maxDepth表示緩存的層次深度,默認爲整數最大值。executorService 表示監聽的的執行線程池,默認會建立一個單一線程的線程池。createParentNodes 表示是否建立父親節點,默認爲false。
通常狀況下,使用第一個構造函數便可。
TreeCache使用的第二步,就是構造一個TreeCacheListener監聽器實例。該接口的定義以下:
package org.apache.curator.framework.recipes.cache; import org.apache.curator.framework.CuratorFramework; public interface TreeCacheListener { void childEvent(CuratorFramework var1, TreeCacheEvent var2) throws Exception; }
TreeCacheListener 監聽器接口中,也只定義了一個簡單的方法 childEvent,當子節點有變化時,這個方法就會被回調到。
在建立完TreeCacheListener 的實例以後,使用緩存實例的addListener方法,將TreeCacheListener 監聽器實例註冊到TreeCache 緩存實例。 而後使用緩存實例nodeCache的start方法,啓動節點的事件監聽。
整個實例的代碼以下:
@Test public void testTreeCache() { //檢查節點是否存在,沒有則建立 boolean isExist = ZKclient.instance.isNodeExist(workerPath); if (!isExist) { ZKclient.instance.createNode(workerPath, null); } CuratorFramework client = ZKclient.instance.getClient(); try { TreeCache treeCache = new TreeCache(client, workerPath); TreeCacheListener l = new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) { try { ChildData data = event.getData(); if(data==null) { log.info("數據爲空"); return; } switch (event.getType()) { case NODE_ADDED: log.info("[TreeCache]節點增長, path={}, data={}", data.getPath(), new String(data.getData(), "UTF-8")); break; case NODE_UPDATED: log.info("[TreeCache]節點更新, path={}, data={}", data.getPath(), new String(data.getData(), "UTF-8")); break; case NODE_REMOVED: log.info("[TreeCache]節點刪除, path={}, data={}", data.getPath(), new String(data.getData(), "UTF-8")); break; default: break; } } catch ( UnsupportedEncodingException e) { e.printStackTrace(); } } }; treeCache.getListenable().addListener(l); treeCache.start(); Thread.sleep(1000); for (int i = 0; i < 3; i++) { ZKclient.instance.createNode(subWorkerPath + i, null); } Thread.sleep(1000); for (int i = 0; i < 3; i++) { ZKclient.instance.deleteNode(subWorkerPath + i); } Thread.sleep(1000); ZKclient.instance.deleteNode(workerPath); Thread.sleep(Integer.MAX_VALUE); } catch (Exception e) { log.error("PathCache監聽失敗, path=", workerPath); } }
運行的結果以下:
\- [TreeCache]節點增長, path=/test/listener/node, data=to set content \- [TreeCache]節點增長, path=/test/listener/node/id-0, data=to set content \- [TreeCache]節點增長, path=/test/listener/node/id-1, data=to set content \- [TreeCache]節點增長, path=/test/listener/node/id-2, data=to set content \- [TreeCache]節點刪除, path=/test/listener/node/id-2, data=to set content \- [TreeCache]節點刪除, path=/test/listener/node/id-1, data=to set content \- [TreeCache]節點刪除, path=/test/listener/node/id-0, data=to set content \- [TreeCache]節點刪除, path=/test/listener/node, data=to set content
最後,說明下事件的類型,對應於節點的增長、修改、刪除,TreeCache 的事件類型爲:
(1)NODE_ADDED
(2)NODE_UPDATED
(3)NODE_REMOVED
這一點,與Path Cache 的事件類型不一樣,與Path Cache 的事件類型爲:
(1)CHILD_ADDED
(2)CHILD_UPDATED
(3)CHILD_REMOVED
下一篇:基於zk,實現分佈式鎖。
Java (Netty) 聊天程序【 億級流量】實戰 開源項目實戰
瘋狂創客圈 【 博客園 總入口 】