前言:立刻要過年了,祝你們新年快樂!在過年回家前分享一篇關於Zookeeper的文章,咱們都知道如今微服務盛行,大數據、分佈式系統中常常會使用到Zookeeper,它是微服務、分佈式系統中必不可少的分佈式協調框架。它的做用體如今分佈式系統中解決了配置中心的問題,以及解決了在分佈式環境中不一樣進程之間爭奪資源的問題,也就是分佈式鎖的功能以及分佈式消息隊列功能等等。因此在微服務的環境中Zookeeper是如今不少公司首選的分佈式協調框架,包括我以前的公司也在使用Zookeeper。說了這麼多,沒別的就是想說一下Zookeeper的重要性,廢話很少說,進入正題。本篇博客只是演示在.Net Core 環境中如何使用Zookeeper組件進行基本的增刪改查和一些注意的要點,若是對Zookeeper還不是太瞭解的話,建議認認真真、仔仔細細地閱讀該文章:http://www.cnblogs.com/sunddenly/p/4033574.html 不然可能下面演示的你會看不懂。html
概念:git
Zookeeper是一個開源的分佈式協調框架,它具備高性能 、高可用的特色,同時具備嚴格的順序訪問控制能力(主要是寫操做的嚴格順序性),基於對ZAB(Zookeeper原子消息廣播協議)的實現,它可以很好的保證分佈式環境下的數據一致性。也正是基於這樣的特徵,使得Zookeeper稱爲解決分佈式數據一致性問題的利器,Zookeeper由兩部分組成:Zookeeper服務端和客戶端。github
特色:數據庫
數據結構:apache
圖片來源:(https://www.cnblogs.com/xums/p/7074008.html)api
好了,基本的概念就聊到這裏,先有一個印象,若是須要詳細的學習,建議認認真真閱讀這篇博客:http://www.cnblogs.com/sunddenly/p/4033574.html,下面就開始演示基本的api操做。服務器
首先,添加下面的依賴包:session
新建一個.Net Core的控制檯應用:數據結構
Zookeeper的服務端使用的是張輝清老師新書《中小研發團隊架構實踐》裏面的服務,我這裏再也不安裝Zookeeper服務端,只是介紹一下Zookeeper的目錄結構架構
(1)bin:主要的一些運行命令
(2)conf:存放配置文件,其中咱們須要修改zk.cfg
(3)contrib:附加的一些功能
(4)dist-maven:mvn編譯後的目錄
(5)docs:文檔
(6)lib:須要依賴的jar包
配置文件zk.cfg文件內容介紹(單機版)
(1)trickTime:用於計算的時間單元,好比session超時:N*trickTime
(2)initLimit:用於集羣,容許從節點連接並同步到master節點的初始化連接時間,以trickTime的倍數來表示
(3)syncLimit:用於集羣,master主節點與從節點之間發送消息,請求和應答時間長度(心跳機制)
(4)dataDir:必須配置
(5)dataLogDir:日誌目錄,若是不配置會和dataDir公用
(6)clientPort:連接服務器的端口,默認是2181
好了就介紹到這裏,下面讓我會演示關於Zookeeper API的各類操做。
(1)代碼以下:
using org.apache.zookeeper; using org.apache.zookeeper.data; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using static org.apache.zookeeper.Watcher.Event; namespace ZookeeperNetCore { public class ZookeeperClient { public ZooKeeper ZK { get; set; } // 配置項 public string QueryPath { get; set; }= "/Configuration"; //節點狀態信息 public Stat Stat { get; set; } // 配置數據 public byte[] ConfigData { get; set; } = null; public ZookeeperClient(string serviceAddress, int timeout) { ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this)); Console.WriteLine("客戶端開始鏈接zookeeper服務器..."); Console.WriteLine($"鏈接狀態:{ZK.getState()}"); Thread.Sleep(1000);//注意:爲何要加上這行代碼,若是不加會出現什麼問題 Console.WriteLine($"鏈接狀態:{ZK.getState()}"); } // 讀取節點的配置數據 public async Task<string> ReadConfigDataAsync() { if (this.ZK == null) { return string.Empty; } var stat = await ZK.existsAsync(QueryPath, true); if (stat == null) { return string.Empty; } this.Stat = stat; var dataResult = await ZK.getDataAsync(QueryPath, true); return Encoding.UTF8.GetString(dataResult.Data); } public class ConfigServiceWatcher : Watcher { private ZookeeperClient _cs = null; public ConfigServiceWatcher(ZookeeperClient cs) { _cs = cs; } public override async Task process(WatchedEvent @event) { Console.WriteLine($"Zookeeper連接成功:{@event.getState() == KeeperState.SyncConnected}"); if (@event.get_Type() == EventType.NodeDataChanged) { var data = await _cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改成【{2}】。", Environment.NewLine, _cs.QueryPath, data); } } } } }
解釋:
首先,咱們來看看建立Zookeeper對象時,應該注意的問題:
Zookeeper的構造函數參數解釋以下:
客戶端和zk服務端連接是一個異步的過程,當鏈接成功後後,客戶端會收的一個watch通知,就是調用回調函數:ConfigServiceWatcher.process(WatchedEvent @event)注意這個類ConfigServiceWatcher必需要繼承Watcher,重寫 process(WatchedEvent @event),因此就打印出了。關於Zookeeper的watcher後面會詳細介紹,不明白的沒關係,後面會經過代碼給你們演示。
(1)connectString:鏈接服務器的ip字符串,好比: "192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"能夠是一個ip,也能夠是多個ip,一個ip表明單機,多個ip表明集羣,也能夠在ip後加路徑。
(2)sessionTimeout:超時時間,心跳收不到了,那就超時
(3)watcher:通知事件,若是有對應的事件觸發,則會收到一個通知;若是不須要,那就設置爲null,在上面的演示中,咱們設置了一個watcher。
(4)canBeReadOnly:可讀,當這個物理機節點斷開後,仍是能夠讀到數據的,只是不能寫,此時數據被讀取到的多是舊數據,此處建議設置爲false,不推薦使用。
(5)sessionId:會話的id
(6)sessionPasswd:會話密碼 當會話丟失後,能夠依據 sessionId 和 sessionPasswd 從新獲取會話。
好了,基本的參數已經介紹完畢,那麼,來解釋一下爲何在建立Zookeeper對象時添加下面這句代碼:
其實上面我已經解釋了,因爲客戶端和zk服務端連接是一個異步的過程,須要必定的時間間隔,因此,若是不添加效果這樣:
(2)zookeeper 恢復以前的會話鏈接演示
using org.apache.zookeeper; using org.apache.zookeeper.data; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using static org.apache.zookeeper.Watcher.Event; namespace ZookeeperNetCore { public class ZookeeperClient { public ZooKeeper ZK { get; set; } // 配置項 public string QueryPath { get; set; }= "/Configuration"; //節點狀態信息 public Stat Stat { get; set; } // 配置數據 public byte[] ConfigData { get; set; } = null; public ZookeeperClient(string serviceAddress, int timeout) { ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this)); } public ZookeeperClient(string serviceAddress, int timeout, long sessionId, byte[] sessionPasswd) { ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher2(this), sessionId, sessionPasswd); } // 讀取節點的配置數據 public async Task<string> ReadConfigDataAsync() { if (this.ZK == null) { return string.Empty; } var stat = await ZK.existsAsync(QueryPath, true); if (stat == null) { return string.Empty; } this.Stat = stat; var dataResult = await ZK.getDataAsync(QueryPath, true); return Encoding.UTF8.GetString(dataResult.Data); } public class ConfigServiceWatcher : Watcher { private ZookeeperClient _cs = null; public ConfigServiceWatcher(ZookeeperClient cs) { _cs = cs; } public override async Task process(WatchedEvent @event) { Console.WriteLine($"Zookeeper連接成功:{@event.getState() == KeeperState.SyncConnected}"); if (@event.get_Type() == EventType.NodeDataChanged) { var data = await _cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改成【{2}】。", Environment.NewLine, _cs.QueryPath, data); } } } public class ConfigServiceWatcher2 : Watcher { private ZookeeperClient _cs = null; public ConfigServiceWatcher2(ZookeeperClient cs) { _cs = cs; } public override async Task process(WatchedEvent @event) { Console.WriteLine($"Zookeeper連接成功:{@event.getState() == KeeperState.SyncConnected}"); if (@event.get_Type() == EventType.NodeDataChanged) { var data = await _cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改成【{2}】。", Environment.NewLine, _cs.QueryPath, data); } } } } }
代碼:
using org.apache.zookeeper; using org.apache.zookeeper.data; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using static org.apache.zookeeper.Watcher.Event; using static org.apache.zookeeper.ZooDefs; namespace ZookeeperNetCore { public class ZookeeperClient { public ZooKeeper ZK { get; set; } // 配置項 public string QueryPath { get; set; }= "/Configuration"; //節點狀態信息 public Stat Stat { get; set; } // 配置數據 public byte[] ConfigData { get; set; } = null; public ZookeeperClient(string serviceAddress, int timeout) { ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this)); } public ZookeeperClient(string serviceAddress, int timeout, long sessionId, byte[] sessionPasswd) { ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher2(this), sessionId, sessionPasswd); } // 讀取節點的配置數據 public async Task<string> ReadConfigDataAsync() { if (this.ZK == null) { return string.Empty; } var stat = await ZK.existsAsync(QueryPath, true); if (stat == null) { return string.Empty; } this.Stat = stat; var dataResult = await ZK.getDataAsync(QueryPath, true); return Encoding.UTF8.GetString(dataResult.Data); } public class ConfigServiceWatcher : Watcher { private ZookeeperClient _cs = null; public ConfigServiceWatcher(ZookeeperClient cs) { _cs = cs; } public override async Task process(WatchedEvent @event) { Console.WriteLine($"Zookeeper連接成功:{@event.getState() == KeeperState.SyncConnected}"); if (@event.get_Type() == EventType.NodeDataChanged) { var data = await _cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改成【{2}】。", Environment.NewLine, _cs.QueryPath, data); } } } public class ConfigServiceWatcher2 : Watcher { private ZookeeperClient _cs = null; public ConfigServiceWatcher2(ZookeeperClient cs) { _cs = cs; } public override async Task process(WatchedEvent @event) { Console.WriteLine($"Zookeeper連接成功:{@event.getState() == KeeperState.SyncConnected}"); if (@event.get_Type() == EventType.NodeDataChanged) { var data = await _cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改成【{2}】。", Environment.NewLine, _cs.QueryPath, data); } } } // 關閉ZooKeeper鏈接 // 釋放資源 public async Task Close() { if (this.ZK != null) { await ZK.closeAsync(); } this.ZK = null; } } }
using org.apache.zookeeper; using System; using System.Text; using System.Threading; using System.Threading.Tasks; using static org.apache.zookeeper.ZooDefs; namespace ZookeeperNetCore { class Program { public const int timeout = 5000; static async Task Main(string[] args) { var conf = new ZookeeperClient("", timeout); try { conf.QueryPath = "/UserName"; Console.WriteLine("客戶端開始鏈接zookeeper服務器..."); Console.WriteLine($"鏈接狀態:{conf.ZK.getState()}"); Thread.Sleep(1000);//注意:爲何要加上這行代碼,若是不加會出現什麼問題 Console.WriteLine($"鏈接狀態:{conf.ZK.getState()}"); if (await conf.ZK.existsAsync(conf.QueryPath, false) == null) { conf.ConfigData = Encoding.Default.GetBytes("guozheng"); await conf.ZK.createAsync(conf.QueryPath, conf.ConfigData, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } string configData = await conf.ReadConfigDataAsync(); Console.WriteLine("節點【{0}】目前的值爲【{1}】。", conf.QueryPath, configData); Console.ReadLine(); Random random = new Random((int)DateTime.Now.Ticks & 0x0000FFFF); conf.ConfigData = Encoding.UTF8.GetBytes(string.Format("Mike_{0}", random.Next(100))); await conf.ZK.setDataAsync(conf.QueryPath, conf.ConfigData, -1); Console.WriteLine("節點【{0}】的值已被修改成【{1}】。", conf.QueryPath, Encoding.UTF8.GetString(conf.ConfigData)); Console.ReadLine(); if (await conf.ZK.existsAsync(conf.QueryPath, false) != null) { await conf.ZK.deleteAsync(conf.QueryPath, -1); Console.WriteLine("已刪除此【{0}】節點。{1}", conf.QueryPath, Environment.NewLine); } } catch (Exception ex) { if (conf.ZK == null) { Console.WriteLine("已關閉ZooKeeper的鏈接。"); Console.ReadLine(); return; } Console.WriteLine("拋出異常:{0}【{1}】。", Environment.NewLine, ex.ToString()); } finally { await conf.Close(); Console.WriteLine("已關閉ZooKeeper的鏈接。"); Console.ReadLine(); } ////開始會話重連 //Console.WriteLine("開始會話重連..."); //var conf2 = new ZookeeperClient("", timeout, sessionId, sessionPassword); //Console.WriteLine(conf2.ZK.getSessionId()); //Console.WriteLine( Encoding.UTF8.GetString(conf2.ZK.getSessionPasswd())); //Console.WriteLine($"從新鏈接狀態zkSession:{conf2.ZK.getState()}"); //Thread.Sleep(1000);//注意:爲何要加上這行代碼,若是不加會出現什麼問題 //Console.WriteLine($"從新鏈接狀態zkSession:{conf2.ZK.getState()}"); Console.ReadKey(); } } }
解釋:
關於異步建立節點的方法,是不支持子節點的遞歸建立,參數介紹:
(1)path:建立的路徑
(2)data:存儲的數據的byte[]
(3)acl:控制權限策略 Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa CREATOR_ALL_ACL --> auth:user:password:cdrwa
(4)createMode: 節點類型, 是一個枚舉 PERSISTENT:持久節點 PERSISTENT_SEQUENTIAL:持久順序節點 EPHEMERAL:臨時節點 EPHEMERAL_SEQUENTIAL:臨時順序節點
關於上面參數引出來的知識點,須要幾章來說解,本篇文章先不介紹,後面會介紹。好了,關於.Net Core中使用Zookeeper的介紹就到這裏,關於上面演示的結果,我先拋出一個問題,你們能夠思考一下:爲何「Zookeeper連接成功:True」會輸出屢次?也就是咱們下節要討論的Zookeeper的watcher機制。時間到了,收拾行李,準備一下回家啦,先寫到這裏,祝你們新年快樂!但願對你有幫助,過完年來見!
可能有些地方解釋的不是太清楚,你們多多見諒,有些的不對的地方,但願能指正出來。
說明:演示代碼裏面使用的Zookeeper服務過一段時間能用,不能用的話,在評論區留言,後面用阿里雲本身搭建一個。
代碼地址:
https://github.com/guozheng007/ZookeeperNetCoreDemo
參考資料:
(1)張輝清:《中小研發團隊架構實踐》
(2) 風間影月:《ZooKeeper分佈式專題與Dubbo微服務入門》
(3)sunddenly:http://www.cnblogs.com/sunddenly/p/4033574.html
做者:郭崢
出處:http://www.cnblogs.com/runningsmallguo/
本文版權歸做者和博客園共有,歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文連接。