ASP.Net Core 中使用Zookeeper搭建分佈式環境中的配置中心繫列一:使用Zookeeper.Net組件演示基本的操做

前言:立刻要過年了,祝你們新年快樂!在過年回家前分享一篇關於Zookeeper的文章,咱們都知道如今微服務盛行,大數據、分佈式系統中常常會使用到Zookeeper,它是微服務、分佈式系統中必不可少的分佈式協調框架。它的做用體如今分佈式系統中解決了配置中心的問題,以及解決了在分佈式環境中不一樣進程之間爭奪資源的問題,也就是分佈式鎖的功能以及分佈式消息隊列功能等等。因此在微服務的環境中Zookeeper是如今不少公司首選的分佈式協調框架,包括我以前的公司也在使用Zookeeper。說了這麼多,沒別的就是想說一下Zookeeper的重要性,廢話很少說,進入正題。本篇博客只是演示在.Net Core 環境中如何使用Zookeeper組件進行基本的增刪改查和一些注意的要點,若是對Zookeeper還不是太瞭解的話,建議認認真真、仔仔細細地閱讀該文章:http://www.cnblogs.com/sunddenly/p/4033574.html   不然可能下面演示的你會看不懂。html

 

1、Zookeeper基本概念快速介紹

概念:git

Zookeeper是一個開源的分佈式協調框架,它具備高性能 、高可用的特色,同時具備嚴格的順序訪問控制能力(主要是寫操做的嚴格順序性),基於對ZAB(Zookeeper原子消息廣播協議)的實現,它可以很好的保證分佈式環境下的數據一致性。也正是基於這樣的特徵,使得Zookeeper稱爲解決分佈式數據一致性問題的利器,Zookeeper由兩部分組成:Zookeeper服務端和客戶端。github

特色:數據庫

  • 全局一致性:每一個server保存一份相同的數據副本,client不管連接哪一個server,展現的數據都是一致的,這是最重要的特徵。
  • 可靠性:若是消息其中一臺服務器接受,那麼將被全部的服務器接受。
  • 順序性:包括全局有序性和偏序兩種:全局有序是指若是在一臺服務器上消息a在消息b前發佈,則在全部server上消息a都將在消息b前被髮布;偏序是指若是一個消息b在消息a後被同一個發送者發佈,a必將排在b前面。
  • 數據更新原子性:一次數據更新要麼成功,要麼失敗,不存在中間狀態。
  • 實時性:Zookeeper保證客戶端將在一個時間間隔範圍內得到服務器的更新信息,或者服務器失敗的信息。

數據結構:apache

