【Soul網關探祕】http數據同步-Web端處理變動通知

我的知識庫java

引言

上一篇,梳理http 數據同步策略的變動通知機制,本篇開始探究配置變動通知到達後, soul-web 端的處理響應。web

不一樣數據變動的通知機制應當是一致的,故本篇以 selector 配置變動通知爲切入點進行深刻。json

通知處理入口

上回咱們說到 HttpSyncDataService 的 doLongPolling,在其內部發起通知訂閱並接收響應通知:緩存

private void doLongPolling(final String server) {
    ...
    String listenerUrl = server + "/configs/listener";
    ...
    try {
      	// 發起監聽請求
        String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
        log.debug("listener result: [{}]", json);
        groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
    } catch (RestClientException e) {
        ...
    }
  	// 處理變動通知
    if (groupJson != null) {
        // fetch group configuration async.
        ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
        if (ArrayUtils.isNotEmpty(changedGroups)) {
            log.info("Group config changed: {}", Arrays.toString(changedGroups));
            // 獲取組配置
          	this.doFetchGroupConfig(server, changedGroups);
        }
    }
}

在收到變動通知時,若存在配置組變動,則按變動組獲取相應配置。async

獲取配置

獲取組配置處理以下:ide

private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
    ...
    String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
    ...
    try {
        json = this.httpClient.getForObject(url, String.class);
    } catch (RestClientException e) {
        ...
    }
    // update local cache
    boolean updated = this.updateCacheWithJson(json);
    ...
}

內部發起配置獲取請求並更新本地緩存。post

更新配置組緩存

由 HttpSyncDataService 實現本地緩存更新:fetch

private boolean updateCacheWithJson(final String json) {
    JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
    JsonObject data = jsonObject.getAsJsonObject("data");
    // if the config cache will be updated?
    return factory.executor(data);
}

轉成 Json 對象後交由 DataRefreshFactory 進行處理。this

DataRefreshFactory 處理以下:url

public boolean executor(final JsonObject data) {
    final boolean[] success = {false};
    ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));
    return success[0];
}

調用相應數據刷新類刷新數據。

統一由 AbstractDataRefresh 的 refresh 進行處理:

public Boolean refresh(final JsonObject data) {
    boolean updated = false;
    JsonObject jsonObject = convert(data);
    if (null != jsonObject) {
        ConfigData<T> result = fromJson(jsonObject);
        if (this.updateCacheIfNeed(result)) {
            updated = true;
            refresh(result.getData());
        }
    }
    return updated;
}

先更新本地緩存,再調用子類實現的 refresh。

此處的更新本地緩存處理,由子類 SelectorDataRefresh 的 updateCacheIfNeed 實現:

protected boolean updateCacheIfNeed(final ConfigData<SelectorData> result) {
    return updateCacheIfNeed(result, ConfigGroupEnum.SELECTOR);
}

向父類 AbstractDataRefresh 的 updateCacheIfNeed 指定更新 selector 配置組。

父類 AbstractDataRefresh 的 updateCacheIfNeed 處理:

protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {
    // 首次初始化緩存
    if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {
        return true;
    }
    ResultHolder holder = new ResultHolder(false);
    GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {
        // 必須比較最後更新時間
        if (!StringUtils.equals(oldVal.getMd5(), newVal.getMd5()) && oldVal.getLastModifyTime() < newVal.getLastModifyTime()) {
            ...
            holder.result = true;
            return newVal;
        }
        ...
        return oldVal;
    });
    return holder.result;
}

經過比較新老緩存的 MD5 值來斷定是否發生變動,存在變動則更新本地緩存(注意還有最後更新時間斷定)。

處理刷新事件

SelectorDataRefresh 的 refresh 實現:

protected void refresh(final List<SelectorData> data) {
    if (CollectionUtils.isEmpty(data)) {
        log.info("clear all selector cache, old cache");
        data.forEach(pluginDataSubscriber::unSelectorSubscribe);
        pluginDataSubscriber.refreshSelectorDataAll();
    } else {
        // update cache for UpstreamCacheManager
        pluginDataSubscriber.refreshSelectorDataAll();
        data.forEach(pluginDataSubscriber::onSelectorSubscribe);
    }
}
  • 若最新數據爲空,則循環取消訂閱並刷新全部選擇器數據,實際是清空選擇器緩存。
  • 若最新數據不爲空,則刷新全部選擇器數據並循環響應選擇器訂閱事件處理,實際是更新上游服務緩存。

