[奇思異想]使用Zookeeper管理數據庫鏈接串

背景

  有一套特定規格的應用(程序+數據庫),當有業務需求時,就須要多部署應用,而且全部的應用都使用一個共同的後臺來管理。應用新增後,如何通知後臺更新鏈接串成了一個關鍵的問題。因而就產生了使用ZooKeeper管理數據庫鏈接串的奇思異想。具體方案以下:mysql

  

 

 

  1. 運維負責搭建數據庫,並執行初始化腳本,而後把對應的數據庫配置刷入ZooKeeper;git

  2. 運維完成App(1...N)的部署,App(1...N)從ZooKeeper讀取對應的數據庫配置;github

  3. 後臺監聽ZooKeeper,更新數據庫配置到後臺應用內存。sql

 

環境準備

  1. 安裝Zookeeperdocker

  docker pull zookeeper:3.4.13數據庫

  

  

  docker run --name zookeeper -d -p 2181:2181 zookeeper:3.4.13apache

  

 

  2. 安裝Mysqlapp

  docker pull mysql:5.7運維

  

 

  docker run --name mysql -e MYSQL_ROOT_PASSWORD=root -p 3306:3306 -d mysql:5.7
  docker run --name mysql2 -e MYSQL_ROOT_PASSWORD=root -p 3307:3306 -d mysql:5.7
  docker run --name mysql3 -e MYSQL_ROOT_PASSWORD=root -p 3308:3306 -d mysql:5.7async

  

 

  3. 初始化數據庫

CREATE DATABASE test;
USE test;
CREATE TABLE `table` (
  `id` int(11) NOT NULL,
  `name` varchar(50) NOT NULL,
  PRIMARY KEY (`id`)
);

   

 

  分別在各個數據庫插入測試數據

  mysql:

USE test;
INSERT INTO `table` (id, name) VALUES (1, 'A1');
INSERT INTO `table` (id, name) VALUES (2, 'B1');
INSERT INTO `table` (id, name) VALUES (3, 'C1');

 

  mysql2:

USE test;
INSERT INTO `table` (id, name) VALUES (1, 'A2');
INSERT INTO `table` (id, name) VALUES (2, 'B2');
INSERT INTO `table` (id, name) VALUES (3, 'C2');

 

  mysql3:

USE test;
INSERT INTO `table` (id, name) VALUES (1, 'A3');
INSERT INTO `table` (id, name) VALUES (2, 'B3');
INSERT INTO `table` (id, name) VALUES (3, 'C3');

 

  4. 基於數據庫生成POCO

  Install-Package MySql.Data.EntityFrameworkCore -Version 8.0.13

  Scaffold-DbContext "server=127.0.0.1;port=3306;user=root;password=123456;database=test" MySql.Data.EntityFrameworkCore -OutputDir DataAccess -f

  

  

 

  5. 引用ZooKeeper相關組件

  Install-Package ZooKeeperNetEx -Version 3.4.12.1

  