圖片來源:(https://www.cnblogs.com/xums/p/7074008.html)api

  • Zookeeper的數據結構模型採用相似於文件系統的樹結構。樹上的每一個節點稱爲ZNode,而每一個節點均可能有一個或者多個子節點。ZNode的節點路徑標識方式是由一系列斜槓"/"進行分割的路徑表示,必須是絕對路徑。既能夠向ZNode節點寫入、修改和讀取數據,也能夠建立、刪除ZNode節點或ZNode節點下的子節點。
  • 值的注意的是,Zookeeper的設計目標不是傳統的數據庫存儲或大數據對象存儲,而是協同數據的存儲,所以在實現的時候,ZNode存儲的數據大小不該該超過1MB。另外,每個節點都有一個ACL(訪問控制列表),據此控制該節點的訪問權限。
  • ZNode數據節點是有生命週期的,其生命週期的長短取決於數據節點的節點類型。節點類型共有四種:持久節點、持久順序節點、臨時節點、臨時順序節點

 

好了,基本的概念就聊到這裏,先有一個印象,若是須要詳細的學習,建議認認真真閱讀這篇博客:http://www.cnblogs.com/sunddenly/p/4033574.html,下面就開始演示基本的api操做。服務器

 

2、ASP.Net Core 中使用ZooKeeper

 首先,添加下面的依賴包:session

 

新建一個.Net Core的控制檯應用:數據結構

Zookeeper的服務端使用的是張輝清老師新書《中小研發團隊架構實踐》裏面的服務,我這裏再也不安裝Zookeeper服務端,只是介紹一下Zookeeper的目錄結構架構

  • Zookeeper目錄介紹

(1)bin:主要的一些運行命令

(2)conf:存放配置文件,其中咱們須要修改zk.cfg

(3)contrib:附加的一些功能

(4)dist-maven:mvn編譯後的目錄

(5)docs:文檔

(6)lib:須要依賴的jar包

配置文件zk.cfg文件內容介紹(單機版)

(1)trickTime:用於計算的時間單元,好比session超時:N*trickTime

(2)initLimit:用於集羣,容許從節點連接並同步到master節點的初始化連接時間,以trickTime的倍數來表示

(3)syncLimit:用於集羣,master主節點與從節點之間發送消息,請求和應答時間長度(心跳機制)

(4)dataDir:必須配置

(5)dataLogDir:日誌目錄,若是不配置會和dataDir公用

(6)clientPort:連接服務器的端口,默認是2181

好了就介紹到這裏,下面讓我會演示關於Zookeeper  API的各類操做。

  • 如何鏈接Zookeeper的服務端

(1)代碼以下:

using org.apache.zookeeper;
using org.apache.zookeeper.data;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static org.apache.zookeeper.Watcher.Event;

namespace ZookeeperNetCore
{
    public class ZookeeperClient
    {
        public ZooKeeper ZK { get; set; }

        // 配置項
        public string QueryPath { get; set; }= "/Configuration";
        //節點狀態信息
        public Stat Stat { get; set; }

        // 配置數據
        public byte[] ConfigData { get; set; } = null;


        public ZookeeperClient(string serviceAddress, int timeout)
        {
            ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this));

            Console.WriteLine("客戶端開始鏈接zookeeper服務器...");
            Console.WriteLine($"鏈接狀態:{ZK.getState()}");
            Thread.Sleep(1000);//注意:爲何要加上這行代碼,若是不加會出現什麼問題
            Console.WriteLine($"鏈接狀態:{ZK.getState()}");
        }

        // 讀取節點的配置數據
        public async Task<string> ReadConfigDataAsync()
        {
            if (this.ZK == null)
            {
                return string.Empty;
            }

            var stat = await ZK.existsAsync(QueryPath, true);

            if (stat == null)
            {
                return string.Empty;
            }

            this.Stat = stat;

            var dataResult = await ZK.getDataAsync(QueryPath, true);
            
            return Encoding.UTF8.GetString(dataResult.Data);
        }

        public class ConfigServiceWatcher : Watcher
        {
            private ZookeeperClient _cs = null;

            public ConfigServiceWatcher(ZookeeperClient cs)
            {
                _cs = cs;
            }

            public override  async Task process(WatchedEvent @event)
            {
                Console.WriteLine($"Zookeeper連接成功:{@event.getState() == KeeperState.SyncConnected}");

                if (@event.get_Type() == EventType.NodeDataChanged)
                {
                    var data = await _cs.ReadConfigDataAsync();

                    Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改成【{2}】。", Environment.NewLine, _cs.QueryPath, data);
                }
            }
        }

    }
}

解釋:

 首先,咱們來看看建立Zookeeper對象時,應該注意的問題:

Zookeeper的構造函數參數解釋以下:

客戶端和zk服務端連接是一個異步的過程,當鏈接成功後後,客戶端會收的一個watch通知,就是調用回調函數:ConfigServiceWatcher.process(WatchedEvent @event)注意這個類ConfigServiceWatcher必需要繼承Watcher,重寫 process(WatchedEvent @event),因此就打印出了。關於Zookeeper的watcher後面會詳細介紹,不明白的沒關係,後面會經過代碼給你們演示。

(1)connectString:鏈接服務器的ip字符串,好比: "192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"能夠是一個ip,也能夠是多個ip,一個ip表明單機,多個ip表明集羣,也能夠在ip後加路徑。

(2)sessionTimeout:超時時間,心跳收不到了,那就超時

(3)watcher:通知事件,若是有對應的事件觸發,則會收到一個通知;若是不須要,那就設置爲null,在上面的演示中,咱們設置了一個watcher。

(4)canBeReadOnly:可讀,當這個物理機節點斷開後,仍是能夠讀到數據的,只是不能寫,此時數據被讀取到的多是舊數據,此處建議設置爲false,不推薦使用。

(5)sessionId:會話的id

(6)sessionPasswd:會話密碼 當會話丟失後,能夠依據 sessionId 和 sessionPasswd 從新獲取會話。

好了,基本的參數已經介紹完畢,那麼,來解釋一下爲何在建立Zookeeper對象時添加下面這句代碼:

其實上面我已經解釋了,因爲客戶端和zk服務端連接是一個異步的過程,須要必定的時間間隔,因此,若是不添加效果這樣:

 

(2)zookeeper 恢復以前的會話鏈接演示

using org.apache.zookeeper;
using org.apache.zookeeper.data;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static org.apache.zookeeper.Watcher.Event;

namespace ZookeeperNetCore
{
    public class ZookeeperClient
    {
        public ZooKeeper ZK { get; set; }

        // 配置項
        public string QueryPath { get; set; }= "/Configuration";
        //節點狀態信息
        public Stat Stat { get; set; }

        // 配置數據
        public byte[] ConfigData { get; set; } = null;


