Orchestration Governance
Overview
- Very similar to Dubbo's service governance, except without a UI; the feature set is still being completed.
- On startup, sharding-jdbc persists its configuration (sharding rules, master-slave settings, and so on) to ZooKeeper/etcd nodes.
- It then registers ZooKeeper/etcd watches on those nodes (if this is unfamiliar, read up on ZooKeeper's watch mechanism first).
- When a node changes, the new node data is pushed to the zk client, which rebuilds its configuration from it.
- This is the essence of service governance generally: when the configuration changes, the service adapts to it dynamically without a restart.
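The persist/watch/renew cycle described above can be sketched in plain Java, with an in-memory map standing in for ZooKeeper/etcd. Everything here (class and method names included) is a hypothetical simplification, not sharding-jdbc's actual API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConfigRegistrySketch {

    interface ConfigListener {
        void onChange(String key, String newValue);
    }

    // Toy stand-in for a ZooKeeper/etcd registry center.
    static class InMemoryRegistry {
        private final Map<String, String> nodes = new HashMap<>();
        private final Map<String, List<ConfigListener>> watchers = new HashMap<>();

        // Persist configuration, as sharding-jdbc does on startup.
        void persist(String key, String value) {
            nodes.put(key, value);
        }

        // Register a watch on a node, mirroring regCenter.watch(...).
        void watch(String key, ConfigListener listener) {
            watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(listener);
        }

        // Update a node and push the new data to every watcher.
        void update(String key, String value) {
            nodes.put(key, value);
            for (ConfigListener l : watchers.getOrDefault(key, List.of())) {
                l.onChange(key, value);
            }
        }
    }

    // Stands in for ShardingDataSource: "renews" its config on change.
    static class RenewableDataSource implements ConfigListener {
        volatile String activeConfig;

        @Override
        public void onChange(String key, String newValue) {
            activeConfig = newValue; // rebuild runtime state from new config
        }
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        RenewableDataSource ds = new RenewableDataSource();
        registry.persist("/sharding/datasources", "ds_0,ds_1");
        registry.watch("/sharding/datasources", ds);
        registry.update("/sharding/datasources", "ds_0,ds_1,ds_2");
        System.out.println(ds.activeConfig); // prints the renewed config
    }
}
```

The point of the sketch is only the shape of the flow: persist on startup, watch the node, and renew in-process state when the registry pushes a change; no restart is involved.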
Implementation Details
- Follow the factory pattern down the call chain; nothing unusual here.
- @1 marks where the znode listener gets registered.
DataSource dataSource = OrchestrationShardingDataSourceFactory.createDataSource(yamlFile);

public static DataSource createDataSource(......) throws SQLException {
    OrchestrationShardingDataSource result = new OrchestrationShardingDataSource(dataSourceMap, shardingRuleConfig, configMap, props, orchestrationConfig);
    result.init();
    return result;
}

public void init() throws SQLException {
    orchestrationFacade.init(dataSourceMap, shardingRuleConfig, configMap, props, this);
}

public void init(......) throws SQLException {
    ......
    // persist the configuration to znodes
    dataSourceService.persistDataSourcesNode();
    // @1 register the zk listener
    listenerManager.initShardingListeners(shardingDataSource);
}

public void initShardingListeners(final ShardingDataSource shardingDataSource) {
    ......
    dataSourceListenerManager.start(shardingDataSource);
}
Registering the listener
@Override
public void start(final ShardingDataSource shardingDataSource) {
    regCenter.watch(stateNode.getDataSourcesNodeFullPath(), new EventListener() {

        @Override
        public void onChange(final DataChangedEvent event) {
            if (DataChangedEvent.Type.UPDATED == event.getEventType() || DataChangedEvent.Type.DELETED == event.getEventType()) {
                try {
                    shardingDataSource.renew(dataSourceService.getAvailableShardingRuleConfiguration().build(dataSourceService.getAvailableDataSources()), configService.loadShardingProperties());
                } catch (final SQLException ex) {
                    throw new ShardingJdbcException(ex);
                }
            }
        }
    });
}
public void renew(final ShardingRule newShardingRule, final Properties newProps) throws SQLException {
    ShardingProperties newShardingProperties = new ShardingProperties(null == newProps ? new Properties() : newProps);
    int originalExecutorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
    int newExecutorSize = newShardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
    // recreate the executor engine only when its size actually changed
    if (originalExecutorSize != newExecutorSize) {
        executorEngine.close();
        executorEngine = new ExecutorEngine(newExecutorSize);
    }
    boolean newShowSQL = newShardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW);
    shardingProperties = newShardingProperties;
    shardingContext = new ShardingContext(newShardingRule, getDatabaseType(), executorEngine, newShowSQL);
}
- Inside watch, the zk client detects the change and fires the event, and sharding-jdbc then handles it differently depending on the event type. Quite straightforward.
@Override
public void watch(final String key, final EventListener eventListener) {
    final String path = key + "/";
    if (!caches.containsKey(path)) {
        addCacheData(key);
    }
    TreeCache cache = caches.get(path);
    cache.getListenable().addListener(new TreeCacheListener() {

        @Override
        public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
            ChildData data = event.getData();
            if (null == data || null == data.getPath()) {
                return;
            }
            eventListener.onChange(new DataChangedEvent(getEventType(event), data.getPath(), null == data.getData() ? null : new String(data.getData(), "UTF-8")));
        }

        private DataChangedEvent.Type getEventType(final TreeCacheEvent event) {
            switch (event.getType()) {
                case NODE_UPDATED:
                    return DataChangedEvent.Type.UPDATED;
                case NODE_REMOVED:
                    return DataChangedEvent.Type.DELETED;
                default:
                    return DataChangedEvent.Type.IGNORED;
            }
        }
    });
}
Summary
- This deepens the service-governance idea: with a registry center, when the content changes, every registered consumer picks up the change automatically. Reading the code is still the way to understand it thoroughly.
- More could probably be done in between: rate limiting, circuit breaking, monitoring, and so on.
- Why is Alibaba's code so hard to read? RocketMQ, Dubbo... keep pushing on.
- Next up: sharding-jdbc routing and SQL rewriting.
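As a toy illustration of the "do more in between" idea (purely hypothetical, not part of sharding-jdbc), a change listener could be wrapped in a decorator that records a metric before delegating, which is one place a monitoring hook would naturally sit:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

public class MonitoringListenerSketch {

    // Simplified stand-in for sharding-jdbc's EventListener (hypothetical).
    interface ChangeListener extends BiConsumer<String, String> { }

    // Decorator that counts how many change events pass through a listener.
    static class CountingListener implements ChangeListener {
        final AtomicLong changes = new AtomicLong();
        private final ChangeListener delegate;

        CountingListener(ChangeListener delegate) {
            this.delegate = delegate;
        }

        @Override
        public void accept(String key, String value) {
            changes.incrementAndGet();   // metric hook: count events first
            delegate.accept(key, value); // then run the real handler
        }
    }

    public static void main(String[] args) {
        StringBuilder applied = new StringBuilder();
        CountingListener listener =
            new CountingListener((k, v) -> applied.append(k).append('=').append(v));
        listener.accept("/sharding/props", "sql.show=true");
        System.out.println(listener.changes.get() + " " + applied);
    }
}
```

The same decorator shape would work for rate limiting or circuit breaking: intercept the event in `accept`, decide whether to delegate, and keep the inner listener unchanged.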