取消訂閱

CommonPluginDataSubscriber 實現訂閱取消:

public void unSelectorSubscribe(final SelectorData selectorData) {
    subscribeDataHandler(selectorData, DataEventTypeEnum.DELETE);
}

subscribeDataHandler 對 selectorData 的 delete 處理:

private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
    Optional.ofNullable(classData).ifPresent(data -> {
        if (data instanceof PluginData) {
            ...
        } else if (data instanceof SelectorData) {
            SelectorData selectorData = (SelectorData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                ...
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removeSelectData(selectorData);
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
            }
        } else if (data instanceof RuleData) {
            ...
        }
    });
}

從 BaseDataCache 刪除目標選擇器數據,並移除選擇器。

此處由 DividePluginDataHandler 提供 removeSelector 實現:

public void removeSelector(final SelectorData selectorData) {
    UpstreamCacheManager.getInstance().removeByKey(selectorData.getId());
}

根據 selector id 移除緩存的上游服務,注意只是從 UPSTREAM_MAP_TEMP 移除

public void removeByKey(final String key) {
    UPSTREAM_MAP_TEMP.remove(key);
}

刷新數據

CommonPluginDataSubscriber 實現數據刷新:

public void refreshSelectorDataAll() {
    BaseDataCache.getInstance().cleanSelectorData();
}

注意這裏的 refresh all 實際是作的 clean 操做。

BaseDataCache 的 cleanSelectorData 處理:

public void cleanSelectorData() {
    SELECTOR_MAP.clear();
}

直接清除 SELECTOR_MAP 全部數據。

響應訂閱

CommonPluginDataSubscriber 實現訂閱響應:

public void onSelectorSubscribe(final SelectorData selectorData) {
    subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
}

subscribeDataHandler 對 selectorData 的 update 處理:

private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
    Optional.ofNullable(classData).ifPresent(data -> {
        if (data instanceof PluginData) {
            ...
        } else if (data instanceof SelectorData) {
            SelectorData selectorData = (SelectorData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cacheSelectData(selectorData);
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                ...
            }
        } else if (data instanceof RuleData) {
            ...
        }
    });
}

緩存選擇器數據到 BaseDataCache,並處理選擇器。

此處由 DividePluginDataHandler 提供 handlerSelector 實現:

public void handlerSelector(final SelectorData selectorData) {
    UpstreamCacheManager.getInstance().submit(selectorData);
}

提交選擇器數據到 UpstreamCacheManager。

UpstreamCacheManager 的 submit 處理:

public void submit(final SelectorData selectorData) {
    final List<DivideUpstream> upstreamList = GsonUtils.getInstance().fromList(selectorData.getHandle(), DivideUpstream.class);
    if (null != upstreamList && upstreamList.size() > 0) {
        UPSTREAM_MAP.put(selectorData.getId(), upstreamList);
        UPSTREAM_MAP_TEMP.put(selectorData.getId(), upstreamList);
    } else {
        UPSTREAM_MAP.remove(selectorData.getId());
        UPSTREAM_MAP_TEMP.remove(selectorData.getId());
    }
}

根據 selector id 更新 UPSTREAM_MAP 和 UPSTREAM_MAP_TEMP。

總結

本篇梳理和分析了配置變動通知到達後 soul-web 端的處理流程,最終處理主要是更新本地配置緩存以及維護上游服務散列表。

soul-web收到變動通知後處理流程以下:

soul-web 端收到響應

  • 若配置組數據存在變動,則發起獲取配置請求獲取最新配置信息
    • 更新配置組緩存
    • 循環處理配置數據刷新事件
      • 若最新配置數據爲空,則刪除本地配置數據並移除上游服務
      • 若最新配置數據不爲空,則緩存配置組數據並更新上游服務
  • 若配置組數據無變動,不做處理
相關文章
相關標籤/搜索