        public ZookeeperClient(string serviceAddress, int timeout)
        {
            ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this));

        }

        public ZookeeperClient(string serviceAddress, int timeout, long sessionId, byte[] sessionPasswd)
        {
            ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher2(this), sessionId, sessionPasswd);

        }

        // 讀取節點的配置數據
        public async Task<string> ReadConfigDataAsync()
        {
            if (this.ZK == null)
            {
                return string.Empty;
            }

            var stat = await ZK.existsAsync(QueryPath, true);

            if (stat == null)
            {
                return string.Empty;
            }

            this.Stat = stat;

            var dataResult = await ZK.getDataAsync(QueryPath, true);
            
            return Encoding.UTF8.GetString(dataResult.Data);
        }

        public class ConfigServiceWatcher : Watcher
        {
            private ZookeeperClient _cs = null;

            public ConfigServiceWatcher(ZookeeperClient cs)
            {
                _cs = cs;
            }

            public override  async Task process(WatchedEvent @event)
            {
                Console.WriteLine($"Zookeeper連接成功:{@event.getState() == KeeperState.SyncConnected}");

                if (@event.get_Type() == EventType.NodeDataChanged)
                {
                    var data = await _cs.ReadConfigDataAsync();

                    Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改成【{2}】。", Environment.NewLine, _cs.QueryPath, data);
                }
            }
        }

        public class ConfigServiceWatcher2 : Watcher
        {
            private ZookeeperClient _cs = null;

            public ConfigServiceWatcher2(ZookeeperClient cs)
            {
                _cs = cs;
            }

            public override async Task process(WatchedEvent @event)
            {
                Console.WriteLine($"Zookeeper連接成功:{@event.getState() == KeeperState.SyncConnected}");

                if (@event.get_Type() == EventType.NodeDataChanged)
                {
                    var data = await _cs.ReadConfigDataAsync();

                    Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改成【{2}】。", Environment.NewLine, _cs.QueryPath, data);
                }
            }
        }
    }
}

 

  •  ZNode建立刪除修改查詢

代碼:

using org.apache.zookeeper;
using org.apache.zookeeper.data;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static org.apache.zookeeper.Watcher.Event;
using static org.apache.zookeeper.ZooDefs;

namespace ZookeeperNetCore
{
    public class ZookeeperClient
    {
        public ZooKeeper ZK { get; set; }

        // 配置項
        public string QueryPath { get; set; }= "/Configuration";
        //節點狀態信息
        public Stat Stat { get; set; }

        // 配置數據
        public byte[] ConfigData { get; set; } = null;


        public ZookeeperClient(string serviceAddress, int timeout)
        {
            ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this));

        }

        public ZookeeperClient(string serviceAddress, int timeout, long sessionId, byte[] sessionPasswd)
        {
            ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher2(this), sessionId, sessionPasswd);

        }

        // 讀取節點的配置數據
        public async Task<string> ReadConfigDataAsync()
        {
            if (this.ZK == null)
            {
                return string.Empty;
            }

            var stat = await ZK.existsAsync(QueryPath, true);

            if (stat == null)
            {
                return string.Empty;
            }

            this.Stat = stat;

            var dataResult = await ZK.getDataAsync(QueryPath, true);
            
            return Encoding.UTF8.GetString(dataResult.Data);
        }

        

        public class ConfigServiceWatcher : Watcher
        {
            private ZookeeperClient _cs = null;

            public ConfigServiceWatcher(ZookeeperClient cs)
            {
                _cs = cs;
            }

            public override  async Task process(WatchedEvent @event)
            {
                Console.WriteLine($"Zookeeper連接成功:{@event.getState() == KeeperState.SyncConnected}");

                if (@event.get_Type() == EventType.NodeDataChanged)
                {
                    var data = await _cs.ReadConfigDataAsync();

                    Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改成【{2}】。", Environment.NewLine, _cs.QueryPath, data);
                }
            }
        }

        public class ConfigServiceWatcher2 : Watcher
        {
            private ZookeeperClient _cs = null;

            public ConfigServiceWatcher2(ZookeeperClient cs)
            {
                _cs = cs;
            }

            public override async Task process(WatchedEvent @event)
            {
                Console.WriteLine($"Zookeeper連接成功:{@event.getState() == KeeperState.SyncConnected}");

                if (@event.get_Type() == EventType.NodeDataChanged)
                {
                    var data = await _cs.ReadConfigDataAsync();

                    Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改成【{2}】。", Environment.NewLine, _cs.QueryPath, data);
                }
            }
        }

        // 關閉ZooKeeper鏈接
        // 釋放資源
        public async Task Close()
        {
            if (this.ZK != null)
            {
               await ZK.closeAsync();
            }

            this.ZK = null;
        }


        
    }
}
using org.apache.zookeeper;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static org.apache.zookeeper.ZooDefs;

