有一套特定規格的應用(程序+數據庫),當有業務需求時,就須要多部署應用,而且全部的應用都使用一個共同的後臺來管理。應用新增後,如何通知後臺更新鏈接串成了一個關鍵的問題。因而就產生了使用ZooKeeper管理數據庫鏈接串的奇思異想。具體方案以下:mysql
1. 運維負責搭建數據庫,並執行初始化腳本,而後把對應的數據庫配置刷入ZooKeeper;git
2. 運維完成App(1...N)的部署,App(1...N)從ZooKeeper讀取對應的數據庫配置;github
3. 後臺監聽ZooKeeper,更新數據庫配置到後臺應用內存。sql
1. 安裝Zookeeperdocker
docker pull zookeeper:3.4.13數據庫
docker run --name zookeeper -d -p 2181:2181 zookeeper:3.4.13apache
2. 安裝Mysqlapp
docker pull mysql:5.7運維
docker run --name mysql -e MYSQL_ROOT_PASSWORD=root -p 3306:3306 -d mysql:5.7
docker run --name mysql2 -e MYSQL_ROOT_PASSWORD=root -p 3307:3306 -d mysql:5.7
docker run --name mysql3 -e MYSQL_ROOT_PASSWORD=root -p 3308:3306 -d mysql:5.7async
3. 初始化數據庫
CREATE DATABASE test; USE test; CREATE TABLE `table` ( `id` int(11) NOT NULL, `name` varchar(50) NOT NULL, PRIMARY KEY (`id`) );
分別在各個數據庫插入測試數據
mysql:
USE test; INSERT INTO `table` (id, name) VALUES (1, 'A1'); INSERT INTO `table` (id, name) VALUES (2, 'B1'); INSERT INTO `table` (id, name) VALUES (3, 'C1');
mysql2:
USE test; INSERT INTO `table` (id, name) VALUES (1, 'A2'); INSERT INTO `table` (id, name) VALUES (2, 'B2'); INSERT INTO `table` (id, name) VALUES (3, 'C2');
mysql3:
USE test; INSERT INTO `table` (id, name) VALUES (1, 'A3'); INSERT INTO `table` (id, name) VALUES (2, 'B3'); INSERT INTO `table` (id, name) VALUES (3, 'C3');
4. 基於數據庫生成POCO
Install-Package MySql.Data.EntityFrameworkCore -Version 8.0.13
Scaffold-DbContext "server=127.0.0.1;port=3306;user=root;password=123456;database=test" MySql.Data.EntityFrameworkCore -OutputDir DataAccess -f
5. 引用ZooKeeper相關組件
Install-Package ZooKeeperNetEx -Version 3.4.12.1
1. ZookeeperOption:從appsettings中讀取ZooKeeper相關配置
public class ZookeeperOption { public ZookeeperOption(IConfiguration config) { if (config == null) { throw new ArgumentNullException(nameof(config)); } var section = config.GetSection("zookeeper"); section.Bind(this); } public string ConnectionString { get; set; } public int Timeout { get; set; } }
2. ZookeeperServiceCollectionExtensions:註冊ZooKeeper服務
public static class ZookeeperServiceCollectionExtensions { public static IServiceCollection AddZookeeper(this IServiceCollection services, IConfiguration config) { if (services == null) { throw new ArgumentNullException(nameof(services)); } if (config == null) { throw new ArgumentNullException(nameof(config)); } services.AddOptions(); var option = new ZookeeperOption(config); var zookeeper = new org.apache.zookeeper.ZooKeeper(option.ConnectionString, option.Timeout * 1000, new DefaultWatcher()); services.Add(ServiceDescriptor.Singleton(zookeeper)); return services; } } public class DefaultWatcher : Watcher { public override Task process(WatchedEvent @event) { return Task.CompletedTask; } }
3. ZookeeperHandler:ZooKeeper初始化及目錄變化處理類,並把數據庫鏈接信息寫入程序內存
public interface IZookeeperHandler { Task InitAsync(); Task RefreshAsync(); } public class ZookeeperHandler: IZookeeperHandler { private readonly org.apache.zookeeper.ZooKeeper _zooKeeper; private readonly IMemoryCache _cache; public ZookeeperHandler(org.apache.zookeeper.ZooKeeper zooKeeper, IMemoryCache cache) { _zooKeeper = zooKeeper; _cache = cache; } public async Task InitAsync() { await RefreshAsync(); } public async Task RefreshAsync() { var connDic = new Dictionary<string, string>(); var isExisted = await _zooKeeper.existsAsync("/connections"); if (isExisted == null) { await _zooKeeper.createAsync("/connections", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } var connResult = await _zooKeeper.getChildrenAsync("/connections", new ConnectionWatcher(this)); foreach (var conn in connResult.Children) { var connData = await _zooKeeper.getDataAsync($"/connections/{conn}/value"); var connStr = Encoding.UTF8.GetString(connData.Data); connDic[conn] = connStr; } _cache.Set("connections", connDic); } }
4. ConnectionWatcher:監聽者,內容變化時調用ZookeeperHandler的RefreshAsync()方法,其中,變化只通知一次,所以須要再次創建監聽
public class ConnectionWatcher : Watcher { private readonly IZookeeperHandler _zookeeperService; public ConnectionWatcher(IZookeeperHandler zookeeperService) { _zookeeperService = zookeeperService; } public override async Task process(WatchedEvent @event) { var type = @event.get_Type(); if (type != Event.EventType.None) { await _zookeeperService.RefreshAsync(); } } }
5. ZookeeperApplicationBuilderExtensions:初始化
public static class ZookeeperApplicationBuilderExtensions { public static IApplicationBuilder UseZookeeper(this IApplicationBuilder app) { var service = app.ApplicationServices.GetRequiredService<IZookeeperHandler>(); service.InitAsync().Wait(); return app; } }
6. ContextProvider:根據Id從內存中讀取對應的數據庫鏈接串,並提供DbContext實例
public interface IContextProvider { TestContext GetContext(string id); } public class ContextProvider : IContextProvider { private readonly IMemoryCache _cache; public ContextProvider(IMemoryCache cache) { _cache = cache; } public TestContext GetContext(string id) { var dic = _cache.Get<Dictionary<string, string>>("connections"); var connectionStr = dic[id]; var optionsBuilder = new DbContextOptionsBuilder<TestContext>(); optionsBuilder.UseMySQL(connectionStr); return new TestContext(optionsBuilder.Options); } }
1. 剛開始沒有任何鏈接信息
2. 添加一個鏈接信息
3. 查詢鏈接對應的數據
4. 再添加兩個鏈接信息
5. 查看ZooKeeper的信息
docker run -it --rm --link zookeeper:zookeeper zookeeper:3.4.13 zkCli.sh -server zookeeper
ls /connections
get /connections/1/value
get /connections/2/value
get /connections/3/value
由於把數據庫鏈接信息寫到了程序內存中,所以,若是當ZooKeeper出現了故障:
1. 老的(正在運行)應用正在使用的數據庫不會受到影響,但沒法監聽到數據庫信息的變化;
2. 新的應用沒法啓動。
ZooKeeper恢復後:
1. 老的(正在運行)應用會重連,從新監聽到數據庫信息的變化
2. 新的應用能夠成功啓動。