(九)分佈式服務----Zookeeper註冊中心

 ==>>點擊查看本系列文章目錄html

 

首先看一下幾種註冊中心:

最老的就是Zookeeper了, 比較新的有Eureka,Consul 均可以作註冊中心。能夠自行搜索對比三者的優缺點。java

Zookeeper 最開始就是hadoop你們族中的一員,用於作協調的框架,後來已是apache的子項目了。node

幾年前大數據很火的時候,只要學hadoop必學zookeeper,固然還有其餘成員。數據庫

大數據簡單說就是分佈式,好比分佈式文件存儲hdfs,分佈式數據庫hbase,分佈式協調zookeeper,還有kafka,Flume等等都是hadoop你們族。apache

zookeeper,如今更多被用來作註冊中心,好比阿里的開源SOA框架dubbo就常常搭配zookeeper作註冊中心。緩存

Eureka:java的微服務框架Spring Cloud中內部已經集成了Eureka註冊中心。服務器

我選擇zookeeper,不是由於他比另外兩個強,而是由於我幾年前就已經學習過一些zookeeper的原理,上手更容易。網絡上學習書籍、資料、視頻教程也特別多,學習資料完善。網絡

 

註冊中心的基本功能:

1. 註冊服務,有點相似DNS,全部的服務註冊到註冊中心,包含服務的地址等信息。session

2. 服務訂閱,客戶端請求服務,註冊中心就要把那些能用的服務器地址告訴客戶端,服務端有變更時,註冊中心也能及時通知到客戶端。app

3. 性能好且高可用,註冊中心自身也是一個集羣,若是隻有一個註冊中心機器的話那豈不是把註冊中心累死啊,並且他一旦壞了之後,那客戶端都找不到服務器了。全部註冊中心就有不少臺,其中只有一個老大(leader),老大用來寫,小弟用來讀。就是說老大來決定一臺服務器能不能註冊進來,小弟負責幫助客戶端查找服務器。由於註冊服務的次數是不多的,一般有新服務器加入才須要註冊,可是客戶端訂閱那就不少了,因此註冊中心只有一個leader。leader萬一壞掉的話,會從小弟中選舉出一個來當老大接替工做。

 

上面提到說zookeeper集羣,就是說有不少臺機器作zookeeper機器,可是這些機器裏存儲的東西基本上都是同樣的,就是說客戶端無論連到哪一個zookeeper 都是同樣的,能作服務訂閱。

每個zookeeper 中都有不少節點(Znode)。

接下來講的zookeeper節點和集羣徹底不是一回事。 有些人喜歡吧集羣中的每一臺zookeeper機器稱爲一個節點,可是這個節點(zookeeper機器)和我說的節點(Znode)徹底不是一回事。

以下圖:

 

 本例的圖中能夠看到,一共有5臺機器,每臺機器都有5個znode,Znode下面的子節點就更多了。

先看5臺機器:

一臺leader,老大,上文已經介紹,服務都從這些註冊寫入。

兩臺follower,小弟,平時用於服務訂閱,老大掛掉之後,follower內部就會自行選出老大。

兩臺observer,觀察者,就是屬於無業遊民,只能看,沒有選老大的資格,不能參與競選也不能投票,惟一的功能就是服務訂閱。

  observer模式須要手動開啓,爲何會出現observer呢,是由於機器太多的話,每一個機器都有選舉權的話特別影響性能。全中國14億人口,每一個人都參與國家競選的話,效率極低。因此呢,選舉的工做就交給follower完成就好了,只須要確保一直都有leader接班人就好。

 

再看看zookeeper有什麼基本功能:

基本功能很簡單,組合之後卻能夠完成各類複雜工做。

1. 能夠建立:臨時節點(斷開鏈接時便刪除節點) 和 持久化節點(必須手動刪除節點)。

2. 能夠建立:無序節點 和 有序節點。

3. 節點上能夠添加watcher監聽功能,監聽該節點的增刪改,而後觸發自定義的事件。

 

看看這些功能怎麼用:

1. 節點: 每次註冊一個服務就建立一個節點,節點的名稱(Key)就是服務的名稱,服務的詳細信息存儲在節點value中,客戶端經過key找到對應的節點,再找打節點中的value。

2. 臨時節點:服務端註冊一個服務時建立一個臨時節點,服務斷開時,臨時節點自動銷燬,自動完成服務註銷。

3. watcher監聽: 客戶端在註冊中心訂閱了一個服務的時候,同時在這個服務所在的節點上加一個監聽事件,每當服務節點信息有變化的時候,註冊中心會自動回調通知客戶端。