核心代碼

  1. ZookeeperOption:從appsettings中讀取ZooKeeper相關配置

    public class ZookeeperOption
    {
        public ZookeeperOption(IConfiguration config)
        {
            if (config == null)
            {
                throw new ArgumentNullException(nameof(config));
            }

            var section = config.GetSection("zookeeper");
            section.Bind(this);
        }

        public string ConnectionString { get; set; }

        public int Timeout { get; set; }
    }

 

  2. ZookeeperServiceCollectionExtensions:註冊ZooKeeper服務

    public static class ZookeeperServiceCollectionExtensions
    {
        public static IServiceCollection AddZookeeper(this IServiceCollection services, IConfiguration config)
        {
            if (services == null)
            {
                throw new ArgumentNullException(nameof(services));
            }

            if (config == null)
            {
                throw new ArgumentNullException(nameof(config));
            }

            services.AddOptions();

            var option = new ZookeeperOption(config);

            var zookeeper = new org.apache.zookeeper.ZooKeeper(option.ConnectionString, option.Timeout * 1000, new DefaultWatcher());

            services.Add(ServiceDescriptor.Singleton(zookeeper));
            return services;
        }
    }

    public class DefaultWatcher : Watcher
    {
        public override Task process(WatchedEvent @event)
        {
            return Task.CompletedTask;
        }
    }

 

  3. ZookeeperHandler:ZooKeeper初始化及目錄變化處理類,並把數據庫鏈接信息寫入程序內存

    public interface IZookeeperHandler
    {
        Task InitAsync();

        Task RefreshAsync();
    }

    public class ZookeeperHandler: IZookeeperHandler
    {
        private readonly org.apache.zookeeper.ZooKeeper _zooKeeper;
        private readonly IMemoryCache _cache;

        public ZookeeperHandler(org.apache.zookeeper.ZooKeeper zooKeeper, IMemoryCache cache)
        {
            _zooKeeper = zooKeeper;
            _cache = cache;
        }

        public async Task InitAsync()
        {
            await RefreshAsync();
        }

        public async Task RefreshAsync()
        {
            var connDic = new Dictionary<string, string>();

            var isExisted = await _zooKeeper.existsAsync("/connections");
            if (isExisted == null)
            {
                await _zooKeeper.createAsync("/connections", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }

            var connResult = await _zooKeeper.getChildrenAsync("/connections", new ConnectionWatcher(this));

            foreach (var conn in connResult.Children)
            {
                var connData = await _zooKeeper.getDataAsync($"/connections/{conn}/value");
                var connStr = Encoding.UTF8.GetString(connData.Data);
                connDic[conn] = connStr;
            }

            _cache.Set("connections", connDic);
        }
    }

 

  4. ConnectionWatcher:監聽者,內容變化時調用ZookeeperHandler的RefreshAsync()方法,其中,變化只通知一次,所以須要再次創建監聽

    public class ConnectionWatcher : Watcher
    {
        private readonly IZookeeperHandler _zookeeperService;

        public ConnectionWatcher(IZookeeperHandler zookeeperService)
        {
            _zookeeperService = zookeeperService;
        }

        public override async Task process(WatchedEvent @event)
        {
            var type = @event.get_Type();

            if (type != Event.EventType.None)
            {
                await _zookeeperService.RefreshAsync();
            }
        }
    }

 

  5. ZookeeperApplicationBuilderExtensions:初始化

    public static class ZookeeperApplicationBuilderExtensions
    {
        public static IApplicationBuilder UseZookeeper(this IApplicationBuilder app)
        {
            var service = app.ApplicationServices.GetRequiredService<IZookeeperHandler>();
            service.InitAsync().Wait();
            return app;
        }
    }

 

  6. ContextProvider:根據Id從內存中讀取對應的數據庫鏈接串,並提供DbContext實例

    public interface IContextProvider
    {
        TestContext GetContext(string id);
    }

    public class ContextProvider : IContextProvider
    {
        private readonly IMemoryCache _cache;

        public ContextProvider(IMemoryCache cache)
        {
            _cache = cache;
        }

        public TestContext GetContext(string id)
        {
            var dic = _cache.Get<Dictionary<string, string>>("connections");
            var connectionStr = dic[id];

            var optionsBuilder = new DbContextOptionsBuilder<TestContext>();
            optionsBuilder.UseMySQL(connectionStr);
            return new TestContext(optionsBuilder.Options);
        }
    }

 

效果演示

  1. 剛開始沒有任何鏈接信息

  

 

  2. 添加一個鏈接信息

  

  

 

  3. 查詢鏈接對應的數據

  

 

  4. 再添加兩個鏈接信息

  

  

  

  

  

 

  5. 查看ZooKeeper的信息

  docker run -it --rm --link zookeeper:zookeeper zookeeper:3.4.13 zkCli.sh -server zookeeper

  ls /connections

  get /connections/1/value

  get /connections/2/value

  get /connections/3/value

  

 

補充說明

  由於把數據庫鏈接信息寫到了程序內存中,所以,若是當ZooKeeper出現了故障:

  1. 老的(正在運行)應用正在使用的數據庫不會受到影響,但沒法監聽到數據庫信息的變化;

  2. 新的應用沒法啓動。

  ZooKeeper恢復後:

  1. 老的(正在運行)應用會重連,從新監聽到數據庫信息的變化

  2. 新的應用能夠成功啓動。

 

源碼地址

  https://github.com/ErikXu/zookeeper-connection-management

相關文章
相關標籤/搜索