我的知識庫java
上一篇,梳理http 數據同步策略的變動通知機制,本篇開始探究配置變動通知到達後, soul-web
端的處理響應。web
不一樣數據變動的通知機制應當是一致的,故本篇以 selector 配置變動通知爲切入點進行深刻。json
上回咱們說到 HttpSyncDataService 的 doLongPolling,在其內部發起通知訂閱並接收響應通知:緩存
private void doLongPolling(final String server) { ... String listenerUrl = server + "/configs/listener"; ... try { // 發起監聽請求 String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody(); log.debug("listener result: [{}]", json); groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data"); } catch (RestClientException e) { ... } // 處理變動通知 if (groupJson != null) { // fetch group configuration async. ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class); if (ArrayUtils.isNotEmpty(changedGroups)) { log.info("Group config changed: {}", Arrays.toString(changedGroups)); // 獲取組配置 this.doFetchGroupConfig(server, changedGroups); } } }
在收到變動通知時,若存在配置組變動,則按變動組獲取相應配置。async
獲取組配置處理以下:ide
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) { ... String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&"); ... try { json = this.httpClient.getForObject(url, String.class); } catch (RestClientException e) { ... } // update local cache boolean updated = this.updateCacheWithJson(json); ... }
內部發起配置獲取請求並更新本地緩存。post
由 HttpSyncDataService 實現本地緩存更新:fetch
private boolean updateCacheWithJson(final String json) { JsonObject jsonObject = GSON.fromJson(json, JsonObject.class); JsonObject data = jsonObject.getAsJsonObject("data"); // if the config cache will be updated? return factory.executor(data); }
轉成 Json 對象後交由 DataRefreshFactory 進行處理。this
DataRefreshFactory 處理以下:url
public boolean executor(final JsonObject data) { final boolean[] success = {false}; ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data)); return success[0]; }
調用相應數據刷新類刷新數據。
統一由 AbstractDataRefresh 的 refresh 進行處理:
public Boolean refresh(final JsonObject data) { boolean updated = false; JsonObject jsonObject = convert(data); if (null != jsonObject) { ConfigData<T> result = fromJson(jsonObject); if (this.updateCacheIfNeed(result)) { updated = true; refresh(result.getData()); } } return updated; }
先更新本地緩存,再調用子類實現的 refresh。
此處的更新本地緩存處理,由子類 SelectorDataRefresh 的 updateCacheIfNeed 實現:
protected boolean updateCacheIfNeed(final ConfigData<SelectorData> result) { return updateCacheIfNeed(result, ConfigGroupEnum.SELECTOR); }
向父類 AbstractDataRefresh 的 updateCacheIfNeed 指定更新 selector 配置組。
父類 AbstractDataRefresh 的 updateCacheIfNeed 處理:
protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) { // 首次初始化緩存 if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) { return true; } ResultHolder holder = new ResultHolder(false); GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> { // 必須比較最後更新時間 if (!StringUtils.equals(oldVal.getMd5(), newVal.getMd5()) && oldVal.getLastModifyTime() < newVal.getLastModifyTime()) { ... holder.result = true; return newVal; } ... return oldVal; }); return holder.result; }
經過比較新老緩存的 MD5 值來斷定是否發生變動,存在變動則更新本地緩存(注意還有最後更新時間斷定)。
SelectorDataRefresh 的 refresh 實現:
protected void refresh(final List<SelectorData> data) { if (CollectionUtils.isEmpty(data)) { log.info("clear all selector cache, old cache"); data.forEach(pluginDataSubscriber::unSelectorSubscribe); pluginDataSubscriber.refreshSelectorDataAll(); } else { // update cache for UpstreamCacheManager pluginDataSubscriber.refreshSelectorDataAll(); data.forEach(pluginDataSubscriber::onSelectorSubscribe); } }
CommonPluginDataSubscriber 實現訂閱取消:
public void unSelectorSubscribe(final SelectorData selectorData) { subscribeDataHandler(selectorData, DataEventTypeEnum.DELETE); }
subscribeDataHandler 對 selectorData 的 delete 處理:
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) { Optional.ofNullable(classData).ifPresent(data -> { if (data instanceof PluginData) { ... } else if (data instanceof SelectorData) { SelectorData selectorData = (SelectorData) data; if (dataType == DataEventTypeEnum.UPDATE) { ... } else if (dataType == DataEventTypeEnum.DELETE) { BaseDataCache.getInstance().removeSelectData(selectorData); Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData)); } } else if (data instanceof RuleData) { ... } }); }
從 BaseDataCache 刪除目標選擇器數據,並移除選擇器。
此處由 DividePluginDataHandler 提供 removeSelector 實現:
public void removeSelector(final SelectorData selectorData) { UpstreamCacheManager.getInstance().removeByKey(selectorData.getId()); }
根據 selector id 移除緩存的上游服務,注意只是從 UPSTREAM_MAP_TEMP 移除
public void removeByKey(final String key) { UPSTREAM_MAP_TEMP.remove(key); }
CommonPluginDataSubscriber 實現數據刷新:
public void refreshSelectorDataAll() { BaseDataCache.getInstance().cleanSelectorData(); }
注意這裏的 refresh all 實際是作的 clean 操做。
BaseDataCache 的 cleanSelectorData 處理:
public void cleanSelectorData() { SELECTOR_MAP.clear(); }
直接清除 SELECTOR_MAP 全部數據。
CommonPluginDataSubscriber 實現訂閱響應:
public void onSelectorSubscribe(final SelectorData selectorData) { subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE); }
subscribeDataHandler 對 selectorData 的 update 處理:
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) { Optional.ofNullable(classData).ifPresent(data -> { if (data instanceof PluginData) { ... } else if (data instanceof SelectorData) { SelectorData selectorData = (SelectorData) data; if (dataType == DataEventTypeEnum.UPDATE) { BaseDataCache.getInstance().cacheSelectData(selectorData); Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData)); } else if (dataType == DataEventTypeEnum.DELETE) { ... } } else if (data instanceof RuleData) { ... } }); }
緩存選擇器數據到 BaseDataCache,並處理選擇器。
此處由 DividePluginDataHandler 提供 handlerSelector 實現:
public void handlerSelector(final SelectorData selectorData) { UpstreamCacheManager.getInstance().submit(selectorData); }
提交選擇器數據到 UpstreamCacheManager。
UpstreamCacheManager 的 submit 處理:
public void submit(final SelectorData selectorData) { final List<DivideUpstream> upstreamList = GsonUtils.getInstance().fromList(selectorData.getHandle(), DivideUpstream.class); if (null != upstreamList && upstreamList.size() > 0) { UPSTREAM_MAP.put(selectorData.getId(), upstreamList); UPSTREAM_MAP_TEMP.put(selectorData.getId(), upstreamList); } else { UPSTREAM_MAP.remove(selectorData.getId()); UPSTREAM_MAP_TEMP.remove(selectorData.getId()); } }
根據 selector id 更新 UPSTREAM_MAP 和 UPSTREAM_MAP_TEMP。
本篇梳理和分析了配置變動通知到達後 soul-web
端的處理流程,最終處理主要是更新本地配置緩存以及維護上游服務散列表。
soul-web
收到變動通知後處理流程以下:
soul-web 端收到響應
- 若配置組數據存在變動,則發起獲取配置請求獲取最新配置信息
- 更新配置組緩存
- 循環處理配置數據刷新事件
- 若最新配置數據爲空,則刪除本地配置數據並移除上游服務
- 若最新配置數據不爲空,則緩存配置組數據並更新上游服務
- 若配置組數據無變動,不做處理