4. 有序臨時節點:分佈式鎖或者分佈式隊列(這裏與服務註冊無關),客戶端1想要操做一條數據的時候,在A節點下建立一個有序臨時節點,自動分配編號001;客戶端1也要操做該數據的時候,在A節點下也建立一個有序臨時節點,自動分配編號002。只有編號最小的子節點纔會被執行,所以001節點會被執行,客戶端1執行完畢後,自動刪除001節點,此時002編號爲最小子節點。即鎖的概念,不能同時操做同一數據;也能夠作隊列,按照前後順序依次執行。

5. 有序臨時節點+watcher監聽: 上面第4條中說到每次執行編號最小的節點,所以須要有一個程序,每次都須要遍歷所有節點,而後找出最小的節點,假如是002節點,這時客戶端2開始執行。可是添加監聽機制之後就不同了,002監聽001,003監聽比他小一號的002,這樣001銷燬的同時通知002開始執行,002銷燬的時候通知003開始執行,不須要遍歷最小節點,也能有序依次執行。

6. 臨時節點+watcher監聽: 集羣master選舉以及高可用。好比hadoop集羣,也有一個resourcemanager資源管理器,負責調度其它節點機器,至關於hadoop集羣的leader節點。這個leader就能夠交由zookeeper管理,全部的hadoop機器同時在zookeeper中建立一個同名的臨時節點,因爲是同名互斥的節點,所以只有一個節點能被建立,成功建立這個節點的hadoop機器就是leader。同時添加Watcher監聽,這個leader只要斷開鏈接,臨時節點自動銷燬,觸發監聽,其它hadoop開始新一輪的master選舉。這也是zookeeper最初在hadoop家族中的重要使命。

7....... 還要不少地方都能用zookeeper,簡直無所不能,並且自身也是高可用,高性能,牛x

 

zookeeper自己的操做仍是很簡單的,無非就是節點的增刪改查,能夠選擇要建立節點的類型,還有就是在節點上添加watcher監聽器。就這些。

 

文件結構:

 

上代碼:

zookeeper客戶端管理類:

public class ZookeeperClientProvider
    {
        private ConfigInfo _config;
        private readonly ILogger<ZookeeperClientProvider> _logger;
        private readonly Dictionary<string, ZooKeeper> _zookeeperClients = new Dictionary<string, ZooKeeper>();

        public ZookeeperClientProvider(ConfigInfo config, ILogger<ZookeeperClientProvider> logger)
        {
            _config = config;
            _logger = logger;
        }

        public async Task<ZooKeeper> GetZooKeeper()
        {
            return await CreateZooKeeper(_config.Addresses.FirstOrDefault());
        }
        public async Task<ZooKeeper> CreateZooKeeper(string address)
        {
            if (!_zookeeperClients.TryGetValue(address, out ZooKeeper result))
            {
                await Task.Run(() =>
                {
                    result = new ZooKeeper(address, (int)_config.SessionTimeout.TotalMilliseconds,
                        new ReconnectionWatcher(
                            async () =>
                            {
                                if (_zookeeperClients.Remove(address, out ZooKeeper value))
                                {
                                    await value.closeAsync();
                                }
                                await CreateZooKeeper(address);
                            }));
                    _zookeeperClients.TryAdd(address, result);
                });
            }
            return result;
        }

        public async Task<IEnumerable<ZooKeeper>> GetZooKeepers()
        {
            var result = new List<ZooKeeper>();
            foreach (var address in _config.Addresses)
            {
                result.Add(await CreateZooKeeper(address));
            }
            return result;
        }
    }
ZookeeperClientProvider

zookeeper服務註冊類:

/// <summary>
    /// 一個抽象的服務路由發現者。
    /// </summary>
    public interface IServiceRouteManager
    {

        /// <summary>
        /// 服務路由被建立。
        /// </summary>
        event EventHandler<ServiceRouteEventArgs> Created;

        /// <summary>
        /// 服務路由被刪除。
        /// </summary>
        event EventHandler<ServiceRouteEventArgs> Removed;

        /// <summary>
        /// 服務路由被修改。
        /// </summary>
        event EventHandler<ServiceRouteChangedEventArgs> Changed;

        /// <summary>
        /// 獲取全部可用的服務路由信息。
        /// </summary>
        /// <returns>服務路由集合。</returns>
        Task<IEnumerable<ServiceRoute>> GetRoutesAsync();

        /// <summary>
        /// 設置服務路由。
        /// </summary>
        /// <param name="routes">服務路由集合。</param>
        /// <returns>一個任務。</returns>
        Task SetRoutesAsync(IEnumerable<ServiceRoute> routes);

        /// <summary>
        /// 移除地址列表
        /// </summary>
        /// <param name="routes">地址列表。</param>
        /// <returns>一個任務。</returns>
        Task RemveAddressAsync(IEnumerable<string> Address);
        /// <summary>
        /// 清空全部的服務路由。
        /// </summary>
        /// <returns>一個任務。</returns>
        Task ClearAsync();
    }

    /// <summary>
    /// 服務路由事件參數。
    /// </summary>
    public class ServiceRouteEventArgs
    {
        public ServiceRouteEventArgs(ServiceRoute route)
        {
            Route = route;
        }

        /// <summary>
        /// 服務路由信息。
        /// </summary>
        public ServiceRoute Route { get; private set; }
    }

    /// <summary>
    /// 服務路由變動事件參數。
    /// </summary>
    public class ServiceRouteChangedEventArgs : ServiceRouteEventArgs
    {
        public ServiceRouteChangedEventArgs(ServiceRoute route, ServiceRoute oldRoute) : base(route)
        {
            OldRoute = oldRoute;
        }

        /// <summary>
        /// 舊的服務路由信息。
        /// </summary>
        public ServiceRoute OldRoute { get; set; }
    }
