目錄html
瘋狂創客圈 Java 分佈式聊天室【 億級流量】實戰系列之 -24【 博客園 總入口 】node
你們好,我是做者尼恩。目前和幾個小夥伴一塊兒,組織了一個高併發的實戰社羣【瘋狂創客圈】。正在開始高併發、億級流程的 IM 聊天程序 學習和實戰面試
前面,已經完成一個高性能的 Java 聊天程序的四件大事:apache
接下來,須要進入到分佈式開發的環節了。 分佈式的中間件,瘋狂創客圈的小夥伴們,一致的選擇了zookeeper,不單單是因爲其在大數據領域,太有名了。更重要的是,不少的著名框架,都使用了zk。api
本篇介紹 Curator客戶端的基本操做。服務器
打開Curator的官網,咱們能夠看到,Curator包含了如下幾個包:session
curator-framework:對zookeeper的底層api的一些封裝;併發
curator-client:提供一些客戶端的操做,例如重試策略等;框架
curator-recipes:封裝了一些高級特性,如:Cache事件監聽、選舉、分佈式鎖、分佈式計數器、分佈式Barrier等。異步
Maven依賴(使用curator的版本:4.0.0,對應Zookeeper的版本爲:3.4.x,若是版本不匹配,就會有兼容性問題,頗有可能致使節點操做失敗。具體的版本對應關係,能夠去curator的官網查看。
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.0.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency>
使用 curator-framework 包中的工廠類CuratorFrameworkFactory中的靜態方法newClient,來建立客戶端會話。
代碼以下:
/** * create by 尼恩 @ 瘋狂創客圈 **/ public class ClientFactory { /** * @param connectionString zk的鏈接地址 * @return CuratorFramework 實例 */ public static CuratorFramework createSimple(String connectionString) { // 重試策略:第一次重試等待1s,第二次重試等待2s,第三次重試等待4s // 第一個參數:等待時間的基礎單位,單位爲毫秒 // 第二個參數:最大重試次數 ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); // 獲取 CuratorFramework 實例的最簡單的方式 // 第一個參數:zk的鏈接地址 // 第二個參數:重試策略 return CuratorFrameworkFactory.newClient(connectionString, retryPolicy); } /** * @param connectionString zk的鏈接地址 * @param retryPolicy 重試策略 * @param connectionTimeoutMs 鏈接 * @param sessionTimeoutMs * @return CuratorFramework 實例 */ public static CuratorFramework createWithOptions( String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) { // builder 模式建立 CuratorFramework 實例 return CuratorFrameworkFactory.builder() .connectString(connectionString) .retryPolicy(retryPolicy) .connectionTimeoutMs(connectionTimeoutMs) .sessionTimeoutMs(sessionTimeoutMs) // 其餘的建立選項 .build(); } }
這裏用到兩個版本,前一個是簡化版本,只須要設置ZK集羣的鏈接地址和重試策略。
後一個是相對複雜的重載版本,能夠設置鏈接超時connectionTimeoutMs、會話超時sessionTimeoutMs 等其餘的會話建立選項。
具體請看瘋狂創客圈的Demo源碼。
使用create()方法,最後使用forPath帶上須要建立的節點路徑。
/** * 建立節點 */ @Test public void createNode() { //建立客戶端 CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS); try { //啓動客戶端實例,鏈接服務器 client.start(); // 建立一個 ZNode 節點 // 節點的數據爲 payload String data = "hello"; byte[] payload = data.getBytes("UTF-8"); String zkPath = "/test/CRUD/node-1"; client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath(zkPath, payload); } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(client); } }
使用withMode()方法,設置節點的類型。zookeeper節點有四種類型:
(1)PERSISTENT 持久節點
(2)PERSISTENT_SEQUENTIAL 持久順序節點
(3)PHEMERAL 臨時節
(4)EPHEMERAL_SEQUENTIAL 臨時順序節點
下面詳細介紹一下四種節點的區別和聯繫。
(1)持久節點(PERSISTENT)
所謂持久節點,是指在節點建立後,就一直存在,直到有刪除操做來主動清除這個節點。持久節點的生命週期是永久有效,不會由於建立該節點的客戶端會話失效而消失。
(2)持久順序節點(PERSISTENT_SEQUENTIAL)
這類節點的生命週期和持久節點是一致的。額外的特性是,在ZK中,每一個父節點會爲他的第一級子節點維護一份次序,會記錄每一個子節點建立的前後順序。若是在建立子節點的時候,能夠設置這個屬性,那麼在建立節點過程當中,ZK會自動爲給定節點名加上一個表示次序的數字後綴,做爲新的節點名。這個次序後綴的範圍是整型的最大值。
好比,在建立節點的時候只須要傳入節點 「/test_」,這樣以後,zookeeper自動會給」test_」後面補充數字次序。
(3)臨時節點(EPHEMERAL)
和持久節點不一樣的是,臨時節點的生命週期和客戶端會話綁定。也就是說,若是客戶端會話失效,那麼這個節點就會自動被清除掉。注意,這裏提到的是會話失效,而非鏈接斷開。這裏還要注意一件事,就是當你客戶端會話失效後,所產生的節點也不是一會兒就消失了,也要過一段時間,大概是10秒之內,能夠試一下,本機操做生成節點,在服務器端用命令來查看當前的節點數目,你會發現客戶端已經stop,可是產生的節點還在。
另外,在臨時節點下面不能建立子節點。
(4)臨時順序節點(EPHEMERAL_SEQUENTIAL)
此節點是屬於臨時節點,不過帶有順序,客戶端會話結束節點就消失。
與節點讀取的有關的方法,主要有三個:
(1)首先是判斷節點是否存在,使用checkExists方法。
(2)其次是獲取節點的數據,使用getData方法。
(3)最後是獲取子節點列表,使用getChildren方法。
演示代碼以下:
/** * 讀取節點 */ @Test public void readNode() { //建立客戶端 CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS); try { //啓動客戶端實例,鏈接服務器 client.start(); String zkPath = "/test/CRUD/node-1"; Stat stat = client.checkExists().forPath(zkPath); if (null != stat) { //讀取節點的數據 byte[] payload = client.getData().forPath(zkPath); String data = new String(payload, "UTF-8"); log.info("read data:", data); String parentPath = "/test"; List<String> children = client.getChildren().forPath(parentPath); for (String child : children) { log.info("child:", child); } } } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(client); } }
節點的更新,分爲同步更新與異步更新。
/** * 更新節點 */ @Test public void updateNode() { //建立客戶端 CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS); try { //啓動客戶端實例,鏈接服務器 client.start(); String data = "hello world"; byte[] payload = data.getBytes("UTF-8"); String zkPath = "/test/node-1"; client.setData() .forPath(zkPath, payload); } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(client); } }
異步更新,須要用到inBackground()方法,其做用是,讓更新操做異步執行。若是須要監聽到異步操做的結果,須要爲inBackground加上AsyncCallback回調實例。
異步更新的代碼以下:
/** * 更新節點 - 異步模式 */ @Test public void updateNodeAsync() { //建立客戶端 CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS); try { //更新完成監聽器 AsyncCallback.StringCallback callback = new AsyncCallback.StringCallback() { @Override public void processResult(int i, String s, Object o, String s1) { System.out.println( "i = " + i + " | " + "s = " + s + " | " + "o = " + o + " | " + "s1 = " + s1 ); } }; //啓動客戶端實例,鏈接服務器 client.start(); String data = "hello ,every body! "; byte[] payload = data.getBytes("UTF-8"); String zkPath = "/test/CRUD/node-1"; client.setData() .inBackground(callback) .forPath(zkPath, payload); Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(client); } }
刪除節點,使用delete 方法,代碼以下。
/** * 刪除節點 */ @Test public void deleteNode() { //建立客戶端 CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS); try { //啓動客戶端實例,鏈接服務器 client.start(); //刪除節點 String zkPath = "/test/CRUD/node-1"; client.delete().forPath(zkPath); //刪除後查看結果 String parentPath = "/test"; List<String> children = client.getChildren().forPath(parentPath); for (String child : children) { log.info("child:", child); } } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(client); } }
和更新同樣,也能夠進行異步刪除,一樣須要用到inBackground()方法。若是須要監聽異步操做的結果,須要爲inBackground方法加上一個參數:AsyncCallback回調實例。
下一篇:開啓zk的客戶端開發。
Java (Netty) 聊天程序【 億級流量】實戰 開源項目實戰
瘋狂創客圈 【 博客園 總入口 】