namespace ZookeeperNetCore
{
    class Program
    {
        public const int timeout = 5000;
        static async Task Main(string[] args)
        {
            var conf = new ZookeeperClient("", timeout);

            try
            {
                conf.QueryPath = "/UserName";

                Console.WriteLine("客戶端開始鏈接zookeeper服務器...");
                Console.WriteLine($"鏈接狀態:{conf.ZK.getState()}");
                Thread.Sleep(1000);//注意:爲何要加上這行代碼,若是不加會出現什麼問題
                Console.WriteLine($"鏈接狀態:{conf.ZK.getState()}");

                if (await conf.ZK.existsAsync(conf.QueryPath, false) == null)
                {
                    conf.ConfigData = Encoding.Default.GetBytes("guozheng");
                    await conf.ZK.createAsync(conf.QueryPath, conf.ConfigData, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }

                string configData = await conf.ReadConfigDataAsync();
                Console.WriteLine("節點【{0}】目前的值爲【{1}】。", conf.QueryPath, configData);
                Console.ReadLine();


                Random random = new Random((int)DateTime.Now.Ticks & 0x0000FFFF);
                conf.ConfigData = Encoding.UTF8.GetBytes(string.Format("Mike_{0}", random.Next(100)));

                await conf.ZK.setDataAsync(conf.QueryPath, conf.ConfigData, -1);

                Console.WriteLine("節點【{0}】的值已被修改成【{1}】。", conf.QueryPath, Encoding.UTF8.GetString(conf.ConfigData));

                Console.ReadLine();

                if (await conf.ZK.existsAsync(conf.QueryPath, false) != null)
                {
                    await conf.ZK.deleteAsync(conf.QueryPath, -1);

                    Console.WriteLine("已刪除此【{0}】節點。{1}", conf.QueryPath, Environment.NewLine);
                }

            }
            catch (Exception ex)
            {
                if (conf.ZK == null)
                {
                    Console.WriteLine("已關閉ZooKeeper的鏈接。");
                    Console.ReadLine();
                    return;
                }

                Console.WriteLine("拋出異常:{0}【{1}】。", Environment.NewLine, ex.ToString());
            }
            finally
            {
                await conf.Close();
                Console.WriteLine("已關閉ZooKeeper的鏈接。");
                Console.ReadLine();
            }



            ////開始會話重連
            //Console.WriteLine("開始會話重連...");

            //var conf2 = new ZookeeperClient("", timeout, sessionId, sessionPassword);

            //Console.WriteLine(conf2.ZK.getSessionId());
            //Console.WriteLine( Encoding.UTF8.GetString(conf2.ZK.getSessionPasswd()));

            //Console.WriteLine($"從新鏈接狀態zkSession:{conf2.ZK.getState()}");
            //Thread.Sleep(1000);//注意:爲何要加上這行代碼,若是不加會出現什麼問題
            //Console.WriteLine($"從新鏈接狀態zkSession:{conf2.ZK.getState()}");


            Console.ReadKey();
        }
    }
}

 

 解釋:

關於異步建立節點的方法,是不支持子節點的遞歸建立,參數介紹:

(1)path:建立的路徑

(2)data:存儲的數據的byte[]

(3)acl:控制權限策略   Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa      CREATOR_ALL_ACL --> auth:user:password:cdrwa

(4)createMode: 節點類型, 是一個枚舉    PERSISTENT:持久節點   PERSISTENT_SEQUENTIAL:持久順序節點   EPHEMERAL:臨時節點   EPHEMERAL_SEQUENTIAL:臨時順序節點

 關於上面參數引出來的知識點,須要幾章來說解,本篇文章先不介紹,後面會介紹。好了,關於.Net Core中使用Zookeeper的介紹就到這裏,關於上面演示的結果,我先拋出一個問題,你們能夠思考一下:爲何「Zookeeper連接成功:True」會輸出屢次?也就是咱們下節要討論的Zookeeper的watcher機制。時間到了,收拾行李,準備一下回家啦,先寫到這裏,祝你們新年快樂!但願對你有幫助,過完年來見!

 

3、總結

 可能有些地方解釋的不是太清楚,你們多多見諒,有些的不對的地方,但願能指正出來。

說明:演示代碼裏面使用的Zookeeper服務過一段時間能用,不能用的話,在評論區留言,後面用阿里雲本身搭建一個。

 代碼地址:

 https://github.com/guozheng007/ZookeeperNetCoreDemo

 

 

參考資料:

(1)張輝清:《中小研發團隊架構實踐》

(2) 風間影月:《ZooKeeper分佈式專題與Dubbo微服務入門》

 (3)sunddenly:http://www.cnblogs.com/sunddenly/p/4033574.html

做者:郭崢

出處:http://www.cnblogs.com/runningsmallguo/

本文版權歸做者和博客園共有,歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文連接。

相關文章
相關標籤/搜索