IServiceRouteManager
public class ZooKeeperServiceRouteManager : IServiceRouteManager, IDisposable
    {
        private readonly ConfigInfo _configInfo;
        private readonly ISerializer<byte[]> _serializer;
        private readonly ILogger<ZooKeeperServiceRouteManager> _logger;
        private ServiceRoute[] _routes;
        private readonly ZookeeperClientProvider _zookeeperClientProvider;

        public ZooKeeperServiceRouteManager(ConfigInfo configInfo, ISerializer<byte[]> serializer,
            ISerializer<string> stringSerializer,
            ILogger<ZooKeeperServiceRouteManager> logger,
            ZookeeperClientProvider zookeeperClientProvider)
        {
            _configInfo = configInfo;
            _serializer = serializer;
            _logger = logger;
            _zookeeperClientProvider = zookeeperClientProvider;
            EnterRoutes().Wait();
        }

        private EventHandler<ServiceRouteEventArgs> _created;
        private EventHandler<ServiceRouteEventArgs> _removed;
        private EventHandler<ServiceRouteChangedEventArgs> _changed;

        /// <summary>
        /// 服務路由被建立。
        /// </summary>
        public event EventHandler<ServiceRouteEventArgs> Created
        {
            add { _created += value; }
            remove { _created -= value; }
        }

        /// <summary>
        /// 服務路由被刪除。
        /// </summary>
        public event EventHandler<ServiceRouteEventArgs> Removed
        {
            add { _removed += value; }
            remove { _removed -= value; }
        }

        /// <summary>
        /// 服務路由被修改。
        /// </summary>
        public event EventHandler<ServiceRouteChangedEventArgs> Changed
        {
            add { _changed += value; }
            remove { _changed -= value; }
        }



        protected void OnCreated(params ServiceRouteEventArgs[] args)
        {
            if (_created == null)
                return;

            foreach (var arg in args)
                _created(this, arg);
        }

        protected void OnChanged(params ServiceRouteChangedEventArgs[] args)
        {
            if (_changed == null)
                return;

            foreach (var arg in args)
                _changed(this, arg);
        }

        protected void OnRemoved(params ServiceRouteEventArgs[] args)
        {
            if (_removed == null)
                return;

            foreach (var arg in args)
                _removed(this, arg);
        }


        /// <summary>
        /// 獲取全部可用的服務路由信息。
        /// </summary>
        /// <returns>服務路由集合。</returns>
        public async Task<IEnumerable<ServiceRoute>> GetRoutesAsync()
        {
            await EnterRoutes();
            return _routes;
        }

        /// <summary>
        /// 清空全部的服務路由。
        /// </summary>
        /// <returns>一個任務。</returns>
        public async Task ClearAsync()
        {
            if (_logger.IsEnabled(LogLevel.Information))
                _logger.LogInformation("準備清空全部路由配置。");
            var zooKeepers = await _zookeeperClientProvider.GetZooKeepers();
            foreach (var zooKeeper in zooKeepers)
            {
                var path = _configInfo.RoutePath;
                var childrens = path.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries);

                var index = 0;
                while (childrens.Count() > 1)
                {
                    var nodePath = "/" + string.Join("/", childrens);

                    if (await zooKeeper.existsAsync(nodePath) != null)
                    {
                        var result = await zooKeeper.getChildrenAsync(nodePath);
                        if (result?.Children != null)
                        {
                            foreach (var child in result.Children)
                            {
                                var childPath = $"{nodePath}/{child}";
                                if (_logger.IsEnabled(LogLevel.Debug))
                                    _logger.LogDebug($"準備刪除:{childPath}。");
                                await zooKeeper.deleteAsync(childPath);
                            }
                        }
                        if (_logger.IsEnabled(LogLevel.Debug))
                            _logger.LogDebug($"準備刪除:{nodePath}。");
                        await zooKeeper.deleteAsync(nodePath);
                    }
                    index++;
                    childrens = childrens.Take(childrens.Length - index).ToArray();
                }
                if (_logger.IsEnabled(LogLevel.Information))
                    _logger.LogInformation("路由配置清空完成。");
            }
        }

        /// <summary>
        /// 設置服務路由。
        /// </summary>
        /// <param name="routes">服務路由集合。</param>
        /// <returns>一個任務。</returns>
        public async Task SetRoutesAsync(IEnumerable<ServiceRoute> routes)
        {
            var hostAddr = NetUtils.GetHostAddress();
            var serviceRoutes = await GetRoutes(routes.Select(p => p.serviceRouteDescriptor.Id));
            if (serviceRoutes.Count() > 0)
            {
                foreach (var route in routes)
                {
                    var serviceRoute = serviceRoutes.Where(p => p.serviceRouteDescriptor.Id == route.serviceRouteDescriptor.Id).FirstOrDefault();
                    if (serviceRoute != null)
                    {
                        var addresses = serviceRoute.Address.Concat(
                          route.Address.Except(serviceRoute.Address)).ToList();

                        foreach (var address in route.Address)
                        {
                            addresses.Remove(addresses.Where(p => p.ToString() == address.ToString()).FirstOrDefault());
                            addresses.Add(address);
                        }
                        route.Address = addresses;
                    }
                }
            }
            await RemoveExceptRoutesAsync(routes, hostAddr);

            if (_logger.IsEnabled(LogLevel.Information))
                _logger.LogInformation("準備添加服務路由。");
            var zooKeepers = await _zookeeperClientProvider.GetZooKeepers();
            foreach (var zooKeeper in zooKeepers)
            {
                await CreateSubdirectory(zooKeeper, _configInfo.RoutePath);

                var path = _configInfo.RoutePath;
                if (!path.EndsWith("/"))
                    path += "/";

                routes = routes.ToArray();

                foreach (var serviceRoute in routes)
                {
                    var nodePath = $"{path}{serviceRoute.serviceRouteDescriptor.Id}";
                    var nodeData = _serializer.Serialize(serviceRoute);
                    if (await zooKeeper.existsAsync(nodePath) == null)
                    {
                        if (_logger.IsEnabled(LogLevel.Debug))
                            _logger.LogDebug($"節點:{nodePath}不存在將進行建立。");

                        await zooKeeper.createAsync(nodePath, nodeData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    }
                    else
                    {
                        if (_logger.IsEnabled(LogLevel.Debug))
                            _logger.LogDebug($"將更新節點:{nodePath}的數據。");

                        var onlineData = (await zooKeeper.getDataAsync(nodePath)).Data;
                        if (!DataEquals(nodeData, onlineData))
                            await zooKeeper.setDataAsync(nodePath, nodeData);
                    }
                }
                if (_logger.IsEnabled(LogLevel.Information))
                    _logger.LogInformation("服務路由添加成功。");
            }
        }

        public async Task RemveAddressAsync(IEnumerable<string> Address)
        {
            var routes = await GetRoutesAsync();
            foreach (var route in routes)
            {
                route.Address = route.Address.Except(Address);
            }
            await SetRoutesAsync(routes);
        }

        private async Task RemoveExceptRoutesAsync(IEnumerable<ServiceRoute> routes, string hostAddr)
        {
            var path = _configInfo.RoutePath;
            if (!path.EndsWith("/"))
                path += "/";
            routes = routes.ToArray();
            var zooKeepers = await _zookeeperClientProvider.GetZooKeepers();
            foreach (var zooKeeper in zooKeepers)
            {
                if (_routes != null)
                {
                    var oldRouteIds = _routes.Select(i => i.serviceRouteDescriptor.Id).ToArray();
                    var newRouteIds = routes.Select(i => i.serviceRouteDescriptor.Id).ToArray();
                    var deletedRouteIds = oldRouteIds.Except(newRouteIds).ToArray();
                    foreach (var deletedRouteId in deletedRouteIds)
                    {
                        var addresses = _routes.Where(p => p.serviceRouteDescriptor.Id == deletedRouteId).Select(p => p.Address).FirstOrDefault();
                        if (addresses.Contains(hostAddr))
                        {
                            var nodePath = $"{path}{deletedRouteId}";
                            await zooKeeper.deleteAsync(nodePath);
                        }
                    }
                }
            }
        }

        private async Task CreateSubdirectory(ZooKeeper zooKeeper, string path)
        {
            if (await zooKeeper.existsAsync(path) != null)
                return;

            if (_logger.IsEnabled(LogLevel.Information))
                _logger.LogInformation($"節點{path}不存在,將進行建立。");

            var childrens = path.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries);
            var nodePath = "/";

            foreach (var children in childrens)
            {
                nodePath += children;
                if (await zooKeeper.existsAsync(nodePath) == null)
                {
                    await zooKeeper.createAsync(nodePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
                nodePath += "/";
            }
        }

        private async Task<ServiceRoute> GetRoute(byte[] data)
        {
            if (_logger.IsEnabled(LogLevel.Debug))
                _logger.LogDebug($"準備轉換服務路由,配置內容:{Encoding.UTF8.GetString(data)}。");

            if (data == null)
                return null;

            return await Task.Run(() =>
            {
                return _serializer.Deserialize<ServiceRoute>(data);
            });
        }

        private async Task<ServiceRoute> GetRoute(string path)
        {
            ServiceRoute result = null;
            var zooKeeper = await GetZooKeeper();
            var watcher = new NodeMonitorWatcher(GetZooKeeper(), path,
                 async (oldData, newData) => await NodeChange(oldData, newData));
            if (await zooKeeper.existsAsync(path) != null)
            {
                var data = (await zooKeeper.getDataAsync(path, watcher)).Data;
                watcher.SetCurrentData(data);
                result = await GetRoute(data);
            }
            return result;
        }

        private async Task<ServiceRoute[]> GetRoutes(IEnumerable<string> childrens)
        {
            var rootPath = _configInfo.RoutePath;
            if (!rootPath.EndsWith("/"))
                rootPath += "/";

            childrens = childrens.ToArray();
            var routes = new List<ServiceRoute>(childrens.Count());

            foreach (var children in childrens)
            {
                if (_logger.IsEnabled(LogLevel.Debug))
                    _logger.LogDebug($"準備從節點:{children}中獲取路由信息。");

                var nodePath = $"{rootPath}{children}";
                var route = await GetRoute(nodePath);
                if (route != null)
                    routes.Add(route);
            }

            return routes.ToArray();
        }

        private async Task EnterRoutes()
        {
            if (_routes != null)
                return;
            var zooKeeper = await GetZooKeeper();
            var watcher = new ChildrenMonitorWatcher(GetZooKeeper(), _configInfo.RoutePath,
             async (oldChildrens, newChildrens) => await ChildrenChange(oldChildrens, newChildrens));
            if (await zooKeeper.existsAsync(_configInfo.RoutePath, watcher) != null)
            {
                var result = await zooKeeper.getChildrenAsync(_configInfo.RoutePath, watcher);
                var childrens = result.Children.ToArray();
                watcher.SetCurrentData(childrens);
                _routes = await GetRoutes(childrens);
            }
            else
            {
                if (_logger.IsEnabled(LogLevel.Warning))
                    _logger.LogWarning($"沒法獲取路由信息,由於節點:{_configInfo.RoutePath},不存在。");
                _routes = new ServiceRoute[0];
            }
        }

        private static bool DataEquals(IReadOnlyList<byte> data1, IReadOnlyList<byte> data2)
        {
            if (data1.Count != data2.Count)
                return false;
            for (var i = 0; i < data1.Count; i++)
            {
                var b1 = data1[i];
                var b2 = data2[i];
                if (b1 != b2)
                    return false;
            }
            return true;
        }

        public async Task NodeChange(byte[] oldData, byte[] newData)
        {
            if (DataEquals(oldData, newData))
                return;

            var newRoute = await GetRoute(newData);
            //獲得舊的路由。
            var oldRoute = _routes.FirstOrDefault(i => i.serviceRouteDescriptor.Id == newRoute.serviceRouteDescriptor.Id);

            lock (_routes)
            {
                //刪除舊路由,並添加上新的路由。
                _routes =
                    _routes
                        .Where(i => i.serviceRouteDescriptor.Id != newRoute.serviceRouteDescriptor.Id)
                        .Concat(new[] { newRoute }).ToArray();
            }

            //觸發路由變動事件。
            OnChanged(new ServiceRouteChangedEventArgs(newRoute, oldRoute));
        }

        public async Task ChildrenChange(string[] oldChildrens, string[] newChildrens)
        {
            if (_logger.IsEnabled(LogLevel.Debug))
                _logger.LogDebug($"最新的節點信息:{string.Join(",", newChildrens)}");

            if (_logger.IsEnabled(LogLevel.Debug))
                _logger.LogDebug($"舊的節點信息:{string.Join(",", oldChildrens)}");

            //計算出已被刪除的節點。
            var deletedChildrens = oldChildrens.Except(newChildrens).ToArray();
            //計算出新增的節點。
            var createdChildrens = newChildrens.Except(oldChildrens).ToArray();

            if (_logger.IsEnabled(LogLevel.Debug))
                _logger.LogDebug($"須要被刪除的路由節點:{string.Join(",", deletedChildrens)}");
            if (_logger.IsEnabled(LogLevel.Debug))
                _logger.LogDebug($"須要被添加的路由節點:{string.Join(",", createdChildrens)}");

            //獲取新增的路由信息。
            var newRoutes = (await GetRoutes(createdChildrens)).ToArray();

            var routes = _routes.ToArray();
            lock (_routes)
            {
                _routes = _routes
                    //刪除無效的節點路由。
                    .Where(i => !deletedChildrens.Contains(i.serviceRouteDescriptor.Id))
                    //鏈接上新的路由。
                    .Concat(newRoutes)
                    .ToArray();
            }
            //須要刪除的路由集合。
            var deletedRoutes = routes.Where(i => deletedChildrens.Contains(i.serviceRouteDescriptor.Id)).ToArray();
            //觸發刪除事件。
            OnRemoved(deletedRoutes.Select(route => new ServiceRouteEventArgs(route)).ToArray());

            //觸發路由被建立事件。
            OnCreated(newRoutes.Select(route => new ServiceRouteEventArgs(route)).ToArray());

            if (_logger.IsEnabled(LogLevel.Information))
                _logger.LogInformation("路由數據更新成功。");
        }


        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
        }

        private async Task<ZooKeeper> GetZooKeeper()
        {
            return await _zookeeperClientProvider.GetZooKeeper();
        }

    }
ZooKeeperServiceRouteManager

zookeeper鏈接配置類:

public class ConfigInfo
    {
        /// <summary>
        /// 初始化會話超時爲20秒的Zookeeper配置信息。
        /// </summary>
        /// <param name="connectionString">鏈接字符串。</param>
        /// <param name="routePath">路由配置路徑。</param>
        /// <param name="subscriberPath">訂閱者配置路徑</param>
        /// <param name="commandPath">服務命令配置路徑</param>
        /// <param name="cachePath">緩存中心配置路徑</param>
        /// <param name="mqttRoutePath">mqtt路由配置路徑</param>
        /// <param name="chRoot">根節點。</param>
        public ConfigInfo(string connectionString, string routePath = "/services/serviceRoutes",
            string subscriberPath = "/services/serviceSubscribers",
            string commandPath = "/services/serviceCommands",
            string cachePath = "/services/serviceCaches",
            string mqttRoutePath = "/services/mqttServiceRoutes",
            string chRoot = null,
            bool reloadOnChange = false, bool enableChildrenMonitor = false) : this(connectionString,
                TimeSpan.FromSeconds(20),
                routePath,
                subscriberPath,
                commandPath,
                cachePath,
                mqttRoutePath,
                chRoot,
                reloadOnChange, enableChildrenMonitor)
        {
        }

        /// <summary>
        /// 初始化Zookeeper配置信息。
        /// </summary>
        /// <param name="connectionString">鏈接字符串。</param>
        /// <param name="routePath">路由配置路徑。</param>
        /// <param name="commandPath">服務命令配置路徑</param>
        /// <param name="subscriberPath">訂閱者配置路徑</param>
        /// <param name="sessionTimeout">會話超時時間。</param>
        /// <param name="cachePath">緩存中心配置路徑</param>
        /// <param name="mqttRoutePath">mqtt路由配置路徑</param>
        /// <param name="chRoot">根節點。</param>
        public ConfigInfo(string connectionString, TimeSpan sessionTimeout, string routePath = "/services/serviceRoutes",
            string subscriberPath = "/services/serviceSubscribers",
            string commandPath = "/services/serviceCommands",
            string cachePath = "/services/serviceCaches",
            string mqttRoutePath = "/services/mqttServiceRoutes",
            string chRoot = null,
            bool reloadOnChange = false, bool enableChildrenMonitor = false)
        {
            CachePath = cachePath;
            ReloadOnChange = reloadOnChange;
            ChRoot = chRoot;
            CommandPath = commandPath;
            SubscriberPath = subscriberPath;
            ConnectionString = connectionString;
            RoutePath = routePath;
            SessionTimeout = sessionTimeout;
            MqttRoutePath = mqttRoutePath;
            EnableChildrenMonitor = enableChildrenMonitor;
            Addresses = connectionString?.Split(",");
        }

        public bool EnableChildrenMonitor { get; set; }

        public bool ReloadOnChange { get; set; }

        /// <summary>
        /// 鏈接字符串。
        /// </summary>
        public string ConnectionString { get; set; }

        /// <summary>
        /// 命令配置路徑
        /// </summary>
        public string CommandPath { get; set; }

        /// <summary>
        /// 路由配置路徑。
        /// </summary>
        public string RoutePath { get; set; }

        /// <summary>
        /// 訂閱者配置路徑
        /// </summary>
        public string SubscriberPath { get; set; }

        /// <summary>
        /// 會話超時時間。
        /// </summary>
        public TimeSpan SessionTimeout { get; set; }

        /// <summary>
        /// 根節點。
        /// </summary>
        public string ChRoot { get; set; }


        public IEnumerable<string> Addresses { get; set; }

        /// <summary>
        /// 緩存中心配置中心
        /// </summary>
        public string CachePath { get; set; }


        /// <summary>
        /// Mqtt路由配置路徑。
        /// </summary>
        public string MqttRoutePath { get; set; }
    }
ConfigInfo

路由和路由描述:

public class ServiceRoute
    {
        /// <summary>
        /// 服務可用地址。
        /// </summary>
        public IEnumerable<string> Address { get; set; }
        /// <summary>
        /// 服務描述符。
        /// </summary>
        public ServiceRouteDescriptor serviceRouteDescriptor { get; set; }

        #region Equality members

        /// <summary>Determines whether the specified object is equal to the current object.</summary>
        /// <returns>true if the specified object  is equal to the current object; otherwise, false.</returns>
        /// <param name="obj">The object to compare with the current object. </param>
        public override bool Equals(object obj)
        {
            var model = obj as ServiceRoute;
            if (model == null)
                return false;

            if (obj.GetType() != GetType())
                return false;

            if (model.serviceRouteDescriptor != serviceRouteDescriptor)
                return false;

            return model.Address.Count() == Address.Count() && model.Address.All(addressModel => Address.Contains(addressModel));
        }

        /// <summary>Serves as the default hash function. </summary>
        /// <returns>A hash code for the current object.</returns>
        public override int GetHashCode()
        {
            return ToString().GetHashCode();
        }

        public static bool operator ==(ServiceRoute model1, ServiceRoute model2)
        {
            return Equals(model1, model2);
        }

        public static bool operator !=(ServiceRoute model1, ServiceRoute model2)
        {
            return !Equals(model1, model2);
        }

        #endregion Equality members
    }
ServiceRoute
/// <summary>
    /// 服務描述符。
    /// </summary>
    [Serializable]
    public class ServiceRouteDescriptor
    {
        /// <summary>
        /// 初始化一個新的服務描述符。
        /// </summary>
        public ServiceRouteDescriptor()
        {
            Metadatas = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
        }

        /// <summary>
        /// 服務Id。
        /// </summary>
        public string Id { get; set; }

        /// <summary>
        /// 訪問的令牌
        /// </summary>
        public string Token { get; set; }

        /// <summary>
        /// 路由
        /// </summary>
        public string RoutePath { get; set; }

        /// <summary>
        /// 元數據。
        /// </summary> 
        public IDictionary<string, object> Metadatas { get; set; }

        /// <summary>
        /// 獲取一個元數據。
        /// </summary>
        /// <typeparam name="T">元數據類型。</typeparam>
        /// <param name="name">元數據名稱。</param>
        /// <param name="def">若是指定名稱的元數據不存在則返回這個參數。</param>
        /// <returns>元數據值。</returns>
        public T GetMetadata<T>(string name, T def = default(T))
        {
            if (!Metadatas.ContainsKey(name))
                return def;

            return (T)Metadatas[name];
        }

        #region Equality members

        /// <summary>Determines whether the specified object is equal to the current object.</summary>
        /// <returns>true if the specified object  is equal to the current object; otherwise, false.</returns>
        /// <param name="obj">The object to compare with the current object. </param>
        public override bool Equals(object obj)
        {
            var model = obj as ServiceRouteDescriptor;
            if (model == null)
                return false;

            if (obj.GetType() != GetType())
                return false;

            if (model.Id != Id)
                return false;

            return model.Metadatas.Count == Metadatas.Count && model.Metadatas.All(metadata =>
            {
                object value;
                if (!Metadatas.TryGetValue(metadata.Key, out value))
                    return false;

                if (metadata.Value == null && value == null)
                    return true;
                if (metadata.Value == null || value == null)
                    return false;

                return metadata.Value.Equals(value);
            });
        }

        /// <summary>Serves as the default hash function. </summary>
        /// <returns>A hash code for the current object.</returns>
        public override int GetHashCode()
        {
            return ToString().GetHashCode();
        }

        public static bool operator ==(ServiceRouteDescriptor model1, ServiceRouteDescriptor model2)
        {
            return Equals(model1, model2);
        }

        public static bool operator !=(ServiceRouteDescriptor model1, ServiceRouteDescriptor model2)
        {
            return !Equals(model1, model2);
        }

        #endregion Equality members
    }
ServiceRouteDescriptor

Watcher監聽器:

子節點監聽器:

internal class ChildrenMonitorWatcher : Watcher
    {
        private readonly Task<ZooKeeper> _zooKeeperCall;
        private readonly string _path;
        private readonly Action<string[], string[]> _action;
        private string[] _currentData = new string[0];

        public ChildrenMonitorWatcher(Task<ZooKeeper> zooKeeperCall, string path, Action<string[], string[]> action)
        {
            _zooKeeperCall = zooKeeperCall;
            _path = path;
            _action = action;
        }

        public ChildrenMonitorWatcher SetCurrentData(string[] currentData)
        {
            _currentData = currentData ?? new string[0];

            return this;
        }

        #region Overrides of WatcherBase

        public override async Task process(WatchedEvent watchedEvent)
        {
            if (watchedEvent.getState() != Event.KeeperState.SyncConnected || watchedEvent.getPath() != _path)
                return;
            var zooKeeper = await _zooKeeperCall;
            //Func<ChildrenMonitorWatcher> getWatcher = () => new ChildrenMonitorWatcher(_zooKeeperCall, path, _action);
            Task<ChildrenMonitorWatcher> getWatcher =  Task.Run(() => {return new ChildrenMonitorWatcher(_zooKeeperCall, _path, _action); });
            switch (watchedEvent.get_Type())
            {
                //建立以後開始監視下面的子節點狀況。
                case Event.EventType.NodeCreated:
                    await zooKeeper.getChildrenAsync(_path, await getWatcher);
                    break;

                //子節點修改則繼續監控子節點信息並通知客戶端數據變動。
                case Event.EventType.NodeChildrenChanged:
                    try
                    {
                        var watcher = await getWatcher;
                        var result = await zooKeeper.getChildrenAsync(_path, watcher);
                        var childrens = result.Children.ToArray();
                        _action(_currentData, childrens);
                        watcher.SetCurrentData(childrens);
                    }
                    catch (KeeperException.NoNodeException)
                    {
                        _action(_currentData, new string[0]);
                    }
                    break;

                //刪除以後開始監控自身節點,並通知客戶端數據被清空。
                case Event.EventType.NodeDeleted:
                    {
                        var watcher = await getWatcher;
                        await zooKeeper.existsAsync(_path, watcher);
                        _action(_currentData, new string[0]);
                        watcher.SetCurrentData(new string[0]);
                    }
                    break;
            }
        }
        #endregion Overrides of WatcherBase
    }
ChildrenMonitorWatcher

當前節點監聽器:

internal class NodeMonitorWatcher : Watcher
    {
        private readonly Task<ZooKeeper> _zooKeeperCall;
        private readonly string _path;
        private readonly Action<byte[], byte[]> _action;
        private byte[] _currentData;

        public NodeMonitorWatcher(Task<ZooKeeper> zooKeeperCall, string path, Action<byte[], byte[]> action)
        {
            _zooKeeperCall = zooKeeperCall;
            _path = path;
            _action = action;
        }

        public NodeMonitorWatcher SetCurrentData(byte[] currentData)
        {
            _currentData = currentData;

            return this;
        }

        #region Overrides of WatcherBase

        public override async Task process(WatchedEvent watchedEvent)
        {
            switch (watchedEvent.get_Type())
            {
                case Event.EventType.NodeDataChanged:
                    var zooKeeper = await _zooKeeperCall;
                    var watcher = new NodeMonitorWatcher(_zooKeeperCall, _path, _action);
                    var data = await zooKeeper.getDataAsync(_path, watcher);
                    var newData = data.Data;
                    _action(_currentData, newData);
                    watcher.SetCurrentData(newData);
                    break;
            }
        }

        #endregion Overrides of WatcherBase
    }
NodeMonitorWatcher

鏈接斷開監聽器:

internal class ReconnectionWatcher : Watcher
    {
        private readonly Action _reconnection;

        public ReconnectionWatcher(Action reconnection)
        {
            _reconnection = reconnection;
        }

        #region Overrides of Watcher

        /// <summary>Processes the specified event.</summary>
        /// <param name="watchedEvent">The event.</param>
        /// <returns></returns>
        public override async Task process(WatchedEvent watchedEvent)
        {
            var state = watchedEvent.getState();
            switch (state)
            {
                case Event.KeeperState.Expired:
                case Event.KeeperState.Disconnected:
                    {
                        _reconnection();
                        break;
                    }
            }
            await Task.CompletedTask;
        }

        #endregion Overrides of Watcher
    }
ReconnectionWatcher
相關文章
相關標籤/搜索